Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeGather.c
4 : * Support routines for scanning a plan via multiple workers.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * A Gather executor launches parallel workers to run multiple copies of a
10 : * plan. It can also run the plan itself, if the workers are not available
11 : * or have not started up yet. It then merges all of the results it produces
12 : * and the results from the workers into a single output stream. Therefore,
13 : * it will normally be used with a plan where running multiple copies of the
14 : * same plan does not produce duplicate output, such as parallel-aware
15 : * SeqScan.
16 : *
17 : * Alternatively, a Gather node can be configured to use just one worker
18 : * and the single-copy flag can be set. In this case, the Gather node will
19 : * run the plan in one worker and will not execute the plan itself. In
20 : * this case, it simply returns whatever tuples were returned by the worker.
21 : * If a worker cannot be obtained, then it will run the plan itself and
22 : * return the results. Therefore, a plan used with a single-copy Gather
23 : * node need not be parallel-aware.
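 : *
 : * As an illustrative sketch (the table name and worker count below are
 : * placeholders, not taken from this file), the parallel-aware case
 : * typically corresponds to a plan shape such as
 : *
 : *    Gather
 : *      Workers Planned: 2
 : *      ->  Parallel Seq Scan on some_table
 : *
 : * where each worker (and the leader, when it participates) scans a
 : * disjoint subset of the table, so the merged stream contains each
 : * qualifying tuple only once.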
24 : *
25 : * IDENTIFICATION
26 : * src/backend/executor/nodeGather.c
27 : *
28 : *-------------------------------------------------------------------------
29 : */
30 :
31 : #include "postgres.h"
32 :
33 : #include "access/relscan.h"
34 : #include "access/xact.h"
35 : #include "executor/execdebug.h"
36 : #include "executor/execParallel.h"
37 : #include "executor/nodeGather.h"
38 : #include "executor/nodeSubplan.h"
39 : #include "executor/tqueue.h"
40 : #include "miscadmin.h"
41 : #include "optimizer/optimizer.h"
42 : #include "pgstat.h"
43 : #include "utils/memutils.h"
44 : #include "utils/rel.h"
45 :
46 :
47 : static TupleTableSlot *ExecGather(PlanState *pstate);
48 : static TupleTableSlot *gather_getnext(GatherState *gatherstate);
49 : static MinimalTuple gather_readnext(GatherState *gatherstate);
50 : static void ExecShutdownGatherWorkers(GatherState *node);
51 :
52 :
53 : /* ----------------------------------------------------------------
54 : * ExecInitGather
55 : * ----------------------------------------------------------------
56 : */
57 : GatherState *
58 950 : ExecInitGather(Gather *node, EState *estate, int eflags)
59 : {
60 : GatherState *gatherstate;
61 : Plan *outerNode;
62 : TupleDesc tupDesc;
63 :
64 : /* Gather node doesn't have innerPlan node. */
65 : Assert(innerPlan(node) == NULL);
66 :
67 : /*
68 : * create state structure
69 : */
70 950 : gatherstate = makeNode(GatherState);
71 950 : gatherstate->ps.plan = (Plan *) node;
72 950 : gatherstate->ps.state = estate;
73 950 : gatherstate->ps.ExecProcNode = ExecGather;
74 :
75 950 : gatherstate->initialized = false;
76 950 : gatherstate->need_to_scan_locally =
77 950 : !node->single_copy && parallel_leader_participation;
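 : /* -1 means "no limit"; a row-count bound, if any, is installed later (e.g. by ExecSetTupleBound()) */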
78 950 : gatherstate->tuples_needed = -1;
79 :
80 : /*
81 : * Miscellaneous initialization
82 : *
83 : * create expression context for node
84 : */
85 950 : ExecAssignExprContext(estate, &gatherstate->ps);
86 :
87 : /*
88 : * now initialize outer plan
89 : */
90 950 : outerNode = outerPlan(node);
91 950 : outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
92 950 : tupDesc = ExecGetResultType(outerPlanState(gatherstate));
93 :
94 : /*
95 : * Leader may access ExecProcNode result directly (if
96 : * need_to_scan_locally), or from workers via tuple queue. So we can't
97 : * trivially rely on the slot type being fixed for expressions evaluated
98 : * within this node.
99 : */
100 950 : gatherstate->ps.outeropsset = true;
101 950 : gatherstate->ps.outeropsfixed = false;
102 :
103 : /*
104 : * Initialize result type and projection.
105 : */
106 950 : ExecInitResultTypeTL(&gatherstate->ps);
107 950 : ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);
108 :
109 : /*
110 : * Without projections result slot type is not trivially known, see
111 : * comment above.
112 : */
113 950 : if (gatherstate->ps.ps_ProjInfo == NULL)
114 : {
115 908 : gatherstate->ps.resultopsset = true;
116 908 : gatherstate->ps.resultopsfixed = false;
117 : }
118 :
119 : /*
120 : * Initialize funnel slot to same tuple descriptor as outer plan.
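 : * Tuples arriving from the workers' tuple queues are MinimalTuples, hence
 : * the use of &TTSOpsMinimalTuple here (see gather_getnext below).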
121 : */
122 950 : gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
123 : &TTSOpsMinimalTuple);
124 :
125 : /*
126 : * Gather doesn't support checking a qual (it's always more efficient to
127 : * do it in the child node).
128 : */
129 : Assert(!node->plan.qual);
130 :
131 950 : return gatherstate;
132 : }
133 :
134 : /* ----------------------------------------------------------------
135 : * ExecGather(node)
136 : *
137 : * Scans the relation via multiple workers and returns
138 : * the next qualifying tuple.
139 : * ----------------------------------------------------------------
140 : */
141 : static TupleTableSlot *
142 2918300 : ExecGather(PlanState *pstate)
143 : {
144 2918300 : GatherState *node = castNode(GatherState, pstate);
145 : TupleTableSlot *slot;
146 : ExprContext *econtext;
147 :
148 2918300 : CHECK_FOR_INTERRUPTS();
149 :
150 : /*
151 : * Initialize the parallel context and workers on first execution. We do
152 : * this on first execution rather than during node initialization, as it
153 : * needs to allocate a large dynamic segment, so it is better to do it
154 : * only if it is really needed.
155 : */
156 2918300 : if (!node->initialized)
157 : {
158 758 : EState *estate = node->ps.state;
159 758 : Gather *gather = (Gather *) node->ps.plan;
160 :
161 : /*
162 : * Sometimes we might have to run without parallelism; but if parallel
163 : * mode is active then we can try to fire up some workers.
164 : */
165 758 : if (gather->num_workers > 0 && estate->es_use_parallel_mode)
166 : {
167 : ParallelContext *pcxt;
168 :
169 : /* Initialize, or re-initialize, shared state needed by workers. */
170 752 : if (!node->pei)
171 524 : node->pei = ExecInitParallelPlan(outerPlanState(node),
172 : estate,
173 : gather->initParam,
174 : gather->num_workers,
175 : node->tuples_needed);
176 : else
177 228 : ExecParallelReinitialize(outerPlanState(node),
178 228 : node->pei,
179 : gather->initParam);
180 :
181 : /*
182 : * Register backend workers. We might not get as many as we
183 : * requested, or indeed any at all.
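 : * If none are launched at all, the leader simply falls back to running
 : * the plan itself (see need_to_scan_locally below).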
184 : */
185 752 : pcxt = node->pei->pcxt;
186 752 : LaunchParallelWorkers(pcxt);
187 : /* We save # workers launched for the benefit of EXPLAIN */
188 752 : node->nworkers_launched = pcxt->nworkers_launched;
189 :
190 : /* Set up tuple queue readers to read the results. */
191 752 : if (pcxt->nworkers_launched > 0)
192 : {
193 746 : ExecParallelCreateReaders(node->pei);
194 : /* Make a working array showing the active readers */
195 746 : node->nreaders = pcxt->nworkers_launched;
196 746 : node->reader = (TupleQueueReader **)
197 746 : palloc(node->nreaders * sizeof(TupleQueueReader *));
198 746 : memcpy(node->reader, node->pei->reader,
199 746 : node->nreaders * sizeof(TupleQueueReader *));
200 : }
201 : else
202 : {
203 : /* No workers? Then never mind. */
204 6 : node->nreaders = 0;
205 6 : node->reader = NULL;
206 : }
207 752 : node->nextreader = 0;
208 : }
209 :
210 : /* Run the plan locally if no workers were launched, or if leader participation is enabled and this is not a single-copy Gather. */
211 1516 : node->need_to_scan_locally = (node->nreaders == 0)
212 758 : || (!gather->single_copy && parallel_leader_participation);
213 758 : node->initialized = true;
214 : }
215 :
216 : /*
217 : * Reset per-tuple memory context to free any expression evaluation
218 : * storage allocated in the previous tuple cycle.
219 : */
220 2918300 : econtext = node->ps.ps_ExprContext;
221 2918300 : ResetExprContext(econtext);
222 :
223 : /*
224 : * Get next tuple, either from one of our workers, or by running the plan
225 : * ourselves.
226 : */
227 2918300 : slot = gather_getnext(node);
228 2918294 : if (TupIsNull(slot))
229 752 : return NULL;
230 :
231 : /* If no projection is required, we're done. */
232 2917542 : if (node->ps.ps_ProjInfo == NULL)
233 2917542 : return slot;
234 :
235 : /*
236 : * Form the result tuple using ExecProject(), and return it.
237 : */
238 0 : econtext->ecxt_outertuple = slot;
239 0 : return ExecProject(node->ps.ps_ProjInfo);
240 : }
241 :
242 : /* ----------------------------------------------------------------
243 : * ExecEndGather
244 : *
245 : * frees any storage allocated through C routines.
246 : * ----------------------------------------------------------------
247 : */
248 : void
249 944 : ExecEndGather(GatherState *node)
250 : {
251 944 : ExecEndNode(outerPlanState(node)); /* let children clean up first */
252 944 : ExecShutdownGather(node);
253 944 : }
254 :
255 : /*
256 : * Read the next tuple. We might fetch a tuple from one of the tuple queues
257 : * using gather_readnext, or if no tuple queue contains a tuple and the
258 : * single_copy flag is not set, we might generate one locally instead.
259 : */
260 : static TupleTableSlot *
261 2918300 : gather_getnext(GatherState *gatherstate)
262 : {
263 2918300 : PlanState *outerPlan = outerPlanState(gatherstate);
264 : TupleTableSlot *outerTupleSlot;
265 2918300 : TupleTableSlot *fslot = gatherstate->funnel_slot;
266 : MinimalTuple tup;
267 :
268 2919572 : while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
269 : {
270 2918820 : CHECK_FOR_INTERRUPTS();
271 :
272 2918820 : if (gatherstate->nreaders > 0)
273 : {
274 2884850 : tup = gather_readnext(gatherstate);
275 :
276 2884844 : if (HeapTupleIsValid(tup))
277 : {
278 1441002 : ExecStoreMinimalTuple(tup, /* tuple to store */
279 : fslot, /* slot to store the tuple */
280 : false); /* don't pfree tuple */
281 1441002 : return fslot;
282 : }
283 : }
284 :
285 1477812 : if (gatherstate->need_to_scan_locally)
286 : {
287 1477098 : EState *estate = gatherstate->ps.state;
288 :
289 : /* Install our DSA area while executing the plan. */
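 : /* Parallel-aware nodes below us may need it to reach data in shared memory. */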
290 1477098 : estate->es_query_dsa =
291 1477098 : gatherstate->pei ? gatherstate->pei->area : NULL;
292 1477098 : outerTupleSlot = ExecProcNode(outerPlan);
293 1477098 : estate->es_query_dsa = NULL;
294 :
295 1477098 : if (!TupIsNull(outerTupleSlot))
296 1476540 : return outerTupleSlot;
297 :
298 558 : gatherstate->need_to_scan_locally = false;
299 : }
300 : }
301 :
302 752 : return ExecClearTuple(fslot);
303 : }
304 :
305 : /*
306 : * Attempt to read a tuple from one of our parallel workers.
307 : */
308 : static MinimalTuple
309 2884850 : gather_readnext(GatherState *gatherstate)
310 : {
311 2884850 : int nvisited = 0;
312 :
313 : for (;;)
314 3862454 : {
315 : TupleQueueReader *reader;
316 : MinimalTuple tup;
317 : bool readerdone;
318 :
319 : /* Check for async events, particularly messages from workers. */
320 6747304 : CHECK_FOR_INTERRUPTS();
321 :
322 : /*
323 : * Attempt to read a tuple, but don't block if none is available.
324 : *
325 : * Note that TupleQueueReaderNext will just return NULL for a worker
326 : * which fails to initialize. We'll treat that worker as having
327 : * produced no tuples; WaitForParallelWorkersToFinish will error out
328 : * when we get there.
329 : */
330 : Assert(gatherstate->nextreader < gatherstate->nreaders);
331 6747298 : reader = gatherstate->reader[gatherstate->nextreader];
332 6747298 : tup = TupleQueueReaderNext(reader, true, &readerdone);
333 :
334 : /*
335 : * If this reader is done, remove it from our working array of active
336 : * readers. If all readers are done, we're outta here.
337 : */
338 6747298 : if (readerdone)
339 : {
340 : Assert(!tup);
341 2016 : --gatherstate->nreaders;
342 2016 : if (gatherstate->nreaders == 0)
343 : {
344 740 : ExecShutdownGatherWorkers(gatherstate);
345 2884844 : return NULL;
346 : }
347 1276 : memmove(&gatherstate->reader[gatherstate->nextreader],
348 1276 : &gatherstate->reader[gatherstate->nextreader + 1],
349 : sizeof(TupleQueueReader *)
350 1276 : * (gatherstate->nreaders - gatherstate->nextreader));
351 1276 : if (gatherstate->nextreader >= gatherstate->nreaders)
352 1062 : gatherstate->nextreader = 0;
353 1276 : continue;
354 : }
355 :
356 : /* If we got a tuple, return it. */
357 6745282 : if (tup)
358 1441002 : return tup;
359 :
360 : /*
361 : * Advance nextreader pointer in round-robin fashion. Note that we
362 : * only reach this code if we weren't able to get a tuple from the
363 : * current worker. We used to advance the nextreader pointer after
364 : * every tuple, but it turns out to be much more efficient to keep
365 : * reading from the same queue until that would require blocking.
366 : */
367 5304280 : gatherstate->nextreader++;
368 5304280 : if (gatherstate->nextreader >= gatherstate->nreaders)
369 1459028 : gatherstate->nextreader = 0;
370 :
371 : /* Have we visited every (surviving) TupleQueueReader? */
372 5304280 : nvisited++;
373 5304280 : if (nvisited >= gatherstate->nreaders)
374 : {
375 : /*
376 : * If (still) running plan locally, return NULL so caller can
377 : * generate another tuple from the local copy of the plan.
378 : */
379 1459032 : if (gatherstate->need_to_scan_locally)
380 1443102 : return NULL;
381 :
382 : /* Nothing to do except wait for developments. */
383 15930 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
384 : WAIT_EVENT_EXECUTE_GATHER);
385 15930 : ResetLatch(MyLatch);
386 15930 : nvisited = 0;
387 : }
388 : }
389 : }
390 :
391 : /* ----------------------------------------------------------------
392 : * ExecShutdownGatherWorkers
393 : *
394 : * Stop all the parallel workers.
395 : * ----------------------------------------------------------------
396 : */
397 : static void
398 2520 : ExecShutdownGatherWorkers(GatherState *node)
399 : {
400 2520 : if (node->pei != NULL)
401 1486 : ExecParallelFinish(node->pei);
402 :
403 : /* Flush local copy of reader array */
404 2520 : if (node->reader)
405 740 : pfree(node->reader);
406 2520 : node->reader = NULL;
407 2520 : }
408 :
409 : /* ----------------------------------------------------------------
410 : * ExecShutdownGather
411 : *
412 : * Destroy the setup for parallel workers including parallel context.
413 : * ----------------------------------------------------------------
414 : */
415 : void
416 1480 : ExecShutdownGather(GatherState *node)
417 : {
418 1480 : ExecShutdownGatherWorkers(node);
419 :
420 : /* Now destroy the parallel context. */
421 1480 : if (node->pei != NULL)
422 : {
423 518 : ExecParallelCleanup(node->pei);
424 518 : node->pei = NULL;
425 : }
426 1480 : }
427 :
428 : /* ----------------------------------------------------------------
429 : * Join Support
430 : * ----------------------------------------------------------------
431 : */
432 :
433 : /* ----------------------------------------------------------------
434 : * ExecReScanGather
435 : *
436 : * Prepare to re-scan the result of a Gather.
437 : * ----------------------------------------------------------------
438 : */
439 : void
440 300 : ExecReScanGather(GatherState *node)
441 : {
442 300 : Gather *gather = (Gather *) node->ps.plan;
443 300 : PlanState *outerPlan = outerPlanState(node);
444 :
445 : /* Make sure any existing workers are gracefully shut down */
446 300 : ExecShutdownGatherWorkers(node);
447 :
448 : /* Mark node so that shared state will be rebuilt at next call */
449 300 : node->initialized = false;
450 :
451 : /*
452 : * Set child node's chgParam to tell it that the next scan might deliver a
453 : * different set of rows within the leader process. (The overall rowset
454 : * shouldn't change, but the leader process's subset might; hence nodes
455 : * between here and the parallel table scan node mustn't optimize on the
456 : * assumption of an unchanging rowset.)
457 : */
458 300 : if (gather->rescan_param >= 0)
459 300 : outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
460 : gather->rescan_param);
461 :
462 : /*
463 : * If chgParam of subnode is not null then plan will be re-scanned by
464 : * first ExecProcNode. Note: because this does nothing if we have a
465 : * rescan_param, it's currently guaranteed that parallel-aware child nodes
466 : * will not see a ReScan call until after they get a ReInitializeDSM call.
467 : * That ordering might not be something to rely on, though. A good rule
468 : * of thumb is that ReInitializeDSM should reset only shared state, ReScan
469 : * should reset only local state, and anything that depends on both of
470 : * those steps being finished must wait until the first ExecProcNode call.
471 : */
472 300 : if (outerPlan->chgParam == NULL)
473 0 : ExecReScan(outerPlan);
474 300 : }