Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeGatherMerge.c
4 : * Scan a plan in multiple workers, and do order-preserving merge.
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/nodeGatherMerge.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
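For orientation, Gather Merge expects its outer subplan to produce sorted output in each participating process, and merges those streams so the overall ordering is preserved. One typical plan shape (an illustrative sketch, not tied to any particular query):

/*
 *   Gather Merge
 *     Workers Planned: 2
 *     ->  Sort
 *           Sort Key: x
 *           ->  Parallel Seq Scan on tab
 */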
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "executor/executor.h"
19 : #include "executor/execParallel.h"
20 : #include "executor/nodeGatherMerge.h"
21 : #include "executor/tqueue.h"
22 : #include "lib/binaryheap.h"
23 : #include "miscadmin.h"
24 : #include "optimizer/optimizer.h"
25 :
26 : /*
27 : * When we read tuples from workers, it's a good idea to read several at once
28 : * for efficiency when possible: this minimizes context-switching overhead.
29 : * But reading too many at a time wastes memory without improving performance.
30 : * We'll read up to MAX_TUPLE_STORE tuples (in addition to the first one).
31 : */
32 : #define MAX_TUPLE_STORE 10
33 :
34 : /*
35 : * Pending-tuple array for each worker. This holds additional tuples that
36 : * we were able to fetch from the worker, but can't process yet. In addition,
37 : * this struct holds the "done" flag indicating the worker is known to have
38 : * no more tuples. (We do not use this struct for the leader; we don't keep
39 : * any pending tuples for the leader, and the need_to_scan_locally flag serves
40 : * as its "done" indicator.)
41 : */
42 : typedef struct GMReaderTupleBuffer
43 : {
44 : MinimalTuple *tuple; /* array of length MAX_TUPLE_STORE */
45 : int nTuples; /* number of tuples currently stored */
46 : int readCounter; /* index of next tuple to extract */
47 : bool done; /* true if reader is known exhausted */
48 : } GMReaderTupleBuffer;
49 :
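The fields above drive a simple consume/refill cycle: readCounter advances through the nTuples tuples fetched earlier, and the array is refilled in nowait mode only when it runs dry (see load_tuple_array below). A minimal sketch of that cycle, assuming a hypothetical fetch_from_queue() helper in place of the real tuple-queue calls:

/* Sketch only; fetch_from_queue() is a hypothetical stand-in for
 * gm_readnext_tuple(), and "done"/error handling is omitted. */
static MinimalTuple
buffer_next(GMReaderTupleBuffer *buf)
{
    /* Consume a previously fetched tuple if any remain. */
    if (buf->readCounter < buf->nTuples)
        return buf->tuple[buf->readCounter++];

    /* Buffer is empty: reset counters, then refill without blocking. */
    buf->nTuples = buf->readCounter = 0;
    while (buf->nTuples < MAX_TUPLE_STORE)
    {
        MinimalTuple tup = fetch_from_queue(buf);   /* hypothetical */

        if (tup == NULL)
            break;
        buf->tuple[buf->nTuples++] = tup;
    }
    return (buf->nTuples > 0) ? buf->tuple[buf->readCounter++] : NULL;
}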
50 : static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
51 : static int32 heap_compare_slots(Datum a, Datum b, void *arg);
52 : static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
53 : static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
54 : bool nowait, bool *done);
55 : static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
56 : static void gather_merge_setup(GatherMergeState *gm_state);
57 : static void gather_merge_init(GatherMergeState *gm_state);
58 : static void gather_merge_clear_tuples(GatherMergeState *gm_state);
59 : static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
60 : bool nowait);
61 : static void load_tuple_array(GatherMergeState *gm_state, int reader);
62 :
63 : /* ----------------------------------------------------------------
64 : * ExecInitGatherMerge
65 : * ----------------------------------------------------------------
66 : */
67 : GatherMergeState *
68 354 : ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
69 : {
70 : GatherMergeState *gm_state;
71 : Plan *outerNode;
72 : TupleDesc tupDesc;
73 :
74 : /* A Gather Merge node never has an inner plan. */
75 : Assert(innerPlan(node) == NULL);
76 :
77 : /*
78 : * create state structure
79 : */
80 354 : gm_state = makeNode(GatherMergeState);
81 354 : gm_state->ps.plan = (Plan *) node;
82 354 : gm_state->ps.state = estate;
83 354 : gm_state->ps.ExecProcNode = ExecGatherMerge;
84 :
85 354 : gm_state->initialized = false;
86 354 : gm_state->gm_initialized = false;
87 354 : gm_state->tuples_needed = -1;
88 :
89 : /*
90 : * Miscellaneous initialization
91 : *
92 : * create expression context for node
93 : */
94 354 : ExecAssignExprContext(estate, &gm_state->ps);
95 :
96 : /*
97 : * GatherMerge doesn't support checking a qual (it's always more efficient
98 : * to do it in the child node).
99 : */
100 : Assert(!node->plan.qual);
101 :
102 : /*
103 : * now initialize outer plan
104 : */
105 354 : outerNode = outerPlan(node);
106 354 : outerPlanState(gm_state) = ExecInitNode(outerNode, estate, eflags);
107 :
108 : /*
109 : * Leader may access ExecProcNode result directly (if
110 : * need_to_scan_locally), or from workers via tuple queue. So we can't
111 : * trivially rely on the slot type being fixed for expressions evaluated
112 : * within this node.
113 : */
114 354 : gm_state->ps.outeropsset = true;
115 354 : gm_state->ps.outeropsfixed = false;
116 :
117 : /*
118 : * Store the tuple descriptor into gather merge state, so we can use it
119 : * while initializing the gather merge slots.
120 : */
121 354 : tupDesc = ExecGetResultType(outerPlanState(gm_state));
122 354 : gm_state->tupDesc = tupDesc;
123 :
124 : /*
125 : * Initialize result type and projection.
126 : */
127 354 : ExecInitResultTypeTL(&gm_state->ps);
128 354 : ExecConditionalAssignProjectionInfo(&gm_state->ps, tupDesc, OUTER_VAR);
129 :
130 : /*
131 : * Without a projection, the result slot type is not trivially known;
132 : * see comment above.
133 : */
134 354 : if (gm_state->ps.ps_ProjInfo == NULL)
135 : {
136 342 : gm_state->ps.resultopsset = true;
137 342 : gm_state->ps.resultopsfixed = false;
138 : }
139 :
140 : /*
141 : * initialize sort-key information
142 : */
143 354 : if (node->numCols)
144 : {
145 : int i;
146 :
147 354 : gm_state->gm_nkeys = node->numCols;
148 354 : gm_state->gm_sortkeys = palloc0_array(SortSupportData, node->numCols);
149 :
150 828 : for (i = 0; i < node->numCols; i++)
151 : {
152 474 : SortSupport sortKey = gm_state->gm_sortkeys + i;
153 :
154 474 : sortKey->ssup_cxt = CurrentMemoryContext;
155 474 : sortKey->ssup_collation = node->collations[i];
156 474 : sortKey->ssup_nulls_first = node->nullsFirst[i];
157 474 : sortKey->ssup_attno = node->sortColIdx[i];
158 :
159 : /*
160 : * We don't perform abbreviated key conversion here, for the same
161 : * reasons that it isn't used in MergeAppend.
162 : */
163 474 : sortKey->abbreviate = false;
164 :
165 474 : PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
166 : }
167 : }
168 :
169 : /* Now allocate the workspace for gather merge */
170 354 : gather_merge_setup(gm_state);
171 :
172 354 : return gm_state;
173 : }
174 :
175 : /* ----------------------------------------------------------------
176 : * ExecGatherMerge(node)
177 : *
178 : * Scans the relation via multiple workers and returns
179 : * the next qualifying tuple.
180 : * ----------------------------------------------------------------
181 : */
182 : static TupleTableSlot *
183 254828 : ExecGatherMerge(PlanState *pstate)
184 : {
185 254828 : GatherMergeState *node = castNode(GatherMergeState, pstate);
186 : TupleTableSlot *slot;
187 : ExprContext *econtext;
188 :
189 254828 : CHECK_FOR_INTERRUPTS();
190 :
191 : /*
192 : * As with Gather, we don't launch workers until this node is actually
193 : * executed.
194 : */
195 254828 : if (!node->initialized)
196 : {
197 168 : EState *estate = node->ps.state;
198 168 : GatherMerge *gm = castNode(GatherMerge, node->ps.plan);
199 :
200 : /*
201 : * Sometimes we might have to run without parallelism; but if parallel
202 : * mode is active then we can try to fire up some workers.
203 : */
204 168 : if (gm->num_workers > 0 && estate->es_use_parallel_mode)
205 : {
206 : ParallelContext *pcxt;
207 :
208 : /* Initialize, or re-initialize, shared state needed by workers. */
209 168 : if (!node->pei)
210 138 : node->pei = ExecInitParallelPlan(outerPlanState(node),
211 : estate,
212 : gm->initParam,
213 : gm->num_workers,
214 : node->tuples_needed);
215 : else
216 30 : ExecParallelReinitialize(outerPlanState(node),
217 30 : node->pei,
218 : gm->initParam);
219 :
220 : /* Try to launch workers. */
221 168 : pcxt = node->pei->pcxt;
222 168 : LaunchParallelWorkers(pcxt);
223 : /* We save # workers launched for the benefit of EXPLAIN */
224 168 : node->nworkers_launched = pcxt->nworkers_launched;
225 :
226 : /*
227 : * Count number of workers originally wanted and actually
228 : * launched.
229 : */
230 168 : estate->es_parallel_workers_to_launch += pcxt->nworkers_to_launch;
231 168 : estate->es_parallel_workers_launched += pcxt->nworkers_launched;
232 :
233 : /* Set up tuple queue readers to read the results. */
234 168 : if (pcxt->nworkers_launched > 0)
235 : {
236 156 : ExecParallelCreateReaders(node->pei);
237 : /* Make a working array showing the active readers */
238 156 : node->nreaders = pcxt->nworkers_launched;
239 156 : node->reader = (TupleQueueReader **)
240 156 : palloc(node->nreaders * sizeof(TupleQueueReader *));
241 156 : memcpy(node->reader, node->pei->reader,
242 156 : node->nreaders * sizeof(TupleQueueReader *));
243 : }
244 : else
245 : {
246 : /* No workers? Then never mind. */
247 12 : node->nreaders = 0;
248 12 : node->reader = NULL;
249 : }
250 : }
251 :
252 : /* allow leader to participate if enabled or no choice */
253 168 : if (parallel_leader_participation || node->nreaders == 0)
254 162 : node->need_to_scan_locally = true;
255 168 : node->initialized = true;
256 : }
257 :
258 : /*
259 : * Reset per-tuple memory context to free any expression evaluation
260 : * storage allocated in the previous tuple cycle.
261 : */
262 254828 : econtext = node->ps.ps_ExprContext;
263 254828 : ResetExprContext(econtext);
264 :
265 : /*
266 : * Get next tuple, either from one of our workers, or by running the plan
267 : * ourselves.
268 : */
269 254828 : slot = gather_merge_getnext(node);
270 254828 : if (TupIsNull(slot))
271 132 : return NULL;
272 :
273 : /* If no projection is required, we're done. */
274 254696 : if (node->ps.ps_ProjInfo == NULL)
275 254696 : return slot;
276 :
277 : /*
278 : * Form the result tuple using ExecProject(), and return it.
279 : */
280 0 : econtext->ecxt_outertuple = slot;
281 0 : return ExecProject(node->ps.ps_ProjInfo);
282 : }
283 :
284 : /* ----------------------------------------------------------------
285 : * ExecEndGatherMerge
286 : *
287 : * frees any storage allocated through C routines.
288 : * ----------------------------------------------------------------
289 : */
290 : void
291 354 : ExecEndGatherMerge(GatherMergeState *node)
292 : {
293 354 : ExecEndNode(outerPlanState(node)); /* let children clean up first */
294 354 : ExecShutdownGatherMerge(node);
295 354 : }
296 :
297 : /* ----------------------------------------------------------------
298 : * ExecShutdownGatherMerge
299 : *
300 : * Destroy the setup for parallel workers including parallel context.
301 : * ----------------------------------------------------------------
302 : */
303 : void
304 492 : ExecShutdownGatherMerge(GatherMergeState *node)
305 : {
306 492 : ExecShutdownGatherMergeWorkers(node);
307 :
308 : /* Now destroy the parallel context. */
309 492 : if (node->pei != NULL)
310 : {
311 138 : ExecParallelCleanup(node->pei);
312 138 : node->pei = NULL;
313 : }
314 492 : }
315 :
316 : /* ----------------------------------------------------------------
317 : * ExecShutdownGatherMergeWorkers
318 : *
319 : * Stop all the parallel workers.
320 : * ----------------------------------------------------------------
321 : */
322 : static void
323 540 : ExecShutdownGatherMergeWorkers(GatherMergeState *node)
324 : {
325 540 : if (node->pei != NULL)
326 168 : ExecParallelFinish(node->pei);
327 :
328 : /* Flush local copy of reader array */
329 540 : if (node->reader)
330 156 : pfree(node->reader);
331 540 : node->reader = NULL;
332 540 : }
333 :
334 : /* ----------------------------------------------------------------
335 : * ExecReScanGatherMerge
336 : *
337 : * Prepare to re-scan the result of a GatherMerge.
338 : * ----------------------------------------------------------------
339 : */
340 : void
341 48 : ExecReScanGatherMerge(GatherMergeState *node)
342 : {
343 48 : GatherMerge *gm = (GatherMerge *) node->ps.plan;
344 48 : PlanState *outerPlan = outerPlanState(node);
345 :
346 : /* Make sure any existing workers are gracefully shut down */
347 48 : ExecShutdownGatherMergeWorkers(node);
348 :
349 : /* Free any unused tuples, so we don't leak memory across rescans */
350 48 : gather_merge_clear_tuples(node);
351 :
352 : /* Mark node so that shared state will be rebuilt at next call */
353 48 : node->initialized = false;
354 48 : node->gm_initialized = false;
355 :
356 : /*
357 : * Set child node's chgParam to tell it that the next scan might deliver a
358 : * different set of rows within the leader process. (The overall rowset
359 : * shouldn't change, but the leader process's subset might; hence nodes
360 : * between here and the parallel table scan node mustn't optimize on the
361 : * assumption of an unchanging rowset.)
362 : */
363 48 : if (gm->rescan_param >= 0)
364 48 : outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
365 : gm->rescan_param);
366 :
367 : /*
368 : * If chgParam of subnode is not null then plan will be re-scanned by
369 : * first ExecProcNode. Note: because this does nothing if we have a
370 : * rescan_param, it's currently guaranteed that parallel-aware child nodes
371 : * will not see a ReScan call until after they get a ReInitializeDSM call.
372 : * That ordering might not be something to rely on, though. A good rule
373 : * of thumb is that ReInitializeDSM should reset only shared state, ReScan
374 : * should reset only local state, and anything that depends on both of
375 : * those steps being finished must wait until the first ExecProcNode call.
376 : */
377 48 : if (outerPlan->chgParam == NULL)
378 0 : ExecReScan(outerPlan);
379 48 : }
380 :
381 : /*
382 : * Set up the data structures that we'll need for Gather Merge.
383 : *
384 : * We allocate these once on the basis of gm->num_workers, which is an
385 : * upper bound for the number of workers we'll actually have. During
386 : * a rescan, we reset the structures to empty. This approach makes it
387 : * simple to avoid leaking memory across rescans.
388 : *
389 : * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
390 : * are for workers. The values placed into gm_heap correspond to indexes
391 : * in gm_slots[]. The gm_tuple_buffers[] array, however, is indexed from
392 : * 0 to n-1; it has no entry for the leader.
393 : */
394 : static void
395 354 : gather_merge_setup(GatherMergeState *gm_state)
396 : {
397 354 : GatherMerge *gm = castNode(GatherMerge, gm_state->ps.plan);
398 354 : int nreaders = gm->num_workers;
399 : int i;
400 :
401 : /*
402 : * Allocate gm_slots for the number of workers + one more slot for leader.
403 : * Slot 0 is always for the leader. Leader always calls ExecProcNode() to
404 : * read the tuple, and then stores it directly into its gm_slots entry.
405 : * For other slots, code below will call ExecInitExtraTupleSlot() to
406 : * create a slot for the worker's results. Note that during any single
407 : * scan, we might have fewer than num_workers available workers, in which
408 : * case the extra array entries go unused.
409 : */
410 354 : gm_state->gm_slots = (TupleTableSlot **)
411 354 : palloc0((nreaders + 1) * sizeof(TupleTableSlot *));
412 :
413 : /* Allocate the tuple slot and tuple array for each worker */
414 354 : gm_state->gm_tuple_buffers = (GMReaderTupleBuffer *)
415 354 : palloc0(nreaders * sizeof(GMReaderTupleBuffer));
416 :
417 1290 : for (i = 0; i < nreaders; i++)
418 : {
419 : /* Allocate the tuple array with length MAX_TUPLE_STORE */
420 936 : gm_state->gm_tuple_buffers[i].tuple = palloc0_array(MinimalTuple, MAX_TUPLE_STORE);
421 :
422 : /* Initialize tuple slot for worker */
423 936 : gm_state->gm_slots[i + 1] =
424 936 : ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
425 : &TTSOpsMinimalTuple);
426 : }
427 :
428 : /* Allocate the resources for the merge */
429 354 : gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
430 : heap_compare_slots,
431 : gm_state);
432 354 : }
433 :
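Because the per-participant arrays use two different index bases, it is worth spelling out the mapping used throughout this file:

/*
 * For participant i (0 = leader, 1..n = workers):
 *   gm_slots[i]              - output slot; entries 0..n
 *   gm_tuple_buffers[i - 1]  - pending-tuple array; workers only
 *   reader[i - 1]            - tuple queue reader; workers only
 * The Datums stored in gm_heap are these gm_slots[] indexes i.
 */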
434 : /*
435 : * Initialize the Gather Merge.
436 : *
437 : * Reset data structures to ensure they're empty. Then pull at least one
438 : * tuple from leader + each worker (or set its "done" indicator), and set up
439 : * the heap.
440 : */
441 : static void
442 168 : gather_merge_init(GatherMergeState *gm_state)
443 : {
444 168 : int nreaders = gm_state->nreaders;
445 168 : bool nowait = true;
446 : int i;
447 :
448 : /* Assert that gather_merge_setup made enough space */
449 : Assert(nreaders <= castNode(GatherMerge, gm_state->ps.plan)->num_workers);
450 :
451 : /* Reset leader's tuple slot to empty */
452 168 : gm_state->gm_slots[0] = NULL;
453 :
454 : /* Reset the tuple slot and tuple array for each worker */
455 592 : for (i = 0; i < nreaders; i++)
456 : {
457 : /* Reset tuple array to empty */
458 424 : gm_state->gm_tuple_buffers[i].nTuples = 0;
459 424 : gm_state->gm_tuple_buffers[i].readCounter = 0;
460 : /* Reset done flag to not-done */
461 424 : gm_state->gm_tuple_buffers[i].done = false;
462 : /* Ensure output slot is empty */
463 424 : ExecClearTuple(gm_state->gm_slots[i + 1]);
464 : }
465 :
466 : /* Reset binary heap to empty */
467 168 : binaryheap_reset(gm_state->gm_heap);
468 :
469 : /*
470 : * First, try to read a tuple from each worker (including leader) in
471 : * nowait mode. After this, if not all workers were able to produce a
472 : * tuple (or a "done" indication), then re-read from remaining workers,
473 : * this time using wait mode. Add all live readers (those producing at
474 : * least one tuple) to the heap.
475 : */
476 316 : reread:
477 1464 : for (i = 0; i <= nreaders; i++)
478 : {
479 1148 : CHECK_FOR_INTERRUPTS();
480 :
481 : /* skip this source if already known done */
482 1980 : if ((i == 0) ? gm_state->need_to_scan_locally :
483 832 : !gm_state->gm_tuple_buffers[i - 1].done)
484 : {
485 1112 : if (TupIsNull(gm_state->gm_slots[i]))
486 : {
487 : /* Don't have a tuple yet, try to get one */
488 960 : if (gather_merge_readnext(gm_state, i, nowait))
489 232 : binaryheap_add_unordered(gm_state->gm_heap,
490 : Int32GetDatum(i));
491 : }
492 : else
493 : {
494 : /*
495 : * We already got at least one tuple from this worker, but
496 : * might as well see if it has any more ready by now.
497 : */
498 152 : load_tuple_array(gm_state, i);
499 : }
500 : }
501 : }
502 :
503 : /* need not recheck leader, since nowait doesn't matter for it */
504 760 : for (i = 1; i <= nreaders; i++)
505 : {
506 592 : if (!gm_state->gm_tuple_buffers[i - 1].done &&
507 206 : TupIsNull(gm_state->gm_slots[i]))
508 : {
509 148 : nowait = false;
510 148 : goto reread;
511 : }
512 : }
513 :
514 : /* Now heapify the heap. */
515 168 : binaryheap_build(gm_state->gm_heap);
516 :
517 168 : gm_state->gm_initialized = true;
518 168 : }
519 :
520 : /*
521 : * Clear out the tuple table slot, and any unused pending tuples,
522 : * for each gather merge input.
523 : */
524 : static void
525 180 : gather_merge_clear_tuples(GatherMergeState *gm_state)
526 : {
527 : int i;
528 :
529 664 : for (i = 0; i < gm_state->nreaders; i++)
530 : {
531 484 : GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
532 :
533 484 : while (tuple_buffer->readCounter < tuple_buffer->nTuples)
534 0 : pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);
535 :
536 484 : ExecClearTuple(gm_state->gm_slots[i + 1]);
537 : }
538 180 : }
539 :
540 : /*
541 : * Read the next tuple for gather merge.
542 : *
543 : * Fetch the sorted tuple out of the heap.
544 : */
545 : static TupleTableSlot *
546 254828 : gather_merge_getnext(GatherMergeState *gm_state)
547 : {
548 : int i;
549 :
550 254828 : if (!gm_state->gm_initialized)
551 : {
552 : /*
553 : * First time through: pull the first tuple from each participant, and
554 : * set up the heap.
555 : */
556 168 : gather_merge_init(gm_state);
557 : }
558 : else
559 : {
560 : /*
561 : * Otherwise, pull the next tuple from whichever participant we
562 : * returned from last time, and reinsert that participant's index into
563 : * the heap, because it might now compare differently against the
564 : * other elements of the heap.
565 : */
566 254660 : i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
567 :
568 254660 : if (gather_merge_readnext(gm_state, i, false))
569 254464 : binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
570 : else
571 : {
572 : /* reader exhausted, remove it from heap */
573 196 : (void) binaryheap_remove_first(gm_state->gm_heap);
574 : }
575 : }
576 :
577 254828 : if (binaryheap_empty(gm_state->gm_heap))
578 : {
579 : /* All the queues are exhausted, and so is the heap */
580 132 : gather_merge_clear_tuples(gm_state);
581 132 : return NULL;
582 : }
583 : else
584 : {
585 : /* Return next tuple from whichever participant has the leading one */
586 254696 : i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
587 254696 : return gm_state->gm_slots[i];
588 : }
589 : }
590 :
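Stripped of the buffering details, gather_merge_getnext() is the textbook heap-based k-way merge. A condensed sketch of that control flow, assuming hypothetical next_stream_tuple() and emit_tuple() helpers in place of gather_merge_readnext() and the slot handling:

binaryheap *heap = binaryheap_allocate(nstreams, compare_streams, state);

/* Prime the heap with every stream that can produce a first tuple. */
for (int i = 0; i < nstreams; i++)
{
    if (next_stream_tuple(state, i))    /* hypothetical */
        binaryheap_add_unordered(heap, Int32GetDatum(i));
}
binaryheap_build(heap);

while (!binaryheap_empty(heap))
{
    int     i = DatumGetInt32(binaryheap_first(heap));

    emit_tuple(state, i);           /* stream i holds the leading tuple */
    if (next_stream_tuple(state, i))
        binaryheap_replace_first(heap, Int32GetDatum(i));   /* re-sift */
    else
        (void) binaryheap_remove_first(heap);   /* stream exhausted */
}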
591 : /*
592 : * Read tuple(s) for given reader in nowait mode, and load into its tuple
593 : * array, until we have MAX_TUPLE_STORE of them or would have to block.
594 : */
595 : static void
596 638 : load_tuple_array(GatherMergeState *gm_state, int reader)
597 : {
598 : GMReaderTupleBuffer *tuple_buffer;
599 : int i;
600 :
601 : /* Don't do anything if this is the leader. */
602 638 : if (reader == 0)
603 142 : return;
604 :
605 496 : tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];
606 :
607 : /* If there's nothing in the array, reset the counters to zero. */
608 496 : if (tuple_buffer->nTuples == tuple_buffer->readCounter)
609 486 : tuple_buffer->nTuples = tuple_buffer->readCounter = 0;
610 :
611 : /* Try to fill additional slots in the array. */
612 4976 : for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
613 : {
614 : MinimalTuple tuple;
615 :
616 4550 : tuple = gm_readnext_tuple(gm_state,
617 : reader,
618 : true,
619 : &tuple_buffer->done);
620 4550 : if (!tuple)
621 70 : break;
622 4480 : tuple_buffer->tuple[i] = tuple;
623 4480 : tuple_buffer->nTuples++;
624 : }
625 : }
626 :
627 : /*
628 : * Store the next tuple for a given reader into the appropriate slot.
629 : *
630 : * Returns true if successful, false if not (either reader is exhausted,
631 : * or we didn't want to wait for a tuple). Sets done flag if reader
632 : * is found to be exhausted.
633 : */
634 : static bool
635 255620 : gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
636 : {
637 : GMReaderTupleBuffer *tuple_buffer;
638 : MinimalTuple tup;
639 :
640 : /*
641 : * If we're being asked to generate a tuple from the leader, then we just
642 : * call ExecProcNode as normal to produce one.
643 : */
644 255620 : if (reader == 0)
645 : {
646 249856 : if (gm_state->need_to_scan_locally)
647 : {
648 249856 : PlanState *outerPlan = outerPlanState(gm_state);
649 : TupleTableSlot *outerTupleSlot;
650 249856 : EState *estate = gm_state->ps.state;
651 :
652 : /* Install our DSA area while executing the plan. */
653 249856 : estate->es_query_dsa = gm_state->pei ? gm_state->pei->area : NULL;
654 249856 : outerTupleSlot = ExecProcNode(outerPlan);
655 249856 : estate->es_query_dsa = NULL;
656 :
657 249856 : if (!TupIsNull(outerTupleSlot))
658 : {
659 249730 : gm_state->gm_slots[0] = outerTupleSlot;
660 249730 : return true;
661 : }
662 : /* need_to_scan_locally serves as "done" flag for leader */
663 126 : gm_state->need_to_scan_locally = false;
664 : }
665 126 : return false;
666 : }
667 :
668 : /* Otherwise, check the state of the relevant tuple buffer. */
669 5764 : tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];
670 :
671 5764 : if (tuple_buffer->nTuples > tuple_buffer->readCounter)
672 : {
673 : /* Return any tuple previously read that is still buffered. */
674 4480 : tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
675 : }
676 1284 : else if (tuple_buffer->done)
677 : {
678 : /* Reader is known to be exhausted. */
679 68 : return false;
680 : }
681 : else
682 : {
683 : /* Read and buffer next tuple. */
684 1216 : tup = gm_readnext_tuple(gm_state,
685 : reader,
686 : nowait,
687 : &tuple_buffer->done);
688 1216 : if (!tup)
689 730 : return false;
690 :
691 : /*
692 : * Attempt to read more tuples in nowait mode and store them in the
693 : * pending-tuple array for the reader.
694 : */
695 486 : load_tuple_array(gm_state, reader);
696 : }
697 :
698 : Assert(tup);
699 :
700 : /* Build the TupleTableSlot for the given tuple */
701 4966 : ExecStoreMinimalTuple(tup, /* tuple to store */
702 4966 : gm_state->gm_slots[reader], /* slot in which to
703 : * store the tuple */
704 : true); /* pfree tuple when done with it */
705 :
706 4966 : return true;
707 : }
708 :
709 : /*
710 : * Attempt to read a tuple from the given worker.
711 : */
712 : static MinimalTuple
713 5766 : gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
714 : bool *done)
715 : {
716 : TupleQueueReader *reader;
717 : MinimalTuple tup;
718 :
719 : /* Check for async events, particularly messages from workers. */
720 5766 : CHECK_FOR_INTERRUPTS();
721 :
722 : /*
723 : * Attempt to read a tuple.
724 : *
725 : * Note that TupleQueueReaderNext will just return NULL for a worker which
726 : * fails to initialize. We'll treat that worker as having produced no
727 : * tuples; WaitForParallelWorkersToFinish will error out when we get
728 : * there.
729 : */
730 5766 : reader = gm_state->reader[nreader - 1];
731 5766 : tup = TupleQueueReaderNext(reader, nowait, done);
732 :
733 : /*
734 : * Since we'll be buffering these across multiple calls, we need to make a
735 : * copy.
736 : */
737 5766 : return tup ? heap_copy_minimal_tuple(tup, 0) : NULL;
738 : }
739 :
740 : /*
741 : * We have one slot for each item in the heap array. We use SlotNumber
742 : * to store slot indexes. This doesn't actually provide any formal
743 : * type-safety, but it makes the code more self-documenting.
744 : */
745 : typedef int32 SlotNumber;
746 :
747 : /*
748 : * Compare the tuples in the two given slots.
749 : */
750 : static int32
751 42440 : heap_compare_slots(Datum a, Datum b, void *arg)
752 : {
753 42440 : GatherMergeState *node = (GatherMergeState *) arg;
754 42440 : SlotNumber slot1 = DatumGetInt32(a);
755 42440 : SlotNumber slot2 = DatumGetInt32(b);
756 :
757 42440 : TupleTableSlot *s1 = node->gm_slots[slot1];
758 42440 : TupleTableSlot *s2 = node->gm_slots[slot2];
759 : int nkey;
760 :
761 : Assert(!TupIsNull(s1));
762 : Assert(!TupIsNull(s2));
763 :
764 43774 : for (nkey = 0; nkey < node->gm_nkeys; nkey++)
765 : {
766 42440 : SortSupport sortKey = node->gm_sortkeys + nkey;
767 42440 : AttrNumber attno = sortKey->ssup_attno;
768 : Datum datum1,
769 : datum2;
770 : bool isNull1,
771 : isNull2;
772 : int compare;
773 :
774 42440 : datum1 = slot_getattr(s1, attno, &isNull1);
775 42440 : datum2 = slot_getattr(s2, attno, &isNull2);
776 :
777 42440 : compare = ApplySortComparator(datum1, isNull1,
778 : datum2, isNull2,
779 : sortKey);
780 42440 : if (compare != 0)
781 : {
782 41106 : INVERT_COMPARE_RESULT(compare);
783 41106 : return compare;
784 : }
785 : }
786 1334 : return 0;
787 : }
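lib/binaryheap keeps the element that compares largest at the root, so the comparator above inverts each result to get min-heap behavior on the sort key. A worked example (illustrative values):

/*
 * With an ascending sort key, suppose s1's key is 3 and s2's is 7.
 * ApplySortComparator() returns a negative value (3 sorts before 7);
 * INVERT_COMPARE_RESULT() flips it to positive, so the max-heap treats
 * s1 as "larger" and keeps it at the root, which is exactly the tuple
 * gather_merge_getnext() must return next.
 */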