Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeMergeAppend.c
4 : * routines to handle MergeAppend nodes.
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeMergeAppend.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /*
16 : * INTERFACE ROUTINES
17 : * ExecInitMergeAppend - initialize the MergeAppend node
18 : * ExecMergeAppend - retrieve the next tuple from the node
19 : * ExecEndMergeAppend - shut down the MergeAppend node
20 : * ExecReScanMergeAppend - rescan the MergeAppend node
21 : *
22 : * NOTES
23 : * A MergeAppend node contains a list of one or more subplans.
24 : * These are each expected to deliver tuples that are sorted according
25 : * to a common sort key. The MergeAppend node merges these streams
26 : * to produce output sorted the same way.
27 : *
28 : * MergeAppend nodes don't make use of their left and right
29 : * subtrees, rather they maintain a list of subplans so
30 : * a typical MergeAppend node looks like this in the plan tree:
31 : *
32 : * ...
33 : * /
34 : * MergeAppend---+------+------+--- nil
35 : * / \ | | |
36 : * nil nil ... ... ...
37 : * subplans
38 : */
39 :
40 : #include "postgres.h"
41 :
42 : #include "executor/executor.h"
43 : #include "executor/execPartition.h"
44 : #include "executor/nodeMergeAppend.h"
45 : #include "lib/binaryheap.h"
46 : #include "miscadmin.h"
47 : #include "utils/sortsupport.h"
48 :
49 : /*
50 : * We have one slot for each item in the heap array. We use SlotNumber
51 : * to store slot indexes. This doesn't actually provide any formal
52 : * type-safety, but it makes the code more self-documenting.
53 : */
54 : typedef int32 SlotNumber;
55 :
56 : static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
57 : static int heap_compare_slots(Datum a, Datum b, void *arg);
58 :
59 :
60 : /* ----------------------------------------------------------------
61 : * ExecInitMergeAppend
62 : *
63 : * Begin all of the subscans of the MergeAppend node.
64 : * ----------------------------------------------------------------
65 : */
66 : MergeAppendState *
67 406 : ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
68 : {
69 406 : MergeAppendState *mergestate = makeNode(MergeAppendState);
70 : PlanState **mergeplanstates;
71 : const TupleTableSlotOps *mergeops;
72 : Bitmapset *validsubplans;
73 : int nplans;
74 : int i,
75 : j;
76 :
77 : /* check for unsupported flags */
78 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
79 :
80 : /*
81 : * create new MergeAppendState for our node
82 : */
83 406 : mergestate->ps.plan = (Plan *) node;
84 406 : mergestate->ps.state = estate;
85 406 : mergestate->ps.ExecProcNode = ExecMergeAppend;
86 :
87 : /* If run-time partition pruning is enabled, then set that up now */
88 406 : if (node->part_prune_index >= 0)
89 : {
90 : PartitionPruneState *prunestate;
91 :
92 : /*
93 : * Set up pruning data structure. This also initializes the set of
94 : * subplans to initialize (validsubplans) by taking into account the
95 : * result of performing initial pruning if any.
96 : */
97 44 : prunestate = ExecInitPartitionExecPruning(&mergestate->ps,
98 44 : list_length(node->mergeplans),
99 : node->part_prune_index,
100 : node->apprelids,
101 : &validsubplans);
102 44 : mergestate->ms_prune_state = prunestate;
103 44 : nplans = bms_num_members(validsubplans);
104 :
105 : /*
106 : * When no run-time pruning is required and there's at least one
107 : * subplan, we can fill ms_valid_subplans immediately, preventing
108 : * later calls to ExecFindMatchingSubPlans.
109 : */
110 44 : if (!prunestate->do_exec_prune && nplans > 0)
111 24 : mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
112 : }
113 : else
114 : {
115 362 : nplans = list_length(node->mergeplans);
116 :
117 : /*
118 : * When run-time partition pruning is not enabled we can just mark all
119 : * subplans as valid; they must also all be initialized.
120 : */
121 : Assert(nplans > 0);
122 362 : mergestate->ms_valid_subplans = validsubplans =
123 362 : bms_add_range(NULL, 0, nplans - 1);
124 362 : mergestate->ms_prune_state = NULL;
125 : }
126 :
127 406 : mergeplanstates = palloc_array(PlanState *, nplans);
128 406 : mergestate->mergeplans = mergeplanstates;
129 406 : mergestate->ms_nplans = nplans;
130 :
131 406 : mergestate->ms_slots = palloc0_array(TupleTableSlot *, nplans);
132 406 : mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
133 : mergestate);
134 :
135 : /*
136 : * call ExecInitNode on each of the valid plans to be executed and save
137 : * the results into the mergeplanstates array.
138 : */
139 406 : j = 0;
140 406 : i = -1;
141 1513 : while ((i = bms_next_member(validsubplans, i)) >= 0)
142 : {
143 1107 : Plan *initNode = (Plan *) list_nth(node->mergeplans, i);
144 :
145 1107 : mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
146 : }
147 :
148 : /*
149 : * Initialize MergeAppend's result tuple type and slot. If the child
150 : * plans all produce the same fixed slot type, we can use that slot type;
151 : * otherwise make a virtual slot. (Note that the result slot itself is
152 : * used only to return a null tuple at end of execution; real tuples are
153 : * returned to the caller in the children's own result slots. What we are
154 : * doing here is allowing the parent plan node to optimize if the
155 : * MergeAppend will return only one kind of slot.)
156 : */
157 406 : mergeops = ExecGetCommonSlotOps(mergeplanstates, j);
158 406 : if (mergeops != NULL)
159 : {
160 305 : ExecInitResultTupleSlotTL(&mergestate->ps, mergeops);
161 : }
162 : else
163 : {
164 101 : ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
165 : /* show that the output slot type is not fixed */
166 101 : mergestate->ps.resultopsset = true;
167 101 : mergestate->ps.resultopsfixed = false;
168 : }
169 :
170 : /*
171 : * Miscellaneous initialization
172 : */
173 406 : mergestate->ps.ps_ProjInfo = NULL;
174 :
175 : /*
176 : * initialize sort-key information
177 : */
178 406 : mergestate->ms_nkeys = node->numCols;
179 406 : mergestate->ms_sortkeys = palloc0_array(SortSupportData, node->numCols);
180 :
181 881 : for (i = 0; i < node->numCols; i++)
182 : {
183 475 : SortSupport sortKey = mergestate->ms_sortkeys + i;
184 :
185 475 : sortKey->ssup_cxt = CurrentMemoryContext;
186 475 : sortKey->ssup_collation = node->collations[i];
187 475 : sortKey->ssup_nulls_first = node->nullsFirst[i];
188 475 : sortKey->ssup_attno = node->sortColIdx[i];
189 :
190 : /*
191 : * It isn't feasible to perform abbreviated key conversion, since
192 : * tuples are pulled into mergestate's binary heap as needed. It
193 : * would likely be counter-productive to convert tuples into an
194 : * abbreviated representation as they're pulled up, so opt out of that
195 : * additional optimization entirely.
196 : */
197 475 : sortKey->abbreviate = false;
198 :
199 475 : PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
200 : }
201 :
202 : /*
203 : * initialize to show we have not run the subplans yet
204 : */
205 406 : mergestate->ms_initialized = false;
206 :
207 406 : return mergestate;
208 : }
209 :
210 : /* ----------------------------------------------------------------
211 : * ExecMergeAppend
212 : *
213 : * Handles iteration over multiple subplans.
214 : * ----------------------------------------------------------------
215 : */
216 : static TupleTableSlot *
217 108033 : ExecMergeAppend(PlanState *pstate)
218 : {
219 108033 : MergeAppendState *node = castNode(MergeAppendState, pstate);
220 : TupleTableSlot *result;
221 : SlotNumber i;
222 :
223 108033 : CHECK_FOR_INTERRUPTS();
224 :
225 108033 : if (!node->ms_initialized)
226 : {
227 : /* Nothing to do if all subplans were pruned */
228 200 : if (node->ms_nplans == 0)
229 12 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
230 :
231 : /*
232 : * If we've yet to determine the valid subplans then do so now. If
233 : * run-time pruning is disabled then the valid subplans will always be
234 : * set to all subplans.
235 : */
236 188 : if (node->ms_valid_subplans == NULL)
237 8 : node->ms_valid_subplans =
238 8 : ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL);
239 :
240 : /*
241 : * First time through: pull the first tuple from each valid subplan,
242 : * and set up the heap.
243 : */
244 188 : i = -1;
245 709 : while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
246 : {
247 521 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
248 521 : if (!TupIsNull(node->ms_slots[i]))
249 417 : binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
250 : }
251 188 : binaryheap_build(node->ms_heap);
252 188 : node->ms_initialized = true;
253 : }
254 : else
255 : {
256 : /*
257 : * Otherwise, pull the next tuple from whichever subplan we returned
258 : * from last time, and reinsert the subplan index into the heap,
259 : * because it might now compare differently against the existing
260 : * elements of the heap. (We could perhaps simplify the logic a bit
261 : * by doing this before returning from the prior call, but it's better
262 : * to not pull tuples until necessary.)
263 : */
264 107833 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
265 107833 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
266 107833 : if (!TupIsNull(node->ms_slots[i]))
267 107552 : binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
268 : else
269 281 : (void) binaryheap_remove_first(node->ms_heap);
270 : }
271 :
272 108021 : if (binaryheap_empty(node->ms_heap))
273 : {
274 : /* All the subplans are exhausted, and so is the heap */
275 140 : result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
276 : }
277 : else
278 : {
279 107881 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
280 107881 : result = node->ms_slots[i];
281 : }
282 :
283 108021 : return result;
284 : }
285 :
286 : /*
287 : * Compare the tuples in the two given slots.
288 : */
289 : static int32
290 98312 : heap_compare_slots(Datum a, Datum b, void *arg)
291 : {
292 98312 : MergeAppendState *node = (MergeAppendState *) arg;
293 98312 : SlotNumber slot1 = DatumGetInt32(a);
294 98312 : SlotNumber slot2 = DatumGetInt32(b);
295 :
296 98312 : TupleTableSlot *s1 = node->ms_slots[slot1];
297 98312 : TupleTableSlot *s2 = node->ms_slots[slot2];
298 : int nkey;
299 :
300 : Assert(!TupIsNull(s1));
301 : Assert(!TupIsNull(s2));
302 :
303 130880 : for (nkey = 0; nkey < node->ms_nkeys; nkey++)
304 : {
305 98312 : SortSupport sortKey = node->ms_sortkeys + nkey;
306 98312 : AttrNumber attno = sortKey->ssup_attno;
307 : Datum datum1,
308 : datum2;
309 : bool isNull1,
310 : isNull2;
311 : int compare;
312 :
313 98312 : datum1 = slot_getattr(s1, attno, &isNull1);
314 98312 : datum2 = slot_getattr(s2, attno, &isNull2);
315 :
316 98312 : compare = ApplySortComparator(datum1, isNull1,
317 : datum2, isNull2,
318 : sortKey);
319 98312 : if (compare != 0)
320 : {
321 65744 : INVERT_COMPARE_RESULT(compare);
322 65744 : return compare;
323 : }
324 : }
325 32568 : return 0;
326 : }
327 :
328 : /* ----------------------------------------------------------------
329 : * ExecEndMergeAppend
330 : *
331 : * Shuts down the subscans of the MergeAppend node.
332 : *
333 : * Returns nothing of interest.
334 : * ----------------------------------------------------------------
335 : */
336 : void
337 406 : ExecEndMergeAppend(MergeAppendState *node)
338 : {
339 : PlanState **mergeplans;
340 : int nplans;
341 : int i;
342 :
343 : /*
344 : * get information from the node
345 : */
346 406 : mergeplans = node->mergeplans;
347 406 : nplans = node->ms_nplans;
348 :
349 : /*
350 : * shut down each of the subscans
351 : */
352 1513 : for (i = 0; i < nplans; i++)
353 1107 : ExecEndNode(mergeplans[i]);
354 406 : }
355 :
356 : void
357 12 : ExecReScanMergeAppend(MergeAppendState *node)
358 : {
359 : int i;
360 :
361 : /*
362 : * If any PARAM_EXEC Params used in pruning expressions have changed, then
363 : * we'd better unset the valid subplans so that they are reselected for
364 : * the new parameter values.
365 : */
366 12 : if (node->ms_prune_state &&
367 0 : bms_overlap(node->ps.chgParam,
368 0 : node->ms_prune_state->execparamids))
369 : {
370 0 : bms_free(node->ms_valid_subplans);
371 0 : node->ms_valid_subplans = NULL;
372 : }
373 :
374 36 : for (i = 0; i < node->ms_nplans; i++)
375 : {
376 24 : PlanState *subnode = node->mergeplans[i];
377 :
378 : /*
379 : * ExecReScan doesn't know about my subplans, so I have to do
380 : * changed-parameter signaling myself.
381 : */
382 24 : if (node->ps.chgParam != NULL)
383 24 : UpdateChangedParamSet(subnode, node->ps.chgParam);
384 :
385 : /*
386 : * If chgParam of subnode is not null then plan will be re-scanned by
387 : * first ExecProcNode.
388 : */
389 24 : if (subnode->chgParam == NULL)
390 0 : ExecReScan(subnode);
391 : }
392 12 : binaryheap_reset(node->ms_heap);
393 12 : node->ms_initialized = false;
394 12 : }
|