Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeMergeAppend.c
4 : * routines to handle MergeAppend nodes.
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeMergeAppend.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /* INTERFACE ROUTINES
16 : * ExecInitMergeAppend - initialize the MergeAppend node
17 : * ExecMergeAppend - retrieve the next tuple from the node
18 : * ExecEndMergeAppend - shut down the MergeAppend node
19 : * ExecReScanMergeAppend - rescan the MergeAppend node
20 : *
21 : * NOTES
22 : * A MergeAppend node contains a list of one or more subplans.
23 : * These are each expected to deliver tuples that are sorted according
24 : * to a common sort key. The MergeAppend node merges these streams
25 : * to produce output sorted the same way.
26 : *
27 : * MergeAppend nodes don't make use of their left and right
28 : * subtrees, rather they maintain a list of subplans so
29 : * a typical MergeAppend node looks like this in the plan tree:
30 : *
31 : *                  ...
32 : *                  /
33 : *               MergeAppend---+------+------+--- nil
34 : *               /   \         |      |      |
35 : *             nil   nil      ...    ...    ...
36 : *                                subplans
37 : */
38 :
39 : #include "postgres.h"
40 :
41 : #include "executor/executor.h"
42 : #include "executor/execPartition.h"
43 : #include "executor/nodeMergeAppend.h"
44 : #include "lib/binaryheap.h"
45 : #include "miscadmin.h"
46 :
47 : /*
48 : * We have one slot for each item in the heap array. We use SlotNumber
49 : * to store slot indexes. This doesn't actually provide any formal
50 : * type-safety, but it makes the code more self-documenting.
51 : */
52 : typedef int32 SlotNumber;
53 :
54 : static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
55 : static int heap_compare_slots(Datum a, Datum b, void *arg);
56 :
57 :
58 : /* ----------------------------------------------------------------
59 : * ExecInitMergeAppend
60 : *
61 : * Begin all of the subscans of the MergeAppend node.
62 : * ----------------------------------------------------------------
63 : */
64 : MergeAppendState *
65 532 : ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
66 : {
67 532 : MergeAppendState *mergestate = makeNode(MergeAppendState);
68 : PlanState **mergeplanstates;
69 : Bitmapset *validsubplans;
70 : int nplans;
71 : int i,
72 : j;
73 :
74 : /* check for unsupported flags */
75 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
76 :
77 : /*
78 : * create new MergeAppendState for our node
79 : */
80 532 : mergestate->ps.plan = (Plan *) node;
81 532 : mergestate->ps.state = estate;
82 532 : mergestate->ps.ExecProcNode = ExecMergeAppend;
83 :
84 : /* If run-time partition pruning is enabled, then set that up now */
85 532 : if (node->part_prune_info != NULL)
86 : {
87 : PartitionPruneState *prunestate;
88 :
89 : /*
90 : * Set up pruning data structure. This also initializes the set of
91 : * subplans to initialize (validsubplans) by taking into account the
92 : * result of performing initial pruning if any.
93 : */
94 54 : prunestate = ExecInitPartitionPruning(&mergestate->ps,
95 54 : list_length(node->mergeplans),
96 54 : node->part_prune_info,
97 : &validsubplans);
98 54 : mergestate->ms_prune_state = prunestate;
99 54 : nplans = bms_num_members(validsubplans);
100 :
101 : /*
102 : * When no run-time pruning is required and there's at least one
103 : * subplan, we can fill ms_valid_subplans immediately, preventing
104 : * later calls to ExecFindMatchingSubPlans.
105 : */
106 54 : if (!prunestate->do_exec_prune && nplans > 0)
107 24 : mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
108 : }
109 : else
110 : {
111 478 : nplans = list_length(node->mergeplans);
112 :
113 : /*
114 : * When run-time partition pruning is not enabled we can just mark all
115 : * subplans as valid; they must also all be initialized.
116 : */
117 : Assert(nplans > 0);
118 478 : mergestate->ms_valid_subplans = validsubplans =
119 478 : bms_add_range(NULL, 0, nplans - 1);
120 478 : mergestate->ms_prune_state = NULL;
121 : }
122 :
123 532 : mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
124 532 : mergestate->mergeplans = mergeplanstates;
125 532 : mergestate->ms_nplans = nplans;
126 :
127 532 : mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
128 532 : mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
129 : mergestate);
130 :
131 : /*
132 : * Miscellaneous initialization
133 : *
134 : * MergeAppend nodes do have Result slots, which hold pointers to tuples,
135 : * so we have to initialize them. FIXME
136 : */
137 532 : ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
138 :
139 : /* node returns slots from each of its subnodes, therefore not fixed */
140 532 : mergestate->ps.resultopsset = true;
141 532 : mergestate->ps.resultopsfixed = false;
142 :
143 : /*
144 : * call ExecInitNode on each of the valid plans to be executed and save
145 : * the results into the mergeplanstates array.
146 : */
147 532 : j = 0;
148 532 : i = -1;
149 1952 : while ((i = bms_next_member(validsubplans, i)) >= 0)
150 : {
151 1420 : Plan *initNode = (Plan *) list_nth(node->mergeplans, i);
152 :
153 1420 : mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
154 : }
155 :
156 532 : mergestate->ps.ps_ProjInfo = NULL;
157 :
158 : /*
159 : * initialize sort-key information
160 : */
161 532 : mergestate->ms_nkeys = node->numCols;
162 532 : mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
163 :
164 1162 : for (i = 0; i < node->numCols; i++)
165 : {
166 630 : SortSupport sortKey = mergestate->ms_sortkeys + i;
167 :
168 630 : sortKey->ssup_cxt = CurrentMemoryContext;
169 630 : sortKey->ssup_collation = node->collations[i];
170 630 : sortKey->ssup_nulls_first = node->nullsFirst[i];
171 630 : sortKey->ssup_attno = node->sortColIdx[i];
172 :
173 : /*
174 : * It isn't feasible to perform abbreviated key conversion, since
175 : * tuples are pulled into mergestate's binary heap as needed. It
176 : * would likely be counter-productive to convert tuples into an
177 : * abbreviated representation as they're pulled up, so opt out of that
178 : * additional optimization entirely.
179 : */
180 630 : sortKey->abbreviate = false;
181 :
182 630 : PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
183 : }
184 :
185 : /*
186 : * initialize to show we have not run the subplans yet
187 : */
188 532 : mergestate->ms_initialized = false;
189 :
190 532 : return mergestate;
191 : }
192 :
193 : /* ----------------------------------------------------------------
194 : * ExecMergeAppend
195 : *
196 : * Handles iteration over multiple subplans.
197 : * ----------------------------------------------------------------
198 : */
199 : static TupleTableSlot *
200 163960 : ExecMergeAppend(PlanState *pstate)
201 : {
202 163960 : MergeAppendState *node = castNode(MergeAppendState, pstate);
203 : TupleTableSlot *result;
204 : SlotNumber i;
205 :
206 163960 : CHECK_FOR_INTERRUPTS();
207 :
208 163960 : if (!node->ms_initialized)
209 : {
210 : /* Nothing to do if all subplans were pruned */
211 274 : if (node->ms_nplans == 0)
212 18 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
213 :
214 : /*
215 : * If we've yet to determine the valid subplans then do so now. If
216 : * run-time pruning is disabled then the valid subplans will always be
217 : * set to all subplans.
218 : */
219 256 : if (node->ms_valid_subplans == NULL)
220 12 : node->ms_valid_subplans =
221 12 : ExecFindMatchingSubPlans(node->ms_prune_state, false);
222 :
223 : /*
224 : * First time through: pull the first tuple from each valid subplan,
225 : * and set up the heap.
226 : */
227 256 : i = -1;
228 922 : while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
229 : {
230 666 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
231 666 : if (!TupIsNull(node->ms_slots[i]))
232 578 : binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
233 : }
234 256 : binaryheap_build(node->ms_heap);
235 256 : node->ms_initialized = true;
236 : }
237 : else
238 : {
239 : /*
240 : * Otherwise, pull the next tuple from whichever subplan we returned
241 : * from last time, and reinsert the subplan index into the heap,
242 : * because it might now compare differently against the existing
243 : * elements of the heap. (We could perhaps simplify the logic a bit
244 : * by doing this before returning from the prior call, but it's better
245 : * to not pull tuples until necessary.)
246 : */
247 163686 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
248 163686 : node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
249 163686 : if (!TupIsNull(node->ms_slots[i]))
250 163294 : binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
251 : else
252 392 : (void) binaryheap_remove_first(node->ms_heap);
253 : }
254 :
255 163942 : if (binaryheap_empty(node->ms_heap))
256 : {
257 : /* All the subplans are exhausted, and so is the heap */
258 190 : result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
259 : }
260 : else
261 : {
262 163752 : i = DatumGetInt32(binaryheap_first(node->ms_heap));
263 163752 : result = node->ms_slots[i];
264 : }
265 :
266 163942 : return result;
267 : }
268 :
269 : /*
270 : * Compare the tuples in the two given slots.
271 : */
272 : static int32
273 149462 : heap_compare_slots(Datum a, Datum b, void *arg)
274 : {
275 149462 : MergeAppendState *node = (MergeAppendState *) arg;
276 149462 : SlotNumber slot1 = DatumGetInt32(a);
277 149462 : SlotNumber slot2 = DatumGetInt32(b);
278 :
279 149462 : TupleTableSlot *s1 = node->ms_slots[slot1];
280 149462 : TupleTableSlot *s2 = node->ms_slots[slot2];
281 : int nkey;
282 :
283 : Assert(!TupIsNull(s1));
284 : Assert(!TupIsNull(s2));
285 :
286 198314 : for (nkey = 0; nkey < node->ms_nkeys; nkey++)
287 : {
288 149462 : SortSupport sortKey = node->ms_sortkeys + nkey;
289 149462 : AttrNumber attno = sortKey->ssup_attno;
290 : Datum datum1,
291 : datum2;
292 : bool isNull1,
293 : isNull2;
294 : int compare;
295 :
296 149462 : datum1 = slot_getattr(s1, attno, &isNull1);
297 149462 : datum2 = slot_getattr(s2, attno, &isNull2);
298 :
299 149462 : compare = ApplySortComparator(datum1, isNull1,
300 : datum2, isNull2,
301 : sortKey);
302 149462 : if (compare != 0)
303 : {
304 100610 : INVERT_COMPARE_RESULT(compare);
305 100610 : return compare;
306 : }
307 : }
308 48852 : return 0;
309 : }
310 :
311 : /* ----------------------------------------------------------------
312 : * ExecEndMergeAppend
313 : *
314 : * Shuts down the subscans of the MergeAppend node.
315 : *
316 : * Returns nothing of interest.
317 : * ----------------------------------------------------------------
318 : */
319 : void
320 532 : ExecEndMergeAppend(MergeAppendState *node)
321 : {
322 : PlanState **mergeplans;
323 : int nplans;
324 : int i;
325 :
326 : /*
327 : * get information from the node
328 : */
329 532 : mergeplans = node->mergeplans;
330 532 : nplans = node->ms_nplans;
331 :
332 : /*
333 : * shut down each of the subscans
334 : */
335 1952 : for (i = 0; i < nplans; i++)
336 1420 : ExecEndNode(mergeplans[i]);
337 532 : }
338 :
339 : void
340 18 : ExecReScanMergeAppend(MergeAppendState *node)
341 : {
342 : int i;
343 :
344 : /*
345 : * If any PARAM_EXEC Params used in pruning expressions have changed, then
346 : * we'd better unset the valid subplans so that they are reselected for
347 : * the new parameter values.
348 : */
349 18 : if (node->ms_prune_state &&
350 0 : bms_overlap(node->ps.chgParam,
351 0 : node->ms_prune_state->execparamids))
352 : {
353 0 : bms_free(node->ms_valid_subplans);
354 0 : node->ms_valid_subplans = NULL;
355 : }
356 :
357 54 : for (i = 0; i < node->ms_nplans; i++)
358 : {
359 36 : PlanState *subnode = node->mergeplans[i];
360 :
361 : /*
362 : * ExecReScan doesn't know about my subplans, so I have to do
363 : * changed-parameter signaling myself.
364 : */
365 36 : if (node->ps.chgParam != NULL)
366 36 : UpdateChangedParamSet(subnode, node->ps.chgParam);
367 :
368 : /*
369 : * If chgParam of subnode is not null then plan will be re-scanned by
370 : * first ExecProcNode.
371 : */
372 36 : if (subnode->chgParam == NULL)
373 0 : ExecReScan(subnode);
374 : }
375 18 : binaryheap_reset(node->ms_heap);
376 18 : node->ms_initialized = false;
377 18 : }
|