/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *	  Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are not available
 * or have not started up yet.  It then merges all of the results it produces
 * and the results from the workers into a single output stream.  Therefore,
 * it will normally be used with a plan where running multiple copies of the
 * same plan does not produce duplicate output, such as parallel-aware
 * SeqScan.
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * and the single-copy flag can be set.  In that case, the Gather node runs
 * the plan in one worker, does not execute the plan itself, and simply
 * returns whatever tuples the worker produces.  If a worker cannot be
 * obtained, it falls back to running the plan itself and returning the
 * results.  Therefore, a plan used with a single-copy Gather node need not
 * be parallel-aware.
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */
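
/*
 * As an illustration, a typical plan shape involving this node, as shown
 * by EXPLAIN, is
 *
 *		Gather
 *		  Workers Planned: 2
 *		  ->  Parallel Seq Scan on tab
 *
 * where each worker (and normally the leader too, unless
 * parallel_leader_participation is off or single_copy is set) runs its own
 * copy of the Parallel Seq Scan, and Gather funnels the resulting tuples
 * into a single stream.
 */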

#include "postgres.h"

#include "executor/execParallel.h"
#include "executor/executor.h"
#include "executor/nodeGather.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "utils/wait_event.h"


static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static MinimalTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);


/* ----------------------------------------------------------------
 *		ExecInitGather
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
	GatherState *gatherstate;
	Plan	   *outerNode;
	TupleDesc	tupDesc;

	/* Gather node doesn't have innerPlan node. */
	Assert(innerPlan(node) == NULL);

	/*
	 * create state structure
	 */
	gatherstate = makeNode(GatherState);
	gatherstate->ps.plan = (Plan *) node;
	gatherstate->ps.state = estate;
	gatherstate->ps.ExecProcNode = ExecGather;

	gatherstate->initialized = false;
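	/*
	 * This setting of need_to_scan_locally is only provisional: ExecGather
	 * recomputes it on first execution, once the number of workers actually
	 * launched is known.
	 */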
	gatherstate->need_to_scan_locally =
		!node->single_copy && parallel_leader_participation;
	gatherstate->tuples_needed = -1;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &gatherstate->ps);

	/*
	 * now initialize outer plan
	 */
	outerNode = outerPlan(node);
	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
	tupDesc = ExecGetResultType(outerPlanState(gatherstate));

	/*
	 * The leader may access the ExecProcNode result directly (if
	 * need_to_scan_locally), or obtain tuples from workers via the tuple
	 * queue.  So we can't trivially rely on the slot type being fixed for
	 * expressions evaluated within this node.
	 */
	gatherstate->ps.outeropsset = true;
	gatherstate->ps.outeropsfixed = false;

	/*
	 * Initialize result type and projection.
	 */
	ExecInitResultTypeTL(&gatherstate->ps);
	ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);

	/*
	 * Without a projection, the result slot type is not trivially known; see
	 * comment above.
	 */
	if (gatherstate->ps.ps_ProjInfo == NULL)
	{
		gatherstate->ps.resultopsset = true;
		gatherstate->ps.resultopsfixed = false;
	}

	/*
	 * Initialize funnel slot to same tuple descriptor as outer plan.
	 */
	gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
													  &TTSOpsMinimalTuple);
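	/*
	 * (A minimal-tuple slot is used because the tuples arriving from the
	 * workers' tuple queues are MinimalTuples; see gather_getnext.)
	 */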

	/*
	 * Gather doesn't support checking a qual (it's always more efficient to
	 * do it in the child node).
	 */
	Assert(!node->plan.qual);

	return gatherstate;
}

/* ----------------------------------------------------------------
 *		ExecGather(node)
 *
 *		Scans the relation via multiple workers and returns
 *		the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGather(PlanState *pstate)
{
	GatherState *node = castNode(GatherState, pstate);
	TupleTableSlot *slot;
	ExprContext *econtext;

	CHECK_FOR_INTERRUPTS();

	/*
	 * Initialize the parallel context and workers on first execution.  We do
	 * this on first execution rather than during node initialization because
	 * it requires allocating a large dynamic shared memory segment, so it is
	 * better to do it only if it is really needed.
	 */
	if (!node->initialized)
	{
		EState	   *estate = node->ps.state;
		Gather	   *gather = (Gather *) node->ps.plan;

		/*
		 * Sometimes we might have to run without parallelism; but if parallel
		 * mode is active then we can try to fire up some workers.
		 */
		if (gather->num_workers > 0 && estate->es_use_parallel_mode)
		{
			ParallelContext *pcxt;

			/* Initialize, or re-initialize, shared state needed by workers. */
			if (!node->pei)
				node->pei = ExecInitParallelPlan(outerPlanState(node),
												 estate,
												 gather->initParam,
												 gather->num_workers,
												 node->tuples_needed);
			else
				ExecParallelReinitialize(outerPlanState(node),
										 node->pei,
										 gather->initParam);

			/*
			 * Register backend workers.  We might not get as many as we
			 * requested, or indeed any at all.
			 */
			pcxt = node->pei->pcxt;
			LaunchParallelWorkers(pcxt);
			/* We save # workers launched for the benefit of EXPLAIN */
			node->nworkers_launched = pcxt->nworkers_launched;

			/* Set up tuple queue readers to read the results. */
			if (pcxt->nworkers_launched > 0)
			{
				ExecParallelCreateReaders(node->pei);
				/* Make a working array showing the active readers */
				node->nreaders = pcxt->nworkers_launched;
				node->reader = (TupleQueueReader **)
					palloc(node->nreaders * sizeof(TupleQueueReader *));
				memcpy(node->reader, node->pei->reader,
					   node->nreaders * sizeof(TupleQueueReader *));
			}
			else
			{
				/* No workers?  Then never mind. */
				node->nreaders = 0;
				node->reader = NULL;
			}
			node->nextreader = 0;
		}

		/*
		 * Run the plan locally if we didn't get any workers, or if leader
		 * participation is enabled and this isn't single-copy.
		 */
		node->need_to_scan_locally = (node->nreaders == 0)
			|| (!gather->single_copy && parallel_leader_participation);
		node->initialized = true;
	}

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	econtext = node->ps.ps_ExprContext;
	ResetExprContext(econtext);

	/*
	 * Get next tuple, either from one of our workers, or by running the plan
	 * ourselves.
	 */
	slot = gather_getnext(node);
	if (TupIsNull(slot))
		return NULL;

	/* If no projection is required, we're done. */
	if (node->ps.ps_ProjInfo == NULL)
		return slot;

	/*
	 * Form the result tuple using ExecProject(), and return it.
	 */
	econtext->ecxt_outertuple = slot;
	return ExecProject(node->ps.ps_ProjInfo);
}

/* ----------------------------------------------------------------
 *		ExecEndGather
 *
 *		frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGather(GatherState *node)
{
	ExecEndNode(outerPlanState(node));	/* let children clean up first */
	ExecShutdownGather(node);
}

/*
 * Read the next tuple.  We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
	PlanState  *outerPlan = outerPlanState(gatherstate);
	TupleTableSlot *outerTupleSlot;
	TupleTableSlot *fslot = gatherstate->funnel_slot;
	MinimalTuple tup;

	while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
	{
		CHECK_FOR_INTERRUPTS();

		if (gatherstate->nreaders > 0)
		{
			tup = gather_readnext(gatherstate);

			if (HeapTupleIsValid(tup))
			{
				ExecStoreMinimalTuple(tup,	/* tuple to store */
									  fslot,	/* slot to store the tuple */
									  false);	/* don't pfree tuple */
				return fslot;
			}
		}

		if (gatherstate->need_to_scan_locally)
		{
			EState	   *estate = gatherstate->ps.state;

			/* Install our DSA area while executing the plan. */
			estate->es_query_dsa =
				gatherstate->pei ? gatherstate->pei->area : NULL;
			outerTupleSlot = ExecProcNode(outerPlan);
			estate->es_query_dsa = NULL;

			if (!TupIsNull(outerTupleSlot))
				return outerTupleSlot;

			gatherstate->need_to_scan_locally = false;
		}
	}

	return ExecClearTuple(fslot);
}

/*
 * Attempt to read a tuple from one of our parallel workers.
 */
static MinimalTuple
gather_readnext(GatherState *gatherstate)
{
	int			nvisited = 0;

	for (;;)
	{
		TupleQueueReader *reader;
		MinimalTuple tup;
		bool		readerdone;

		/* Check for async events, particularly messages from workers. */
		CHECK_FOR_INTERRUPTS();

		/*
		 * Attempt to read a tuple, but don't block if none is available.
		 *
		 * Note that TupleQueueReaderNext will just return NULL for a worker
		 * which fails to initialize.  We'll treat that worker as having
		 * produced no tuples; WaitForParallelWorkersToFinish will error out
		 * when we get there.
		 */
		Assert(gatherstate->nextreader < gatherstate->nreaders);
		reader = gatherstate->reader[gatherstate->nextreader];
		tup = TupleQueueReaderNext(reader, true, &readerdone);

		/*
		 * If this reader is done, remove it from our working array of active
		 * readers.  If all readers are done, we're outta here.
		 */
		if (readerdone)
		{
			Assert(!tup);
			--gatherstate->nreaders;
			if (gatherstate->nreaders == 0)
			{
				ExecShutdownGatherWorkers(gatherstate);
				return NULL;
			}
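			/*
			 * For example, with reader array [A, B, C] and nextreader == 1,
			 * B finishing leaves [A, C] with nextreader still 1, now
			 * pointing at C; if instead the last element finishes,
			 * nextreader wraps around to 0 below.
			 */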
			memmove(&gatherstate->reader[gatherstate->nextreader],
					&gatherstate->reader[gatherstate->nextreader + 1],
					sizeof(TupleQueueReader *)
					* (gatherstate->nreaders - gatherstate->nextreader));
			if (gatherstate->nextreader >= gatherstate->nreaders)
				gatherstate->nextreader = 0;
			continue;
		}

		/* If we got a tuple, return it. */
		if (tup)
			return tup;

		/*
		 * Advance nextreader pointer in round-robin fashion.  Note that we
		 * only reach this code if we weren't able to get a tuple from the
		 * current worker.  We used to advance the nextreader pointer after
		 * every tuple, but it turns out to be much more efficient to keep
		 * reading from the same queue until that would require blocking.
		 */
		gatherstate->nextreader++;
		if (gatherstate->nextreader >= gatherstate->nreaders)
			gatherstate->nextreader = 0;

		/* Have we visited every (surviving) TupleQueueReader? */
		nvisited++;
		if (nvisited >= gatherstate->nreaders)
		{
			/*
			 * If (still) running plan locally, return NULL so caller can
			 * generate another tuple from the local copy of the plan.
			 */
			if (gatherstate->need_to_scan_locally)
				return NULL;

			/* Nothing to do except wait for developments. */
			(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
							 WAIT_EVENT_EXECUTE_GATHER);
			ResetLatch(MyLatch);
			nvisited = 0;
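
			/*
			 * Workers set our latch when they write to (or detach from)
			 * their tuple queues, so a wakeup means some queue may have
			 * changed state.  Having reset the latch before re-polling, we
			 * cannot miss a subsequent notification; loop around and try
			 * all the queues again.
			 */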
		}
	}
}

/* ----------------------------------------------------------------
 *		ExecShutdownGatherWorkers
 *
 *		Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherWorkers(GatherState *node)
{
	if (node->pei != NULL)
		ExecParallelFinish(node->pei);

	/* Flush local copy of reader array */
	if (node->reader)
		pfree(node->reader);
	node->reader = NULL;
}

/* ----------------------------------------------------------------
 *		ExecShutdownGather
 *
 *		Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGather(GatherState *node)
{
	ExecShutdownGatherWorkers(node);

	/* Now destroy the parallel context. */
	if (node->pei != NULL)
	{
		ExecParallelCleanup(node->pei);
		node->pei = NULL;
	}
}

/* ----------------------------------------------------------------
 *		Join Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecReScanGather
 *
 *		Prepare to re-scan the result of a Gather.
 * ----------------------------------------------------------------
 */
void
ExecReScanGather(GatherState *node)
{
	Gather	   *gather = (Gather *) node->ps.plan;
	PlanState  *outerPlan = outerPlanState(node);

	/* Make sure any existing workers are gracefully shut down */
	ExecShutdownGatherWorkers(node);

	/* Mark node so that shared state will be rebuilt at next call */
	node->initialized = false;

	/*
	 * Set child node's chgParam to tell it that the next scan might deliver a
	 * different set of rows within the leader process.  (The overall rowset
	 * shouldn't change, but the leader process's subset might; hence nodes
	 * between here and the parallel table scan node mustn't optimize on the
	 * assumption of an unchanging rowset.)
	 */
	if (gather->rescan_param >= 0)
		outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
											 gather->rescan_param);

	/*
	 * If chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.  Note: because this does nothing if we have a
	 * rescan_param, it's currently guaranteed that parallel-aware child nodes
	 * will not see a ReScan call until after they get a ReInitializeDSM call.
	 * That ordering might not be something to rely on, though.  A good rule
	 * of thumb is that ReInitializeDSM should reset only shared state, ReScan
	 * should reset only local state, and anything that depends on both of
	 * those steps being finished must wait until the first ExecProcNode call.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}