Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeMergeAppend.c
4 : * routines to handle MergeAppend nodes.
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeMergeAppend.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /* INTERFACE ROUTINES
16 : * ExecInitMergeAppend - initialize the MergeAppend node
17 : * ExecMergeAppend - retrieve the next tuple from the node
18 : * ExecEndMergeAppend - shut down the MergeAppend node
19 : * ExecReScanMergeAppend - rescan the MergeAppend node
20 : *
21 : * NOTES
22 : * A MergeAppend node contains a list of one or more subplans.
23 : * These are each expected to deliver tuples that are sorted according
24 : * to a common sort key. The MergeAppend node merges these streams
25 : * to produce output sorted the same way.
26 : *
27 : * MergeAppend nodes don't make use of their left and right
28 : * subtrees, rather they maintain a list of subplans so
29 : * a typical MergeAppend node looks like this in the plan tree:
30 : *
31 : * ...
32 : * /
33 : * MergeAppend---+------+------+--- nil
34 : * / \ | | |
35 : * nil nil ... ... ...
36 : * subplans
37 : */
38 :
39 : #include "postgres.h"
40 :
41 : #include "executor/executor.h"
42 : #include "executor/execPartition.h"
43 : #include "executor/nodeMergeAppend.h"
44 : #include "lib/binaryheap.h"
45 : #include "miscadmin.h"
46 :
47 : /*
48 : * We have one slot for each item in the heap array. We use SlotNumber
49 : * to store slot indexes. This doesn't actually provide any formal
50 : * type-safety, but it makes the code more self-documenting.
51 : */
52 : typedef int32 SlotNumber;
53 :
54 : static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
55 : static int heap_compare_slots(Datum a, Datum b, void *arg);
56 :
57 :
58 : /* ----------------------------------------------------------------
59 : * ExecInitMergeAppend
60 : *
61 : * Begin all of the subscans of the MergeAppend node.
62 : * ----------------------------------------------------------------
63 : */
64 : MergeAppendState *
65 550 : ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
66 : {
67 550 : MergeAppendState *mergestate = makeNode(MergeAppendState);
68 : PlanState **mergeplanstates;
69 : const TupleTableSlotOps *mergeops;
70 : Bitmapset *validsubplans;
71 : int nplans;
72 : int i,
73 : j;
74 :
75 : /* check for unsupported flags */
76 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
77 :
78 : /*
79 : * create new MergeAppendState for our node
80 : */
81 550 : mergestate->ps.plan = (Plan *) node;
82 550 : mergestate->ps.state = estate;
83 550 : mergestate->ps.ExecProcNode = ExecMergeAppend;
84 :
85 : /* If run-time partition pruning is enabled, then set that up now */
86 550 : if (node->part_prune_index >= 0)
87 : {
88 : PartitionPruneState *prunestate;
89 :
90 : /*
91 : * Set up pruning data structure. This also initializes the set of
92 : * subplans to initialize (validsubplans) by taking into account the
93 : * result of performing initial pruning if any.
94 : */
95 66 : prunestate = ExecInitPartitionExecPruning(&mergestate->ps,
96 66 : list_length(node->mergeplans),
97 : node->part_prune_index,
98 : node->apprelids,
99 : &validsubplans);
100 66 : mergestate->ms_prune_state = prunestate;
101 66 : nplans = bms_num_members(validsubplans);
102 :
103 : /*
104 : * When no run-time pruning is required and there's at least one
105 : * subplan, we can fill ms_valid_subplans immediately, preventing
106 : * later calls to ExecFindMatchingSubPlans.
107 : */
108 66 : if (!prunestate->do_exec_prune && nplans > 0)
109 36 : mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
110 : }
111 : else
112 : {
113 484 : nplans = list_length(node->mergeplans);
114 :
115 : /*
116 : * When run-time partition pruning is not enabled we can just mark all
117 : * subplans as valid; they must also all be initialized.
118 : */
119 : Assert(nplans > 0);
120 484 : mergestate->ms_valid_subplans = validsubplans =
121 484 : bms_add_range(NULL, 0, nplans - 1);
122 484 : mergestate->ms_prune_state = NULL;
123 : }
124 :
125 550 : mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
126 550 : mergestate->mergeplans = mergeplanstates;
127 550 : mergestate->ms_nplans = nplans;
128 :
129 550 : mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
130 550 : mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
131 : mergestate);
132 :
133 : /*
134 : * call ExecInitNode on each of the valid plans to be executed and save
135 : * the results into the mergeplanstates array.
136 : */
137 550 : j = 0;
138 550 : i = -1;
139 2006 : while ((i = bms_next_member(validsubplans, i)) >= 0)
140 : {
141 1456 : Plan *initNode = (Plan *) list_nth(node->mergeplans, i);
142 :
143 1456 : mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
144 : }
145 :
146 : /*
147 : * Initialize MergeAppend's result tuple type and slot. If the child
148 : * plans all produce the same fixed slot type, we can use that slot type;
149 : * otherwise make a virtual slot. (Note that the result slot itself is
150 : * used only to return a null tuple at end of execution; real tuples are
151 : * returned to the caller in the children's own result slots. What we are
152 : * doing here is allowing the parent plan node to optimize if the
153 : * MergeAppend will return only one kind of slot.)
154 : */
155 550 : mergeops = ExecGetCommonSlotOps(mergeplanstates, j);
156 550 : if (mergeops != NULL)
157 : {
158 414 : ExecInitResultTupleSlotTL(&mergestate->ps, mergeops);
159 : }
160 : else
161 : {
162 136 : ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
163 : /* show that the output slot type is not fixed */
164 136 : mergestate->ps.resultopsset = true;
165 136 : mergestate->ps.resultopsfixed = false;
166 : }
167 :
168 : /*
169 : * Miscellaneous initialization
170 : */
171 550 : mergestate->ps.ps_ProjInfo = NULL;
172 :
173 : /*
174 : * initialize sort-key information
175 : */
176 550 : mergestate->ms_nkeys = node->numCols;
177 550 : mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
178 :
179 1198 : for (i = 0; i < node->numCols; i++)
180 : {
181 648 : SortSupport sortKey = mergestate->ms_sortkeys + i;
182 :
183 648 : sortKey->ssup_cxt = CurrentMemoryContext;
184 648 : sortKey->ssup_collation = node->collations[i];
185 648 : sortKey->ssup_nulls_first = node->nullsFirst[i];
186 648 : sortKey->ssup_attno = node->sortColIdx[i];
187 :
188 : /*
189 : * It isn't feasible to perform abbreviated key conversion, since
190 : * tuples are pulled into mergestate's binary heap as needed. It
191 : * would likely be counter-productive to convert tuples into an
192 : * abbreviated representation as they're pulled up, so opt out of that
193 : * additional optimization entirely.
194 : */
195 648 : sortKey->abbreviate = false;
196 :
197 648 : PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
198 : }
199 :
200 : /*
201 : * initialize to show we have not run the subplans yet
202 : */
203 550 : mergestate->ms_initialized = false;
204 :
205 550 : return mergestate;
206 : }
207 :
208 : /* ----------------------------------------------------------------
209 : * ExecMergeAppend
210 : *
211 : * Handles iteration over multiple subplans.
212 : * ----------------------------------------------------------------
213 : */
214 : static TupleTableSlot *
215 163960 : ExecMergeAppend(PlanState *pstate)
216 : {
217 163960 : MergeAppendState *node = castNode(MergeAppendState, pstate);
218 : TupleTableSlot *result;
219 : SlotNumber i;
220 :
221 163960 : CHECK_FOR_INTERRUPTS();
222 :
223 163960 : if (!node->ms_initialized)
224 : {
225 : /* Nothing to do if all subplans were pruned */
226 274 : if (node->ms_nplans == 0)
227 18 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
228 :
229 : /*
230 : * If we've yet to determine the valid subplans then do so now. If
231 : * run-time pruning is disabled then the valid subplans will always be
232 : * set to all subplans.
233 : */
234 256 : if (node->ms_valid_subplans == NULL)
235 12 : node->ms_valid_subplans =
236 12 : ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL);
237 :
238 : /*
239 : * First time through: pull the first tuple from each valid subplan,
240 : * and set up the heap.
241 : */
242 256 : i = -1;
243 922 : while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
244 : {
245 666 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
246 666 : if (!TupIsNull(node->ms_slots[i]))
247 578 : binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
248 : }
249 256 : binaryheap_build(node->ms_heap);
250 256 : node->ms_initialized = true;
251 : }
252 : else
253 : {
254 : /*
255 : * Otherwise, pull the next tuple from whichever subplan we returned
256 : * from last time, and reinsert the subplan index into the heap,
257 : * because it might now compare differently against the existing
258 : * elements of the heap. (We could perhaps simplify the logic a bit
259 : * by doing this before returning from the prior call, but it's better
260 : * to not pull tuples until necessary.)
261 : */
262 163686 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
263 163686 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
264 163686 : if (!TupIsNull(node->ms_slots[i]))
265 163294 : binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
266 : else
267 392 : (void) binaryheap_remove_first(node->ms_heap);
268 : }
269 :
270 163942 : if (binaryheap_empty(node->ms_heap))
271 : {
272 : /* All the subplans are exhausted, and so is the heap */
273 190 : result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
274 : }
275 : else
276 : {
277 163752 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
278 163752 : result = node->ms_slots[i];
279 : }
280 :
281 163942 : return result;
282 : }
283 :
284 : /*
285 : * Compare the tuples in the two given slots.
286 : */
287 : static int32
288 149462 : heap_compare_slots(Datum a, Datum b, void *arg)
289 : {
290 149462 : MergeAppendState *node = (MergeAppendState *) arg;
291 149462 : SlotNumber slot1 = DatumGetInt32(a);
292 149462 : SlotNumber slot2 = DatumGetInt32(b);
293 :
294 149462 : TupleTableSlot *s1 = node->ms_slots[slot1];
295 149462 : TupleTableSlot *s2 = node->ms_slots[slot2];
296 : int nkey;
297 :
298 : Assert(!TupIsNull(s1));
299 : Assert(!TupIsNull(s2));
300 :
301 198314 : for (nkey = 0; nkey < node->ms_nkeys; nkey++)
302 : {
303 149462 : SortSupport sortKey = node->ms_sortkeys + nkey;
304 149462 : AttrNumber attno = sortKey->ssup_attno;
305 : Datum datum1,
306 : datum2;
307 : bool isNull1,
308 : isNull2;
309 : int compare;
310 :
311 149462 : datum1 = slot_getattr(s1, attno, &isNull1);
312 149462 : datum2 = slot_getattr(s2, attno, &isNull2);
313 :
314 149462 : compare = ApplySortComparator(datum1, isNull1,
315 : datum2, isNull2,
316 : sortKey);
317 149462 : if (compare != 0)
318 : {
319 100610 : INVERT_COMPARE_RESULT(compare);
320 100610 : return compare;
321 : }
322 : }
323 48852 : return 0;
324 : }
325 :
326 : /* ----------------------------------------------------------------
327 : * ExecEndMergeAppend
328 : *
329 : * Shuts down the subscans of the MergeAppend node.
330 : *
331 : * Returns nothing of interest.
332 : * ----------------------------------------------------------------
333 : */
334 : void
335 550 : ExecEndMergeAppend(MergeAppendState *node)
336 : {
337 : PlanState **mergeplans;
338 : int nplans;
339 : int i;
340 :
341 : /*
342 : * get information from the node
343 : */
344 550 : mergeplans = node->mergeplans;
345 550 : nplans = node->ms_nplans;
346 :
347 : /*
348 : * shut down each of the subscans
349 : */
350 2006 : for (i = 0; i < nplans; i++)
351 1456 : ExecEndNode(mergeplans[i]);
352 550 : }
353 :
354 : void
355 18 : ExecReScanMergeAppend(MergeAppendState *node)
356 : {
357 : int i;
358 :
359 : /*
360 : * If any PARAM_EXEC Params used in pruning expressions have changed, then
361 : * we'd better unset the valid subplans so that they are reselected for
362 : * the new parameter values.
363 : */
364 18 : if (node->ms_prune_state &&
365 0 : bms_overlap(node->ps.chgParam,
366 0 : node->ms_prune_state->execparamids))
367 : {
368 0 : bms_free(node->ms_valid_subplans);
369 0 : node->ms_valid_subplans = NULL;
370 : }
371 :
372 54 : for (i = 0; i < node->ms_nplans; i++)
373 : {
374 36 : PlanState *subnode = node->mergeplans[i];
375 :
376 : /*
377 : * ExecReScan doesn't know about my subplans, so I have to do
378 : * changed-parameter signaling myself.
379 : */
380 36 : if (node->ps.chgParam != NULL)
381 36 : UpdateChangedParamSet(subnode, node->ps.chgParam);
382 :
383 : /*
384 : * If chgParam of subnode is not null then plan will be re-scanned by
385 : * first ExecProcNode.
386 : */
387 36 : if (subnode->chgParam == NULL)
388 0 : ExecReScan(subnode);
389 : }
390 18 : binaryheap_reset(node->ms_heap);
391 18 : node->ms_initialized = false;
392 18 : }
|