Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeMergeAppend.c
4 : * routines to handle MergeAppend nodes.
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeMergeAppend.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /* INTERFACE ROUTINES
16 : * ExecInitMergeAppend - initialize the MergeAppend node
17 : * ExecMergeAppend - retrieve the next tuple from the node
18 : * ExecEndMergeAppend - shut down the MergeAppend node
19 : * ExecReScanMergeAppend - rescan the MergeAppend node
20 : *
21 : * NOTES
22 : * A MergeAppend node contains a list of one or more subplans.
23 : * These are each expected to deliver tuples that are sorted according
24 : * to a common sort key. The MergeAppend node merges these streams
25 : * to produce output sorted the same way.
26 : *
27 : * MergeAppend nodes don't make use of their left and right
28 : * subtrees, rather they maintain a list of subplans so
29 : * a typical MergeAppend node looks like this in the plan tree:
30 : *
31 : * ...
32 : * /
33 : * MergeAppend---+------+------+--- nil
34 : * / \ | | |
35 : * nil nil ... ... ...
36 : * subplans
37 : */
38 :
39 : #include "postgres.h"
40 :
41 : #include "executor/executor.h"
42 : #include "executor/execPartition.h"
43 : #include "executor/nodeMergeAppend.h"
44 : #include "lib/binaryheap.h"
45 : #include "miscadmin.h"
46 :
47 : /*
48 : * We have one slot for each item in the heap array. We use SlotNumber
49 : * to store slot indexes. This doesn't actually provide any formal
50 : * type-safety, but it makes the code more self-documenting.
51 : */
52 : typedef int32 SlotNumber;
53 :
54 : static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
55 : static int heap_compare_slots(Datum a, Datum b, void *arg);
56 :
57 :
58 : /* ----------------------------------------------------------------
59 : * ExecInitMergeAppend
60 : *
61 : * Begin all of the subscans of the MergeAppend node.
62 : * ----------------------------------------------------------------
63 : */
64 : MergeAppendState *
65 538 : ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
66 : {
67 538 : MergeAppendState *mergestate = makeNode(MergeAppendState);
68 : PlanState **mergeplanstates;
69 : const TupleTableSlotOps *mergeops;
70 : Bitmapset *validsubplans;
71 : int nplans;
72 : int i,
73 : j;
74 :
75 : /* check for unsupported flags */
76 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
77 :
78 : /*
79 : * create new MergeAppendState for our node
80 : */
81 538 : mergestate->ps.plan = (Plan *) node;
82 538 : mergestate->ps.state = estate;
83 538 : mergestate->ps.ExecProcNode = ExecMergeAppend;
84 :
85 : /* If run-time partition pruning is enabled, then set that up now */
86 538 : if (node->part_prune_info != NULL)
87 : {
88 : PartitionPruneState *prunestate;
89 :
90 : /*
91 : * Set up pruning data structure. This also initializes the set of
92 : * subplans to initialize (validsubplans) by taking into account the
93 : * result of performing initial pruning if any.
94 : */
95 54 : prunestate = ExecInitPartitionPruning(&mergestate->ps,
96 54 : list_length(node->mergeplans),
97 54 : node->part_prune_info,
98 : &validsubplans);
99 54 : mergestate->ms_prune_state = prunestate;
100 54 : nplans = bms_num_members(validsubplans);
101 :
102 : /*
103 : * When no run-time pruning is required and there's at least one
104 : * subplan, we can fill ms_valid_subplans immediately, preventing
105 : * later calls to ExecFindMatchingSubPlans.
106 : */
107 54 : if (!prunestate->do_exec_prune && nplans > 0)
108 24 : mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
109 : }
110 : else
111 : {
112 484 : nplans = list_length(node->mergeplans);
113 :
114 : /*
115 : * When run-time partition pruning is not enabled we can just mark all
116 : * subplans as valid; they must also all be initialized.
117 : */
118 : Assert(nplans > 0);
119 484 : mergestate->ms_valid_subplans = validsubplans =
120 484 : bms_add_range(NULL, 0, nplans - 1);
121 484 : mergestate->ms_prune_state = NULL;
122 : }
123 :
124 538 : mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
125 538 : mergestate->mergeplans = mergeplanstates;
126 538 : mergestate->ms_nplans = nplans;
127 :
128 538 : mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
129 538 : mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
130 : mergestate);
131 :
132 : /*
133 : * call ExecInitNode on each of the valid plans to be executed and save
134 : * the results into the mergeplanstates array.
135 : */
136 538 : j = 0;
137 538 : i = -1;
138 1970 : while ((i = bms_next_member(validsubplans, i)) >= 0)
139 : {
140 1432 : Plan *initNode = (Plan *) list_nth(node->mergeplans, i);
141 :
142 1432 : mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
143 : }
144 :
145 : /*
146 : * Initialize MergeAppend's result tuple type and slot. If the child
147 : * plans all produce the same fixed slot type, we can use that slot type;
148 : * otherwise make a virtual slot. (Note that the result slot itself is
149 : * used only to return a null tuple at end of execution; real tuples are
150 : * returned to the caller in the children's own result slots. What we are
151 : * doing here is allowing the parent plan node to optimize if the
152 : * MergeAppend will return only one kind of slot.)
153 : */
154 538 : mergeops = ExecGetCommonSlotOps(mergeplanstates, j);
155 538 : if (mergeops != NULL)
156 : {
157 402 : ExecInitResultTupleSlotTL(&mergestate->ps, mergeops);
158 : }
159 : else
160 : {
161 136 : ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
162 : /* show that the output slot type is not fixed */
163 136 : mergestate->ps.resultopsset = true;
164 136 : mergestate->ps.resultopsfixed = false;
165 : }
166 :
167 : /*
168 : * Miscellaneous initialization
169 : */
170 538 : mergestate->ps.ps_ProjInfo = NULL;
171 :
172 : /*
173 : * initialize sort-key information
174 : */
175 538 : mergestate->ms_nkeys = node->numCols;
176 538 : mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
177 :
178 1174 : for (i = 0; i < node->numCols; i++)
179 : {
180 636 : SortSupport sortKey = mergestate->ms_sortkeys + i;
181 :
182 636 : sortKey->ssup_cxt = CurrentMemoryContext;
183 636 : sortKey->ssup_collation = node->collations[i];
184 636 : sortKey->ssup_nulls_first = node->nullsFirst[i];
185 636 : sortKey->ssup_attno = node->sortColIdx[i];
186 :
187 : /*
188 : * It isn't feasible to perform abbreviated key conversion, since
189 : * tuples are pulled into mergestate's binary heap as needed. It
190 : * would likely be counter-productive to convert tuples into an
191 : * abbreviated representation as they're pulled up, so opt out of that
192 : * additional optimization entirely.
193 : */
194 636 : sortKey->abbreviate = false;
195 :
196 636 : PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
197 : }
198 :
199 : /*
200 : * initialize to show we have not run the subplans yet
201 : */
202 538 : mergestate->ms_initialized = false;
203 :
204 538 : return mergestate;
205 : }
206 :
207 : /* ----------------------------------------------------------------
208 : * ExecMergeAppend
209 : *
210 : * Handles iteration over multiple subplans.
211 : * ----------------------------------------------------------------
212 : */
213 : static TupleTableSlot *
214 163960 : ExecMergeAppend(PlanState *pstate)
215 : {
216 163960 : MergeAppendState *node = castNode(MergeAppendState, pstate);
217 : TupleTableSlot *result;
218 : SlotNumber i;
219 :
220 163960 : CHECK_FOR_INTERRUPTS();
221 :
222 163960 : if (!node->ms_initialized)
223 : {
224 : /* Nothing to do if all subplans were pruned */
225 274 : if (node->ms_nplans == 0)
226 18 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
227 :
228 : /*
229 : * If we've yet to determine the valid subplans then do so now. If
230 : * run-time pruning is disabled then the valid subplans will always be
231 : * set to all subplans.
232 : */
233 256 : if (node->ms_valid_subplans == NULL)
234 12 : node->ms_valid_subplans =
235 12 : ExecFindMatchingSubPlans(node->ms_prune_state, false);
236 :
237 : /*
238 : * First time through: pull the first tuple from each valid subplan,
239 : * and set up the heap.
240 : */
241 256 : i = -1;
242 922 : while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
243 : {
244 666 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
245 666 : if (!TupIsNull(node->ms_slots[i]))
246 578 : binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
247 : }
248 256 : binaryheap_build(node->ms_heap);
249 256 : node->ms_initialized = true;
250 : }
251 : else
252 : {
253 : /*
254 : * Otherwise, pull the next tuple from whichever subplan we returned
255 : * from last time, and reinsert the subplan index into the heap,
256 : * because it might now compare differently against the existing
257 : * elements of the heap. (We could perhaps simplify the logic a bit
258 : * by doing this before returning from the prior call, but it's better
259 : * to not pull tuples until necessary.)
260 : */
261 163686 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
262 163686 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
263 163686 : if (!TupIsNull(node->ms_slots[i]))
264 163294 : binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
265 : else
266 392 : (void) binaryheap_remove_first(node->ms_heap);
267 : }
268 :
269 163942 : if (binaryheap_empty(node->ms_heap))
270 : {
271 : /* All the subplans are exhausted, and so is the heap */
272 190 : result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
273 : }
274 : else
275 : {
276 163752 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
277 163752 : result = node->ms_slots[i];
278 : }
279 :
280 163942 : return result;
281 : }
282 :
283 : /*
284 : * Compare the tuples in the two given slots.
285 : */
286 : static int32
287 149462 : heap_compare_slots(Datum a, Datum b, void *arg)
288 : {
289 149462 : MergeAppendState *node = (MergeAppendState *) arg;
290 149462 : SlotNumber slot1 = DatumGetInt32(a);
291 149462 : SlotNumber slot2 = DatumGetInt32(b);
292 :
293 149462 : TupleTableSlot *s1 = node->ms_slots[slot1];
294 149462 : TupleTableSlot *s2 = node->ms_slots[slot2];
295 : int nkey;
296 :
297 : Assert(!TupIsNull(s1));
298 : Assert(!TupIsNull(s2));
299 :
300 198314 : for (nkey = 0; nkey < node->ms_nkeys; nkey++)
301 : {
302 149462 : SortSupport sortKey = node->ms_sortkeys + nkey;
303 149462 : AttrNumber attno = sortKey->ssup_attno;
304 : Datum datum1,
305 : datum2;
306 : bool isNull1,
307 : isNull2;
308 : int compare;
309 :
310 149462 : datum1 = slot_getattr(s1, attno, &isNull1);
311 149462 : datum2 = slot_getattr(s2, attno, &isNull2);
312 :
313 149462 : compare = ApplySortComparator(datum1, isNull1,
314 : datum2, isNull2,
315 : sortKey);
316 149462 : if (compare != 0)
317 : {
318 100610 : INVERT_COMPARE_RESULT(compare);
319 100610 : return compare;
320 : }
321 : }
322 48852 : return 0;
323 : }
324 :
325 : /* ----------------------------------------------------------------
326 : * ExecEndMergeAppend
327 : *
328 : * Shuts down the subscans of the MergeAppend node.
329 : *
330 : * Returns nothing of interest.
331 : * ----------------------------------------------------------------
332 : */
333 : void
334 538 : ExecEndMergeAppend(MergeAppendState *node)
335 : {
336 : PlanState **mergeplans;
337 : int nplans;
338 : int i;
339 :
340 : /*
341 : * get information from the node
342 : */
343 538 : mergeplans = node->mergeplans;
344 538 : nplans = node->ms_nplans;
345 :
346 : /*
347 : * shut down each of the subscans
348 : */
349 1970 : for (i = 0; i < nplans; i++)
350 1432 : ExecEndNode(mergeplans[i]);
351 538 : }
352 :
353 : void
354 18 : ExecReScanMergeAppend(MergeAppendState *node)
355 : {
356 : int i;
357 :
358 : /*
359 : * If any PARAM_EXEC Params used in pruning expressions have changed, then
360 : * we'd better unset the valid subplans so that they are reselected for
361 : * the new parameter values.
362 : */
363 18 : if (node->ms_prune_state &&
364 0 : bms_overlap(node->ps.chgParam,
365 0 : node->ms_prune_state->execparamids))
366 : {
367 0 : bms_free(node->ms_valid_subplans);
368 0 : node->ms_valid_subplans = NULL;
369 : }
370 :
371 54 : for (i = 0; i < node->ms_nplans; i++)
372 : {
373 36 : PlanState *subnode = node->mergeplans[i];
374 :
375 : /*
376 : * ExecReScan doesn't know about my subplans, so I have to do
377 : * changed-parameter signaling myself.
378 : */
379 36 : if (node->ps.chgParam != NULL)
380 36 : UpdateChangedParamSet(subnode, node->ps.chgParam);
381 :
382 : /*
383 : * If chgParam of subnode is not null then plan will be re-scanned by
384 : * first ExecProcNode.
385 : */
386 36 : if (subnode->chgParam == NULL)
387 0 : ExecReScan(subnode);
388 : }
389 18 : binaryheap_reset(node->ms_heap);
390 18 : node->ms_initialized = false;
391 18 : }
|