Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeMergeAppend.c
4 : * routines to handle MergeAppend nodes.
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeMergeAppend.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /* INTERFACE ROUTINES
16 : * ExecInitMergeAppend - initialize the MergeAppend node
17 : * ExecMergeAppend - retrieve the next tuple from the node
18 : * ExecEndMergeAppend - shut down the MergeAppend node
19 : * ExecReScanMergeAppend - rescan the MergeAppend node
20 : *
21 : * NOTES
22 : * A MergeAppend node contains a list of one or more subplans.
23 : * These are each expected to deliver tuples that are sorted according
24 : * to a common sort key. The MergeAppend node merges these streams
25 : * to produce output sorted the same way.
26 : *
27 : * MergeAppend nodes don't make use of their left and right
28 : * subtrees, rather they maintain a list of subplans so
29 : * a typical MergeAppend node looks like this in the plan tree:
30 : *
31 : * ...
32 : * /
33 : * MergeAppend---+------+------+--- nil
34 : * / \ | | |
35 : * nil nil ... ... ...
36 : * subplans
37 : */
38 :
39 : #include "postgres.h"
40 :
41 : #include "executor/executor.h"
42 : #include "executor/execPartition.h"
43 : #include "executor/nodeMergeAppend.h"
44 : #include "lib/binaryheap.h"
45 : #include "miscadmin.h"
46 : #include "utils/sortsupport.h"
47 :
48 : /*
49 : * We have one slot for each item in the heap array. We use SlotNumber
50 : * to store slot indexes. This doesn't actually provide any formal
51 : * type-safety, but it makes the code more self-documenting.
52 : */
53 : typedef int32 SlotNumber;
54 :
55 : static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
56 : static int heap_compare_slots(Datum a, Datum b, void *arg);
57 :
58 :
59 : /* ----------------------------------------------------------------
60 : * ExecInitMergeAppend
61 : *
62 : * Begin all of the subscans of the MergeAppend node.
63 : * ----------------------------------------------------------------
64 : */
65 : MergeAppendState *
66 406 : ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
67 : {
68 406 : MergeAppendState *mergestate = makeNode(MergeAppendState);
69 : PlanState **mergeplanstates;
70 : const TupleTableSlotOps *mergeops;
71 : Bitmapset *validsubplans;
72 : int nplans;
73 : int i,
74 : j;
75 :
76 : /* check for unsupported flags */
77 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
78 :
79 : /*
80 : * create new MergeAppendState for our node
81 : */
82 406 : mergestate->ps.plan = (Plan *) node;
83 406 : mergestate->ps.state = estate;
84 406 : mergestate->ps.ExecProcNode = ExecMergeAppend;
85 :
86 : /* If run-time partition pruning is enabled, then set that up now */
87 406 : if (node->part_prune_index >= 0)
88 : {
89 : PartitionPruneState *prunestate;
90 :
91 : /*
92 : * Set up pruning data structure. This also initializes the set of
93 : * subplans to initialize (validsubplans) by taking into account the
94 : * result of performing initial pruning if any.
95 : */
96 44 : prunestate = ExecInitPartitionExecPruning(&mergestate->ps,
97 44 : list_length(node->mergeplans),
98 : node->part_prune_index,
99 : node->apprelids,
100 : &validsubplans);
101 44 : mergestate->ms_prune_state = prunestate;
102 44 : nplans = bms_num_members(validsubplans);
103 :
104 : /*
105 : * When no run-time pruning is required and there's at least one
106 : * subplan, we can fill ms_valid_subplans immediately, preventing
107 : * later calls to ExecFindMatchingSubPlans.
108 : */
109 44 : if (!prunestate->do_exec_prune && nplans > 0)
110 24 : mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
111 : }
112 : else
113 : {
114 362 : nplans = list_length(node->mergeplans);
115 :
116 : /*
117 : * When run-time partition pruning is not enabled we can just mark all
118 : * subplans as valid; they must also all be initialized.
119 : */
120 : Assert(nplans > 0);
121 362 : mergestate->ms_valid_subplans = validsubplans =
122 362 : bms_add_range(NULL, 0, nplans - 1);
123 362 : mergestate->ms_prune_state = NULL;
124 : }
125 :
126 406 : mergeplanstates = palloc_array(PlanState *, nplans);
127 406 : mergestate->mergeplans = mergeplanstates;
128 406 : mergestate->ms_nplans = nplans;
129 :
130 406 : mergestate->ms_slots = palloc0_array(TupleTableSlot *, nplans);
131 406 : mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
132 : mergestate);
133 :
134 : /*
135 : * call ExecInitNode on each of the valid plans to be executed and save
136 : * the results into the mergeplanstates array.
137 : */
138 406 : j = 0;
139 406 : i = -1;
140 1513 : while ((i = bms_next_member(validsubplans, i)) >= 0)
141 : {
142 1107 : Plan *initNode = (Plan *) list_nth(node->mergeplans, i);
143 :
144 1107 : mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
145 : }
146 :
147 : /*
148 : * Initialize MergeAppend's result tuple type and slot. If the child
149 : * plans all produce the same fixed slot type, we can use that slot type;
150 : * otherwise make a virtual slot. (Note that the result slot itself is
151 : * used only to return a null tuple at end of execution; real tuples are
152 : * returned to the caller in the children's own result slots. What we are
153 : * doing here is allowing the parent plan node to optimize if the
154 : * MergeAppend will return only one kind of slot.)
155 : */
156 406 : mergeops = ExecGetCommonSlotOps(mergeplanstates, j);
157 406 : if (mergeops != NULL)
158 : {
159 305 : ExecInitResultTupleSlotTL(&mergestate->ps, mergeops);
160 : }
161 : else
162 : {
163 101 : ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
164 : /* show that the output slot type is not fixed */
165 101 : mergestate->ps.resultopsset = true;
166 101 : mergestate->ps.resultopsfixed = false;
167 : }
168 :
169 : /*
170 : * Miscellaneous initialization
171 : */
172 406 : mergestate->ps.ps_ProjInfo = NULL;
173 :
174 : /*
175 : * initialize sort-key information
176 : */
177 406 : mergestate->ms_nkeys = node->numCols;
178 406 : mergestate->ms_sortkeys = palloc0_array(SortSupportData, node->numCols);
179 :
180 881 : for (i = 0; i < node->numCols; i++)
181 : {
182 475 : SortSupport sortKey = mergestate->ms_sortkeys + i;
183 :
184 475 : sortKey->ssup_cxt = CurrentMemoryContext;
185 475 : sortKey->ssup_collation = node->collations[i];
186 475 : sortKey->ssup_nulls_first = node->nullsFirst[i];
187 475 : sortKey->ssup_attno = node->sortColIdx[i];
188 :
189 : /*
190 : * It isn't feasible to perform abbreviated key conversion, since
191 : * tuples are pulled into mergestate's binary heap as needed. It
192 : * would likely be counter-productive to convert tuples into an
193 : * abbreviated representation as they're pulled up, so opt out of that
194 : * additional optimization entirely.
195 : */
196 475 : sortKey->abbreviate = false;
197 :
198 475 : PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
199 : }
200 :
201 : /*
202 : * initialize to show we have not run the subplans yet
203 : */
204 406 : mergestate->ms_initialized = false;
205 :
206 406 : return mergestate;
207 : }
208 :
209 : /* ----------------------------------------------------------------
210 : * ExecMergeAppend
211 : *
212 : * Handles iteration over multiple subplans.
213 : * ----------------------------------------------------------------
214 : */
215 : static TupleTableSlot *
216 108033 : ExecMergeAppend(PlanState *pstate)
217 : {
218 108033 : MergeAppendState *node = castNode(MergeAppendState, pstate);
219 : TupleTableSlot *result;
220 : SlotNumber i;
221 :
222 108033 : CHECK_FOR_INTERRUPTS();
223 :
224 108033 : if (!node->ms_initialized)
225 : {
226 : /* Nothing to do if all subplans were pruned */
227 200 : if (node->ms_nplans == 0)
228 12 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
229 :
230 : /*
231 : * If we've yet to determine the valid subplans then do so now. If
232 : * run-time pruning is disabled then the valid subplans will always be
233 : * set to all subplans.
234 : */
235 188 : if (node->ms_valid_subplans == NULL)
236 8 : node->ms_valid_subplans =
237 8 : ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL);
238 :
239 : /*
240 : * First time through: pull the first tuple from each valid subplan,
241 : * and set up the heap.
242 : */
243 188 : i = -1;
244 709 : while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
245 : {
246 521 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
247 521 : if (!TupIsNull(node->ms_slots[i]))
248 417 : binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
249 : }
250 188 : binaryheap_build(node->ms_heap);
251 188 : node->ms_initialized = true;
252 : }
253 : else
254 : {
255 : /*
256 : * Otherwise, pull the next tuple from whichever subplan we returned
257 : * from last time, and reinsert the subplan index into the heap,
258 : * because it might now compare differently against the existing
259 : * elements of the heap. (We could perhaps simplify the logic a bit
260 : * by doing this before returning from the prior call, but it's better
261 : * to not pull tuples until necessary.)
262 : */
263 107833 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
264 107833 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
265 107833 : if (!TupIsNull(node->ms_slots[i]))
266 107552 : binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
267 : else
268 281 : (void) binaryheap_remove_first(node->ms_heap);
269 : }
270 :
271 108021 : if (binaryheap_empty(node->ms_heap))
272 : {
273 : /* All the subplans are exhausted, and so is the heap */
274 140 : result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
275 : }
276 : else
277 : {
278 107881 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
279 107881 : result = node->ms_slots[i];
280 : }
281 :
282 108021 : return result;
283 : }
284 :
285 : /*
286 : * Compare the tuples in the two given slots.
287 : */
288 : static int32
289 98312 : heap_compare_slots(Datum a, Datum b, void *arg)
290 : {
291 98312 : MergeAppendState *node = (MergeAppendState *) arg;
292 98312 : SlotNumber slot1 = DatumGetInt32(a);
293 98312 : SlotNumber slot2 = DatumGetInt32(b);
294 :
295 98312 : TupleTableSlot *s1 = node->ms_slots[slot1];
296 98312 : TupleTableSlot *s2 = node->ms_slots[slot2];
297 : int nkey;
298 :
299 : Assert(!TupIsNull(s1));
300 : Assert(!TupIsNull(s2));
301 :
302 130880 : for (nkey = 0; nkey < node->ms_nkeys; nkey++)
303 : {
304 98312 : SortSupport sortKey = node->ms_sortkeys + nkey;
305 98312 : AttrNumber attno = sortKey->ssup_attno;
306 : Datum datum1,
307 : datum2;
308 : bool isNull1,
309 : isNull2;
310 : int compare;
311 :
312 98312 : datum1 = slot_getattr(s1, attno, &isNull1);
313 98312 : datum2 = slot_getattr(s2, attno, &isNull2);
314 :
315 98312 : compare = ApplySortComparator(datum1, isNull1,
316 : datum2, isNull2,
317 : sortKey);
318 98312 : if (compare != 0)
319 : {
320 65744 : INVERT_COMPARE_RESULT(compare);
321 65744 : return compare;
322 : }
323 : }
324 32568 : return 0;
325 : }
326 :
327 : /* ----------------------------------------------------------------
328 : * ExecEndMergeAppend
329 : *
330 : * Shuts down the subscans of the MergeAppend node.
331 : *
332 : * Returns nothing of interest.
333 : * ----------------------------------------------------------------
334 : */
335 : void
336 406 : ExecEndMergeAppend(MergeAppendState *node)
337 : {
338 : PlanState **mergeplans;
339 : int nplans;
340 : int i;
341 :
342 : /*
343 : * get information from the node
344 : */
345 406 : mergeplans = node->mergeplans;
346 406 : nplans = node->ms_nplans;
347 :
348 : /*
349 : * shut down each of the subscans
350 : */
351 1513 : for (i = 0; i < nplans; i++)
352 1107 : ExecEndNode(mergeplans[i]);
353 406 : }
354 :
355 : void
356 12 : ExecReScanMergeAppend(MergeAppendState *node)
357 : {
358 : int i;
359 :
360 : /*
361 : * If any PARAM_EXEC Params used in pruning expressions have changed, then
362 : * we'd better unset the valid subplans so that they are reselected for
363 : * the new parameter values.
364 : */
365 12 : if (node->ms_prune_state &&
366 0 : bms_overlap(node->ps.chgParam,
367 0 : node->ms_prune_state->execparamids))
368 : {
369 0 : bms_free(node->ms_valid_subplans);
370 0 : node->ms_valid_subplans = NULL;
371 : }
372 :
373 36 : for (i = 0; i < node->ms_nplans; i++)
374 : {
375 24 : PlanState *subnode = node->mergeplans[i];
376 :
377 : /*
378 : * ExecReScan doesn't know about my subplans, so I have to do
379 : * changed-parameter signaling myself.
380 : */
381 24 : if (node->ps.chgParam != NULL)
382 24 : UpdateChangedParamSet(subnode, node->ps.chgParam);
383 :
384 : /*
385 : * If chgParam of subnode is not null then plan will be re-scanned by
386 : * first ExecProcNode.
387 : */
388 24 : if (subnode->chgParam == NULL)
389 0 : ExecReScan(subnode);
390 : }
391 12 : binaryheap_reset(node->ms_heap);
392 12 : node->ms_initialized = false;
393 12 : }
|