Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeGather.c
4 : * Support routines for scanning a plan via multiple workers.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * A Gather executor launches parallel workers to run multiple copies of a
10 : * plan. It can also run the plan itself, if the workers are not available
11 : * or have not started up yet. It then merges all of the results it produces
12 : * and the results from the workers into a single output stream. Therefore,
13 : * it will normally be used with a plan where running multiple copies of the
14 : * same plan does not produce duplicate output, such as parallel-aware
15 : * SeqScan.
16 : *
17 : * Alternatively, a Gather node can be configured to use just one worker
18 : * and the single-copy flag can be set. In this case, the Gather node will
19 : * run the plan in one worker and will not execute the plan itself. In
20 : * this case, it simply returns whatever tuples were returned by the worker.
21 : * If a worker cannot be obtained, then it will run the plan itself and
22 : * return the results. Therefore, a plan used with a single-copy Gather
23 : * node need not be parallel-aware.
24 : *
25 : * IDENTIFICATION
26 : * src/backend/executor/nodeGather.c
27 : *
28 : *-------------------------------------------------------------------------
29 : */
30 :
31 : #include "postgres.h"
32 :
33 : #include "access/relscan.h"
34 : #include "access/xact.h"
35 : #include "executor/execdebug.h"
36 : #include "executor/execParallel.h"
37 : #include "executor/nodeGather.h"
38 : #include "executor/nodeSubplan.h"
39 : #include "executor/tqueue.h"
40 : #include "miscadmin.h"
41 : #include "optimizer/optimizer.h"
42 : #include "pgstat.h"
43 : #include "utils/memutils.h"
44 : #include "utils/rel.h"
45 :
46 :
47 : static TupleTableSlot *ExecGather(PlanState *pstate);
48 : static TupleTableSlot *gather_getnext(GatherState *gatherstate);
49 : static MinimalTuple gather_readnext(GatherState *gatherstate);
50 : static void ExecShutdownGatherWorkers(GatherState *node);
51 :
52 :
53 : /* ----------------------------------------------------------------
54 : * ExecInitGather
55 : * ----------------------------------------------------------------
56 : */
57 : GatherState *
58 962 : ExecInitGather(Gather *node, EState *estate, int eflags)
59 : {
60 : GatherState *gatherstate;
61 : Plan *outerNode;
62 : TupleDesc tupDesc;
63 :
64 : /* Gather node doesn't have innerPlan node. */
65 : Assert(innerPlan(node) == NULL);
66 :
67 : /*
68 : * create state structure
69 : */
70 962 : gatherstate = makeNode(GatherState);
71 962 : gatherstate->ps.plan = (Plan *) node;
72 962 : gatherstate->ps.state = estate;
73 962 : gatherstate->ps.ExecProcNode = ExecGather;
74 :
75 962 : gatherstate->initialized = false;
76 962 : gatherstate->need_to_scan_locally =
77 962 : !node->single_copy && parallel_leader_participation;
78 962 : gatherstate->tuples_needed = -1;
79 :
80 : /*
81 : * Miscellaneous initialization
82 : *
83 : * create expression context for node
84 : */
85 962 : ExecAssignExprContext(estate, &gatherstate->ps);
86 :
87 : /*
88 : * now initialize outer plan
89 : */
90 962 : outerNode = outerPlan(node);
91 962 : outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
92 962 : tupDesc = ExecGetResultType(outerPlanState(gatherstate));
93 :
94 : /*
95 : * Leader may access ExecProcNode result directly (if
96 : * need_to_scan_locally), or from workers via tuple queue. So we can't
97 : * trivially rely on the slot type being fixed for expressions evaluated
98 : * within this node.
99 : */
100 962 : gatherstate->ps.outeropsset = true;
101 962 : gatherstate->ps.outeropsfixed = false;
102 :
103 : /*
104 : * Initialize result type and projection.
105 : */
106 962 : ExecInitResultTypeTL(&gatherstate->ps);
107 962 : ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);
108 :
109 : /*
110 : * Without projections result slot type is not trivially known, see
111 : * comment above.
112 : */
113 962 : if (gatherstate->ps.ps_ProjInfo == NULL)
114 : {
115 920 : gatherstate->ps.resultopsset = true;
116 920 : gatherstate->ps.resultopsfixed = false;
117 : }
118 :
119 : /*
120 : * Initialize funnel slot to same tuple descriptor as outer plan.
121 : */
122 962 : gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
123 : &TTSOpsMinimalTuple);
124 :
125 : /*
126 : * Gather doesn't support checking a qual (it's always more efficient to
127 : * do it in the child node).
128 : */
129 : Assert(!node->plan.qual);
130 :
131 962 : return gatherstate;
132 : }
133 :
134 : /* ----------------------------------------------------------------
135 : * ExecGather(node)
136 : *
137 : * Scans the relation via multiple workers and returns
138 : * the next qualifying tuple.
139 : * ----------------------------------------------------------------
140 : */
141 : static TupleTableSlot *
142 2918304 : ExecGather(PlanState *pstate)
143 : {
144 2918304 : GatherState *node = castNode(GatherState, pstate);
145 : TupleTableSlot *slot;
146 : ExprContext *econtext;
147 :
148 2918304 : CHECK_FOR_INTERRUPTS();
149 :
150 : /*
151 : * Initialize the parallel context and workers on first execution. We do
152 : * this on first execution rather than during node initialization, as it
153 : * needs to allocate a large dynamic segment, so it is better to do it
154 : * only if it is really needed.
155 : */
156 2918304 : if (!node->initialized)
157 : {
158 770 : EState *estate = node->ps.state;
159 770 : Gather *gather = (Gather *) node->ps.plan;
160 :
161 : /*
162 : * Sometimes we might have to run without parallelism; but if parallel
163 : * mode is active then we can try to fire up some workers.
164 : */
165 770 : if (gather->num_workers > 0 && estate->es_use_parallel_mode)
166 : {
167 : ParallelContext *pcxt;
168 :
169 : /* Initialize, or re-initialize, shared state needed by workers. */
170 764 : if (!node->pei)
171 536 : node->pei = ExecInitParallelPlan(outerPlanState(node),
172 : estate,
173 : gather->initParam,
174 : gather->num_workers,
175 : node->tuples_needed);
176 : else
177 228 : ExecParallelReinitialize(outerPlanState(node),
178 228 : node->pei,
179 : gather->initParam);
180 :
181 : /*
182 : * Register backend workers. We might not get as many as we
183 : * requested, or indeed any at all.
184 : */
185 764 : pcxt = node->pei->pcxt;
186 764 : LaunchParallelWorkers(pcxt);
187 : /* We save # workers launched for the benefit of EXPLAIN */
188 764 : node->nworkers_launched = pcxt->nworkers_launched;
189 :
190 : /* Set up tuple queue readers to read the results. */
191 764 : if (pcxt->nworkers_launched > 0)
192 : {
193 758 : ExecParallelCreateReaders(node->pei);
194 : /* Make a working array showing the active readers */
195 758 : node->nreaders = pcxt->nworkers_launched;
196 758 : node->reader = (TupleQueueReader **)
197 758 : palloc(node->nreaders * sizeof(TupleQueueReader *));
198 758 : memcpy(node->reader, node->pei->reader,
199 758 : node->nreaders * sizeof(TupleQueueReader *));
200 : }
201 : else
202 : {
203 : /* No workers? Then never mind. */
204 6 : node->nreaders = 0;
205 6 : node->reader = NULL;
206 : }
207 764 : node->nextreader = 0;
208 : }
209 :
210 : /* Run plan locally if no workers or enabled and not single-copy. */
211 1540 : node->need_to_scan_locally = (node->nreaders == 0)
212 770 : || (!gather->single_copy && parallel_leader_participation);
213 770 : node->initialized = true;
214 : }
215 :
216 : /*
217 : * Reset per-tuple memory context to free any expression evaluation
218 : * storage allocated in the previous tuple cycle.
219 : */
220 2918304 : econtext = node->ps.ps_ExprContext;
221 2918304 : ResetExprContext(econtext);
222 :
223 : /*
224 : * Get next tuple, either from one of our workers, or by running the plan
225 : * ourselves.
226 : */
227 2918304 : slot = gather_getnext(node);
228 2918298 : if (TupIsNull(slot))
229 764 : return NULL;
230 :
231 : /* If no projection is required, we're done. */
232 2917534 : if (node->ps.ps_ProjInfo == NULL)
233 2917534 : return slot;
234 :
235 : /*
236 : * Form the result tuple using ExecProject(), and return it.
237 : */
238 0 : econtext->ecxt_outertuple = slot;
239 0 : return ExecProject(node->ps.ps_ProjInfo);
240 : }
241 :
242 : /* ----------------------------------------------------------------
243 : * ExecEndGather
244 : *
245 : * frees any storage allocated through C routines.
246 : * ----------------------------------------------------------------
247 : */
248 : void
249 956 : ExecEndGather(GatherState *node)
250 : {
251 956 : ExecEndNode(outerPlanState(node)); /* let children clean up first */
252 956 : ExecShutdownGather(node);
253 956 : ExecFreeExprContext(&node->ps);
254 956 : if (node->ps.ps_ResultTupleSlot)
255 42 : ExecClearTuple(node->ps.ps_ResultTupleSlot);
256 956 : }
257 :
258 : /*
259 : * Read the next tuple. We might fetch a tuple from one of the tuple queues
260 : * using gather_readnext, or if no tuple queue contains a tuple and the
261 : * single_copy flag is not set, we might generate one locally instead.
262 : */
263 : static TupleTableSlot *
264 2918304 : gather_getnext(GatherState *gatherstate)
265 : {
266 2918304 : PlanState *outerPlan = outerPlanState(gatherstate);
267 : TupleTableSlot *outerTupleSlot;
268 2918304 : TupleTableSlot *fslot = gatherstate->funnel_slot;
269 : MinimalTuple tup;
270 :
271 2919600 : while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
272 : {
273 2918836 : CHECK_FOR_INTERRUPTS();
274 :
275 2918836 : if (gatherstate->nreaders > 0)
276 : {
277 2918806 : tup = gather_readnext(gatherstate);
278 :
279 2918800 : if (HeapTupleIsValid(tup))
280 : {
281 1440982 : ExecStoreMinimalTuple(tup, /* tuple to store */
282 : fslot, /* slot to store the tuple */
283 : false); /* don't pfree tuple */
284 1440982 : return fslot;
285 : }
286 : }
287 :
288 1477848 : if (gatherstate->need_to_scan_locally)
289 : {
290 1477122 : EState *estate = gatherstate->ps.state;
291 :
292 : /* Install our DSA area while executing the plan. */
293 1477122 : estate->es_query_dsa =
294 1477122 : gatherstate->pei ? gatherstate->pei->area : NULL;
295 1477122 : outerTupleSlot = ExecProcNode(outerPlan);
296 1477122 : estate->es_query_dsa = NULL;
297 :
298 1477122 : if (!TupIsNull(outerTupleSlot))
299 1476552 : return outerTupleSlot;
300 :
301 570 : gatherstate->need_to_scan_locally = false;
302 : }
303 : }
304 :
305 764 : return ExecClearTuple(fslot);
306 : }
307 :
308 : /*
309 : * Attempt to read a tuple from one of our parallel workers.
310 : */
311 : static MinimalTuple
312 2918806 : gather_readnext(GatherState *gatherstate)
313 : {
314 2918806 : int nvisited = 0;
315 :
316 : for (;;)
317 3882608 : {
318 : TupleQueueReader *reader;
319 : MinimalTuple tup;
320 : bool readerdone;
321 :
322 : /* Check for async events, particularly messages from workers. */
323 6801414 : CHECK_FOR_INTERRUPTS();
324 :
325 : /*
326 : * Attempt to read a tuple, but don't block if none is available.
327 : *
328 : * Note that TupleQueueReaderNext will just return NULL for a worker
329 : * which fails to initialize. We'll treat that worker as having
330 : * produced no tuples; WaitForParallelWorkersToFinish will error out
331 : * when we get there.
332 : */
333 : Assert(gatherstate->nextreader < gatherstate->nreaders);
334 6801408 : reader = gatherstate->reader[gatherstate->nextreader];
335 6801408 : tup = TupleQueueReaderNext(reader, true, &readerdone);
336 :
337 : /*
338 : * If this reader is done, remove it from our working array of active
339 : * readers. If all readers are done, we're outta here.
340 : */
341 6801408 : if (readerdone)
342 : {
343 : Assert(!tup);
344 2042 : --gatherstate->nreaders;
345 2042 : if (gatherstate->nreaders == 0)
346 : {
347 752 : ExecShutdownGatherWorkers(gatherstate);
348 2918800 : return NULL;
349 : }
350 1290 : memmove(&gatherstate->reader[gatherstate->nextreader],
351 1290 : &gatherstate->reader[gatherstate->nextreader + 1],
352 : sizeof(TupleQueueReader *)
353 1290 : * (gatherstate->nreaders - gatherstate->nextreader));
354 1290 : if (gatherstate->nextreader >= gatherstate->nreaders)
355 1026 : gatherstate->nextreader = 0;
356 1290 : continue;
357 : }
358 :
359 : /* If we got a tuple, return it. */
360 6799366 : if (tup)
361 1440982 : return tup;
362 :
363 : /*
364 : * Advance nextreader pointer in round-robin fashion. Note that we
365 : * only reach this code if we weren't able to get a tuple from the
366 : * current worker. We used to advance the nextreader pointer after
367 : * every tuple, but it turns out to be much more efficient to keep
368 : * reading from the same queue until that would require blocking.
369 : */
370 5358384 : gatherstate->nextreader++;
371 5358384 : if (gatherstate->nextreader >= gatherstate->nreaders)
372 1493076 : gatherstate->nextreader = 0;
373 :
374 : /* Have we visited every (surviving) TupleQueueReader? */
375 5358384 : nvisited++;
376 5358384 : if (nvisited >= gatherstate->nreaders)
377 : {
378 : /*
379 : * If (still) running plan locally, return NULL so caller can
380 : * generate another tuple from the local copy of the plan.
381 : */
382 1493036 : if (gatherstate->need_to_scan_locally)
383 1477066 : return NULL;
384 :
385 : /* Nothing to do except wait for developments. */
386 15970 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
387 : WAIT_EVENT_EXECUTE_GATHER);
388 15970 : ResetLatch(MyLatch);
389 15970 : nvisited = 0;
390 : }
391 : }
392 : }
393 :
394 : /* ----------------------------------------------------------------
395 : * ExecShutdownGatherWorkers
396 : *
397 : * Stop all the parallel workers.
398 : * ----------------------------------------------------------------
399 : */
400 : static void
401 2556 : ExecShutdownGatherWorkers(GatherState *node)
402 : {
403 2556 : if (node->pei != NULL)
404 1510 : ExecParallelFinish(node->pei);
405 :
406 : /* Flush local copy of reader array */
407 2556 : if (node->reader)
408 752 : pfree(node->reader);
409 2556 : node->reader = NULL;
410 2556 : }
411 :
412 : /* ----------------------------------------------------------------
413 : * ExecShutdownGather
414 : *
415 : * Destroy the setup for parallel workers including parallel context.
416 : * ----------------------------------------------------------------
417 : */
418 : void
419 1504 : ExecShutdownGather(GatherState *node)
420 : {
421 1504 : ExecShutdownGatherWorkers(node);
422 :
423 : /* Now destroy the parallel context. */
424 1504 : if (node->pei != NULL)
425 : {
426 530 : ExecParallelCleanup(node->pei);
427 530 : node->pei = NULL;
428 : }
429 1504 : }
430 :
431 : /* ----------------------------------------------------------------
432 : * Join Support
433 : * ----------------------------------------------------------------
434 : */
435 :
436 : /* ----------------------------------------------------------------
437 : * ExecReScanGather
438 : *
439 : * Prepare to re-scan the result of a Gather.
440 : * ----------------------------------------------------------------
441 : */
442 : void
443 300 : ExecReScanGather(GatherState *node)
444 : {
445 300 : Gather *gather = (Gather *) node->ps.plan;
446 300 : PlanState *outerPlan = outerPlanState(node);
447 :
448 : /* Make sure any existing workers are gracefully shut down */
449 300 : ExecShutdownGatherWorkers(node);
450 :
451 : /* Mark node so that shared state will be rebuilt at next call */
452 300 : node->initialized = false;
453 :
454 : /*
455 : * Set child node's chgParam to tell it that the next scan might deliver a
456 : * different set of rows within the leader process. (The overall rowset
457 : * shouldn't change, but the leader process's subset might; hence nodes
458 : * between here and the parallel table scan node mustn't optimize on the
459 : * assumption of an unchanging rowset.)
460 : */
461 300 : if (gather->rescan_param >= 0)
462 300 : outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
463 : gather->rescan_param);
464 :
465 : /*
466 : * If chgParam of subnode is not null then plan will be re-scanned by
467 : * first ExecProcNode. Note: because this does nothing if we have a
468 : * rescan_param, it's currently guaranteed that parallel-aware child nodes
469 : * will not see a ReScan call until after they get a ReInitializeDSM call.
470 : * That ordering might not be something to rely on, though. A good rule
471 : * of thumb is that ReInitializeDSM should reset only shared state, ReScan
472 : * should reset only local state, and anything that depends on both of
473 : * those steps being finished must wait until the first ExecProcNode call.
474 : */
475 300 : if (outerPlan->chgParam == NULL)
476 0 : ExecReScan(outerPlan);
477 300 : }
|