Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeGatherMerge.c
4 : * Scan a plan in multiple workers, and do order-preserving merge.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/nodeGatherMerge.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/relscan.h"
18 : #include "access/xact.h"
19 : #include "executor/execdebug.h"
20 : #include "executor/execParallel.h"
21 : #include "executor/nodeGatherMerge.h"
22 : #include "executor/nodeSubplan.h"
23 : #include "executor/tqueue.h"
24 : #include "lib/binaryheap.h"
25 : #include "miscadmin.h"
26 : #include "optimizer/optimizer.h"
27 : #include "utils/memutils.h"
28 : #include "utils/rel.h"
29 :
30 : /*
31 : * When we read tuples from workers, it's a good idea to read several at once
32 : * for efficiency when possible: this minimizes context-switching overhead.
33 : * But reading too many at a time wastes memory without improving performance.
34 : * We'll read up to MAX_TUPLE_STORE tuples (in addition to the first one).
35 : */
36 : #define MAX_TUPLE_STORE 10
37 :
/*
 * Pending-tuple array for each worker.  This holds additional tuples that
 * we were able to fetch from the worker, but can't process yet.  In addition,
 * this struct holds the "done" flag indicating the worker is known to have
 * no more tuples.  (We do not use this struct for the leader; we don't keep
 * any pending tuples for the leader, and the need_to_scan_locally flag serves
 * as its "done" indicator.)
 */
typedef struct GMReaderTupleBuffer
{
	MinimalTuple *tuple;		/* array of length MAX_TUPLE_STORE */
	int			nTuples;		/* number of tuples currently stored */
	int			readCounter;	/* index of next tuple to extract; entries
								 * before this index have been consumed */
	bool		done;			/* true if reader is known exhausted */
} GMReaderTupleBuffer;
53 :
54 : static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
55 : static int32 heap_compare_slots(Datum a, Datum b, void *arg);
56 : static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
57 : static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
58 : bool nowait, bool *done);
59 : static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
60 : static void gather_merge_setup(GatherMergeState *gm_state);
61 : static void gather_merge_init(GatherMergeState *gm_state);
62 : static void gather_merge_clear_tuples(GatherMergeState *gm_state);
63 : static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
64 : bool nowait);
65 : static void load_tuple_array(GatherMergeState *gm_state, int reader);
66 :
/* ----------------------------------------------------------------
 *		ExecInitGatherMerge
 * ----------------------------------------------------------------
 */
GatherMergeState *
ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
{
	GatherMergeState *gm_state;
	Plan	   *outerNode;
	TupleDesc	tupDesc;

	/* Gather merge node doesn't have innerPlan node. */
	Assert(innerPlan(node) == NULL);

	/*
	 * create state structure
	 */
	gm_state = makeNode(GatherMergeState);
	gm_state->ps.plan = (Plan *) node;
	gm_state->ps.state = estate;
	gm_state->ps.ExecProcNode = ExecGatherMerge;

	/* Workers are launched lazily, on the first ExecProcNode call. */
	gm_state->initialized = false;
	gm_state->gm_initialized = false;
	gm_state->tuples_needed = -1;	/* -1 means "fetch all tuples" */

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &gm_state->ps);

	/*
	 * GatherMerge doesn't support checking a qual (it's always more efficient
	 * to do it in the child node).
	 */
	Assert(!node->plan.qual);

	/*
	 * now initialize outer plan
	 */
	outerNode = outerPlan(node);
	outerPlanState(gm_state) = ExecInitNode(outerNode, estate, eflags);

	/*
	 * Leader may access ExecProcNode result directly (if
	 * need_to_scan_locally), or from workers via tuple queue.  So we can't
	 * trivially rely on the slot type being fixed for expressions evaluated
	 * within this node.
	 */
	gm_state->ps.outeropsset = true;
	gm_state->ps.outeropsfixed = false;

	/*
	 * Store the tuple descriptor into gather merge state, so we can use it
	 * while initializing the gather merge slots.
	 */
	tupDesc = ExecGetResultType(outerPlanState(gm_state));
	gm_state->tupDesc = tupDesc;

	/*
	 * Initialize result type and projection.
	 */
	ExecInitResultTypeTL(&gm_state->ps);
	ExecConditionalAssignProjectionInfo(&gm_state->ps, tupDesc, OUTER_VAR);

	/*
	 * Without projections result slot type is not trivially known, see
	 * comment above.
	 */
	if (gm_state->ps.ps_ProjInfo == NULL)
	{
		gm_state->ps.resultopsset = true;
		gm_state->ps.resultopsfixed = false;
	}

	/*
	 * initialize sort-key information
	 */
	if (node->numCols)
	{
		int			i;

		gm_state->gm_nkeys = node->numCols;
		gm_state->gm_sortkeys =
			palloc0(sizeof(SortSupportData) * node->numCols);

		for (i = 0; i < node->numCols; i++)
		{
			SortSupport sortKey = gm_state->gm_sortkeys + i;

			sortKey->ssup_cxt = CurrentMemoryContext;
			sortKey->ssup_collation = node->collations[i];
			sortKey->ssup_nulls_first = node->nullsFirst[i];
			sortKey->ssup_attno = node->sortColIdx[i];

			/*
			 * We don't perform abbreviated key conversion here, for the same
			 * reasons that it isn't used in MergeAppend
			 */
			sortKey->abbreviate = false;

			PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
		}
	}

	/* Now allocate the workspace for gather merge */
	gather_merge_setup(gm_state);

	return gm_state;
}
179 :
/* ----------------------------------------------------------------
 *		ExecGatherMerge(node)
 *
 *		Scans the relation via multiple workers and returns
 *		the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGatherMerge(PlanState *pstate)
{
	GatherMergeState *node = castNode(GatherMergeState, pstate);
	TupleTableSlot *slot;
	ExprContext *econtext;

	CHECK_FOR_INTERRUPTS();

	/*
	 * As with Gather, we don't launch workers until this node is actually
	 * executed.
	 */
	if (!node->initialized)
	{
		EState	   *estate = node->ps.state;
		GatherMerge *gm = castNode(GatherMerge, node->ps.plan);

		/*
		 * Sometimes we might have to run without parallelism; but if parallel
		 * mode is active then we can try to fire up some workers.
		 */
		if (gm->num_workers > 0 && estate->es_use_parallel_mode)
		{
			ParallelContext *pcxt;

			/*
			 * Initialize, or re-initialize, shared state needed by workers.
			 * pei is kept across rescans, so only the first call builds it.
			 */
			if (!node->pei)
				node->pei = ExecInitParallelPlan(outerPlanState(node),
												 estate,
												 gm->initParam,
												 gm->num_workers,
												 node->tuples_needed);
			else
				ExecParallelReinitialize(outerPlanState(node),
										 node->pei,
										 gm->initParam);

			/* Try to launch workers. */
			pcxt = node->pei->pcxt;
			LaunchParallelWorkers(pcxt);
			/* We save # workers launched for the benefit of EXPLAIN */
			node->nworkers_launched = pcxt->nworkers_launched;

			/* Set up tuple queue readers to read the results. */
			if (pcxt->nworkers_launched > 0)
			{
				ExecParallelCreateReaders(node->pei);
				/* Make a working array showing the active readers */
				node->nreaders = pcxt->nworkers_launched;
				node->reader = (TupleQueueReader **)
					palloc(node->nreaders * sizeof(TupleQueueReader *));
				memcpy(node->reader, node->pei->reader,
					   node->nreaders * sizeof(TupleQueueReader *));
			}
			else
			{
				/* No workers?	Then never mind. */
				node->nreaders = 0;
				node->reader = NULL;
			}
		}

		/* allow leader to participate if enabled or no choice */
		if (parallel_leader_participation || node->nreaders == 0)
			node->need_to_scan_locally = true;
		node->initialized = true;
	}

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	econtext = node->ps.ps_ExprContext;
	ResetExprContext(econtext);

	/*
	 * Get next tuple, either from one of our workers, or by running the plan
	 * ourselves.
	 */
	slot = gather_merge_getnext(node);
	if (TupIsNull(slot))
		return NULL;

	/* If no projection is required, we're done. */
	if (node->ps.ps_ProjInfo == NULL)
		return slot;

	/*
	 * Form the result tuple using ExecProject(), and return it.
	 */
	econtext->ecxt_outertuple = slot;
	return ExecProject(node->ps.ps_ProjInfo);
}
281 :
/* ----------------------------------------------------------------
 *		ExecEndGatherMerge
 *
 *		frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGatherMerge(GatherMergeState *node)
{
	ExecEndNode(outerPlanState(node));	/* let children clean up first */
	ExecShutdownGatherMerge(node);	/* then tear down workers + context */
}
294 :
/* ----------------------------------------------------------------
 *		ExecShutdownGatherMerge
 *
 *		Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGatherMerge(GatherMergeState *node)
{
	/* Stop workers first; this is a no-op if none were launched. */
	ExecShutdownGatherMergeWorkers(node);

	/* Now destroy the parallel context. */
	if (node->pei != NULL)
	{
		ExecParallelCleanup(node->pei);
		node->pei = NULL;
	}
}
313 :
/* ----------------------------------------------------------------
 *		ExecShutdownGatherMergeWorkers
 *
 *		Stop all the parallel workers.
 *
 *		Note: unlike ExecShutdownGatherMerge, this keeps node->pei, so
 *		a subsequent rescan can relaunch workers via
 *		ExecParallelReinitialize.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherMergeWorkers(GatherMergeState *node)
{
	if (node->pei != NULL)
		ExecParallelFinish(node->pei);

	/* Flush local copy of reader array */
	if (node->reader)
		pfree(node->reader);
	node->reader = NULL;
}
331 :
/* ----------------------------------------------------------------
 *		ExecReScanGatherMerge
 *
 *		Prepare to re-scan the result of a GatherMerge.
 * ----------------------------------------------------------------
 */
void
ExecReScanGatherMerge(GatherMergeState *node)
{
	GatherMerge *gm = (GatherMerge *) node->ps.plan;
	PlanState  *outerPlan = outerPlanState(node);

	/* Make sure any existing workers are gracefully shut down */
	ExecShutdownGatherMergeWorkers(node);

	/* Free any unused tuples, so we don't leak memory across rescans */
	gather_merge_clear_tuples(node);

	/*
	 * Mark node so that shared state will be rebuilt at next call; both the
	 * worker launch (initialized) and the merge heap (gm_initialized) must
	 * be redone.
	 */
	node->initialized = false;
	node->gm_initialized = false;

	/*
	 * Set child node's chgParam to tell it that the next scan might deliver a
	 * different set of rows within the leader process.  (The overall rowset
	 * shouldn't change, but the leader process's subset might; hence nodes
	 * between here and the parallel table scan node mustn't optimize on the
	 * assumption of an unchanging rowset.)
	 */
	if (gm->rescan_param >= 0)
		outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
											 gm->rescan_param);

	/*
	 * If chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.  Note: because this does nothing if we have a
	 * rescan_param, it's currently guaranteed that parallel-aware child nodes
	 * will not see a ReScan call until after they get a ReInitializeDSM call.
	 * That ordering might not be something to rely on, though.  A good rule
	 * of thumb is that ReInitializeDSM should reset only shared state, ReScan
	 * should reset only local state, and anything that depends on both of
	 * those steps being finished must wait until the first ExecProcNode call.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}
378 :
/*
 * Set up the data structures that we'll need for Gather Merge.
 *
 * We allocate these once on the basis of gm->num_workers, which is an
 * upper bound for the number of workers we'll actually have.  During
 * a rescan, we reset the structures to empty.  This approach simplifies
 * not leaking memory across rescans.
 *
 * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
 * are for workers.  The values placed into gm_heap correspond to indexes
 * in gm_slots[].  The gm_tuple_buffers[] array, however, is indexed from
 * 0 to n-1; it has no entry for the leader.
 */
static void
gather_merge_setup(GatherMergeState *gm_state)
{
	GatherMerge *gm = castNode(GatherMerge, gm_state->ps.plan);
	int			nreaders = gm->num_workers;
	int			i;

	/*
	 * Allocate gm_slots for the number of workers + one more slot for leader.
	 * Slot 0 is always for the leader.  Leader always calls ExecProcNode() to
	 * read the tuple, and then stores it directly into its gm_slots entry.
	 * For other slots, code below will call ExecInitExtraTupleSlot() to
	 * create a slot for the worker's results.  Note that during any single
	 * scan, we might have fewer than num_workers available workers, in which
	 * case the extra array entries go unused.
	 */
	gm_state->gm_slots = (TupleTableSlot **)
		palloc0((nreaders + 1) * sizeof(TupleTableSlot *));

	/* Allocate the tuple slot and tuple array for each worker */
	gm_state->gm_tuple_buffers = (GMReaderTupleBuffer *)
		palloc0(nreaders * sizeof(GMReaderTupleBuffer));

	for (i = 0; i < nreaders; i++)
	{
		/* Allocate the tuple array with length MAX_TUPLE_STORE */
		gm_state->gm_tuple_buffers[i].tuple =
			(MinimalTuple *) palloc0(sizeof(MinimalTuple) * MAX_TUPLE_STORE);

		/* Initialize tuple slot for worker; workers send minimal tuples */
		gm_state->gm_slots[i + 1] =
			ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
								   &TTSOpsMinimalTuple);
	}

	/* Allocate the resources for the merge: one heap entry per participant */
	gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
											heap_compare_slots,
											gm_state);
}
432 :
/*
 * Initialize the Gather Merge.
 *
 * Reset data structures to ensure they're empty.  Then pull at least one
 * tuple from leader + each worker (or set its "done" indicator), and set up
 * the heap.
 */
static void
gather_merge_init(GatherMergeState *gm_state)
{
	int			nreaders = gm_state->nreaders;
	bool		nowait = true;
	int			i;

	/* Assert that gather_merge_setup made enough space */
	Assert(nreaders <= castNode(GatherMerge, gm_state->ps.plan)->num_workers);

	/* Reset leader's tuple slot to empty */
	gm_state->gm_slots[0] = NULL;

	/* Reset the tuple slot and tuple array for each worker */
	for (i = 0; i < nreaders; i++)
	{
		/* Reset tuple array to empty */
		gm_state->gm_tuple_buffers[i].nTuples = 0;
		gm_state->gm_tuple_buffers[i].readCounter = 0;
		/* Reset done flag to not-done */
		gm_state->gm_tuple_buffers[i].done = false;
		/* Ensure output slot is empty */
		ExecClearTuple(gm_state->gm_slots[i + 1]);
	}

	/* Reset binary heap to empty */
	binaryheap_reset(gm_state->gm_heap);

	/*
	 * First, try to read a tuple from each worker (including leader) in
	 * nowait mode.  After this, if not all workers were able to produce a
	 * tuple (or a "done" indication), then re-read from remaining workers,
	 * this time using wait mode.  Add all live readers (those producing at
	 * least one tuple) to the heap.
	 */
reread:
	for (i = 0; i <= nreaders; i++)
	{
		CHECK_FOR_INTERRUPTS();

		/* skip this source if already known done */
		if ((i == 0) ? gm_state->need_to_scan_locally :
			!gm_state->gm_tuple_buffers[i - 1].done)
		{
			if (TupIsNull(gm_state->gm_slots[i]))
			{
				/* Don't have a tuple yet, try to get one */
				if (gather_merge_readnext(gm_state, i, nowait))
					binaryheap_add_unordered(gm_state->gm_heap,
											 Int32GetDatum(i));
			}
			else
			{
				/*
				 * We already got at least one tuple from this worker, but
				 * might as well see if it has any more ready by now.
				 */
				load_tuple_array(gm_state, i);
			}
		}
	}

	/* need not recheck leader, since nowait doesn't matter for it */
	for (i = 1; i <= nreaders; i++)
	{
		if (!gm_state->gm_tuple_buffers[i - 1].done &&
			TupIsNull(gm_state->gm_slots[i]))
		{
			/* This reader still owes us a tuple; retry in blocking mode. */
			nowait = false;
			goto reread;
		}
	}

	/* Now heapify the heap. */
	binaryheap_build(gm_state->gm_heap);

	gm_state->gm_initialized = true;
}
518 :
519 : /*
520 : * Clear out the tuple table slot, and any unused pending tuples,
521 : * for each gather merge input.
522 : */
523 : static void
524 168 : gather_merge_clear_tuples(GatherMergeState *gm_state)
525 : {
526 : int i;
527 :
528 630 : for (i = 0; i < gm_state->nreaders; i++)
529 : {
530 462 : GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
531 :
532 462 : while (tuple_buffer->readCounter < tuple_buffer->nTuples)
533 0 : pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);
534 :
535 462 : ExecClearTuple(gm_state->gm_slots[i + 1]);
536 : }
537 168 : }
538 :
/*
 * Read the next tuple for gather merge.
 *
 * Fetch the sorted tuple out of the heap.  Returns NULL once every
 * participant (leader and workers) is exhausted.
 */
static TupleTableSlot *
gather_merge_getnext(GatherMergeState *gm_state)
{
	int			i;

	if (!gm_state->gm_initialized)
	{
		/*
		 * First time through: pull the first tuple from each participant, and
		 * set up the heap.
		 */
		gather_merge_init(gm_state);
	}
	else
	{
		/*
		 * Otherwise, pull the next tuple from whichever participant we
		 * returned from last time, and reinsert that participant's index into
		 * the heap, because it might now compare differently against the
		 * other elements of the heap.
		 */
		i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));

		if (gather_merge_readnext(gm_state, i, false))
			binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
		else
		{
			/* reader exhausted, remove it from heap */
			(void) binaryheap_remove_first(gm_state->gm_heap);
		}
	}

	if (binaryheap_empty(gm_state->gm_heap))
	{
		/* All the queues are exhausted, and so is the heap */
		gather_merge_clear_tuples(gm_state);
		return NULL;
	}
	else
	{
		/* Return next tuple from whichever participant has the leading one */
		i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
		return gm_state->gm_slots[i];
	}
}
589 :
590 : /*
591 : * Read tuple(s) for given reader in nowait mode, and load into its tuple
592 : * array, until we have MAX_TUPLE_STORE of them or would have to block.
593 : */
594 : static void
595 162 : load_tuple_array(GatherMergeState *gm_state, int reader)
596 : {
597 : GMReaderTupleBuffer *tuple_buffer;
598 : int i;
599 :
600 : /* Don't do anything if this is the leader. */
601 162 : if (reader == 0)
602 132 : return;
603 :
604 30 : tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];
605 :
606 : /* If there's nothing in the array, reset the counters to zero. */
607 30 : if (tuple_buffer->nTuples == tuple_buffer->readCounter)
608 28 : tuple_buffer->nTuples = tuple_buffer->readCounter = 0;
609 :
610 : /* Try to fill additional slots in the array. */
611 246 : for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
612 : {
613 : MinimalTuple tuple;
614 :
615 234 : tuple = gm_readnext_tuple(gm_state,
616 : reader,
617 : true,
618 : &tuple_buffer->done);
619 234 : if (!tuple)
620 18 : break;
621 216 : tuple_buffer->tuple[i] = tuple;
622 216 : tuple_buffer->nTuples++;
623 : }
624 : }
625 :
/*
 * Store the next tuple for a given reader into the appropriate slot.
 * (Reader 0 is the leader; readers 1..n are worker tuple queues.)
 *
 * Returns true if successful, false if not (either reader is exhausted,
 * or we didn't want to wait for a tuple).  Sets done flag if reader
 * is found to be exhausted.
 */
static bool
gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
{
	GMReaderTupleBuffer *tuple_buffer;
	MinimalTuple tup;

	/*
	 * If we're being asked to generate a tuple from the leader, then we just
	 * call ExecProcNode as normal to produce one.
	 */
	if (reader == 0)
	{
		if (gm_state->need_to_scan_locally)
		{
			PlanState  *outerPlan = outerPlanState(gm_state);
			TupleTableSlot *outerTupleSlot;
			EState	   *estate = gm_state->ps.state;

			/* Install our DSA area while executing the plan. */
			estate->es_query_dsa = gm_state->pei ? gm_state->pei->area : NULL;
			outerTupleSlot = ExecProcNode(outerPlan);
			estate->es_query_dsa = NULL;

			if (!TupIsNull(outerTupleSlot))
			{
				gm_state->gm_slots[0] = outerTupleSlot;
				return true;
			}
			/* need_to_scan_locally serves as "done" flag for leader */
			gm_state->need_to_scan_locally = false;
		}
		return false;
	}

	/* Otherwise, check the state of the relevant tuple buffer. */
	tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];

	if (tuple_buffer->nTuples > tuple_buffer->readCounter)
	{
		/* Return any tuple previously read that is still buffered. */
		tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
	}
	else if (tuple_buffer->done)
	{
		/* Reader is known to be exhausted. */
		return false;
	}
	else
	{
		/* Read and buffer next tuple. */
		tup = gm_readnext_tuple(gm_state,
								reader,
								nowait,
								&tuple_buffer->done);
		if (!tup)
			return false;

		/*
		 * Attempt to read more tuples in nowait mode and store them in the
		 * pending-tuple array for the reader.
		 */
		load_tuple_array(gm_state, reader);
	}

	Assert(tup);

	/* Build the TupleTableSlot for the given tuple */
	ExecStoreMinimalTuple(tup,	/* tuple to store */
						  gm_state->gm_slots[reader],	/* slot in which to
														 * store the tuple */
						  true);	/* pfree tuple when done with it */

	return true;
}
707 :
708 : /*
709 : * Attempt to read a tuple from given worker.
710 : */
711 : static MinimalTuple
712 1030 : gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
713 : bool *done)
714 : {
715 : TupleQueueReader *reader;
716 : MinimalTuple tup;
717 :
718 : /* Check for async events, particularly messages from workers. */
719 1030 : CHECK_FOR_INTERRUPTS();
720 :
721 : /*
722 : * Attempt to read a tuple.
723 : *
724 : * Note that TupleQueueReaderNext will just return NULL for a worker which
725 : * fails to initialize. We'll treat that worker as having produced no
726 : * tuples; WaitForParallelWorkersToFinish will error out when we get
727 : * there.
728 : */
729 1030 : reader = gm_state->reader[nreader - 1];
730 1030 : tup = TupleQueueReaderNext(reader, nowait, done);
731 :
732 : /*
733 : * Since we'll be buffering these across multiple calls, we need to make a
734 : * copy.
735 : */
736 1030 : return tup ? heap_copy_minimal_tuple(tup) : NULL;
737 : }
738 :
739 : /*
740 : * We have one slot for each item in the heap array. We use SlotNumber
741 : * to store slot indexes. This doesn't actually provide any formal
742 : * type-safety, but it makes the code more self-documenting.
743 : */
744 : typedef int32 SlotNumber;
745 :
746 : /*
747 : * Compare the tuples in the two given slots.
748 : */
749 : static int32
750 324 : heap_compare_slots(Datum a, Datum b, void *arg)
751 : {
752 324 : GatherMergeState *node = (GatherMergeState *) arg;
753 324 : SlotNumber slot1 = DatumGetInt32(a);
754 324 : SlotNumber slot2 = DatumGetInt32(b);
755 :
756 324 : TupleTableSlot *s1 = node->gm_slots[slot1];
757 324 : TupleTableSlot *s2 = node->gm_slots[slot2];
758 : int nkey;
759 :
760 : Assert(!TupIsNull(s1));
761 : Assert(!TupIsNull(s2));
762 :
763 478 : for (nkey = 0; nkey < node->gm_nkeys; nkey++)
764 : {
765 324 : SortSupport sortKey = node->gm_sortkeys + nkey;
766 324 : AttrNumber attno = sortKey->ssup_attno;
767 : Datum datum1,
768 : datum2;
769 : bool isNull1,
770 : isNull2;
771 : int compare;
772 :
773 324 : datum1 = slot_getattr(s1, attno, &isNull1);
774 324 : datum2 = slot_getattr(s2, attno, &isNull2);
775 :
776 324 : compare = ApplySortComparator(datum1, isNull1,
777 : datum2, isNull2,
778 : sortKey);
779 324 : if (compare != 0)
780 : {
781 170 : INVERT_COMPARE_RESULT(compare);
782 170 : return compare;
783 : }
784 : }
785 154 : return 0;
786 : }
|