Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execParallel.c
4 : * Support routines for parallel execution.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * This file contains routines that are intended to support setting up,
10 : * using, and tearing down a ParallelContext from within the PostgreSQL
11 : * executor. The ParallelContext machinery will handle starting the
12 : * workers and ensuring that their state generally matches that of the
13 : * leader; see src/backend/access/transam/README.parallel for details.
14 : * However, we must save and restore relevant executor state, such as
15 : * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 : * the actual plan to be passed down to the worker.
17 : *
18 : * IDENTIFICATION
19 : * src/backend/executor/execParallel.c
20 : *
21 : *-------------------------------------------------------------------------
22 : */
23 :
24 : #include "postgres.h"
25 :
26 : #include "executor/execParallel.h"
27 : #include "executor/executor.h"
28 : #include "executor/nodeAgg.h"
29 : #include "executor/nodeAppend.h"
30 : #include "executor/nodeBitmapHeapscan.h"
31 : #include "executor/nodeCustom.h"
32 : #include "executor/nodeForeignscan.h"
33 : #include "executor/nodeHash.h"
34 : #include "executor/nodeHashjoin.h"
35 : #include "executor/nodeIncrementalSort.h"
36 : #include "executor/nodeIndexonlyscan.h"
37 : #include "executor/nodeIndexscan.h"
38 : #include "executor/nodeMemoize.h"
39 : #include "executor/nodeSeqscan.h"
40 : #include "executor/nodeSort.h"
41 : #include "executor/nodeSubplan.h"
42 : #include "executor/tqueue.h"
43 : #include "jit/jit.h"
44 : #include "nodes/nodeFuncs.h"
45 : #include "pgstat.h"
46 : #include "storage/spin.h"
47 : #include "tcop/tcopprot.h"
48 : #include "utils/datum.h"
49 : #include "utils/dsa.h"
50 : #include "utils/lsyscache.h"
51 : #include "utils/memutils.h"
52 : #include "utils/snapmgr.h"
53 :
54 : /*
55 : * Magic numbers for parallel executor communication. We use constants
56 : * greater than any 32-bit integer here so that values < 2^32 can be used
57 : * by individual parallel nodes to store their own state.
58 : */
59 : #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
60 : #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
61 : #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
62 : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
63 : #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
64 : #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
65 : #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
66 : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
67 : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
68 : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
69 :
70 : #define PARALLEL_TUPLE_QUEUE_SIZE 65536
71 :
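/*
 * As an illustration (a sketch based on the parallel-aware scan nodes, e.g.
 * nodeSeqscan.c, not code taken from this file): the executor-wide entries
 * are stored under the high PARALLEL_KEY_* constants above, while a plan
 * node typically keys its own shared state by its plan_node_id, a small
 * non-negative integer that cannot collide with those constants.
 *
 *		fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
 *		pscan = shm_toc_lookup(pwcxt->toc,
 *							   node->ss.ps.plan->plan_node_id, false);
 */
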
72 : /*
73 : * Fixed-size state that we need to pass to parallel workers.
74 : */
75 : typedef struct FixedParallelExecutorState
76 : {
77 : int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
78 : dsa_pointer param_exec;
79 : int eflags;
80 : int jit_flags;
81 : } FixedParallelExecutorState;
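
/*
 * For orientation, a condensed sketch (not a verbatim excerpt) of how
 * ParallelQueryMain at the bottom of this file consumes these fields:
 *
 *		queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
 *		ExecutorStart(queryDesc, fpes->eflags);
 *		if (DsaPointerIsValid(fpes->param_exec))
 *			RestoreParamExecParams(dsa_get_address(area, fpes->param_exec),
 *								   queryDesc->estate);
 *		ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
 */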
82 :
83 : /*
84 : * DSM structure for accumulating per-PlanState instrumentation.
85 : *
86 : * instrument_options: Same meaning here as in instrument.c.
87 : *
88 : * instrument_offset: Offset, relative to the start of this structure,
89 : * of the first Instrumentation object. This will depend on the length of
90 : * the plan_node_id array.
91 : *
92 : * num_workers: Number of workers.
93 : *
94 : * num_plan_nodes: Number of plan nodes.
95 : *
96 : * plan_node_id: Array of plan node IDs for which we are gathering instrumentation
97 : * from parallel workers. The length of this array is given by num_plan_nodes.
98 : */
99 : struct SharedExecutorInstrumentation
100 : {
101 : int instrument_options;
102 : int instrument_offset;
103 : int num_workers;
104 : int num_plan_nodes;
105 : int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
106 : /* array of num_plan_nodes * num_workers Instrumentation objects follows */
107 : };
108 : #define GetInstrumentationArray(sei) \
109 : (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
110 : (Instrumentation *) (((char *) sei) + sei->instrument_offset))
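
/*
 * For reference, the accumulation code later in this file addresses the
 * Instrumentation slot for the plan node stored at index i of plan_node_id[]
 * and for worker n as shown below; all of a plan node's worker slots are
 * contiguous.
 *
 *		Instrumentation *base = GetInstrumentationArray(sei);
 *		Instrumentation *slot = &base[i * sei->num_workers + n];
 */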
111 :
112 : /* Context object for ExecParallelEstimate. */
113 : typedef struct ExecParallelEstimateContext
114 : {
115 : ParallelContext *pcxt;
116 : int nnodes;
117 : } ExecParallelEstimateContext;
118 :
119 : /* Context object for ExecParallelInitializeDSM. */
120 : typedef struct ExecParallelInitializeDSMContext
121 : {
122 : ParallelContext *pcxt;
123 : SharedExecutorInstrumentation *instrumentation;
124 : int nnodes;
125 : } ExecParallelInitializeDSMContext;
126 :
127 : /* Helper functions that run in the parallel leader. */
128 : static char *ExecSerializePlan(Plan *plan, EState *estate);
129 : static bool ExecParallelEstimate(PlanState *planstate,
130 : ExecParallelEstimateContext *e);
131 : static bool ExecParallelInitializeDSM(PlanState *planstate,
132 : ExecParallelInitializeDSMContext *d);
133 : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
134 : bool reinitialize);
135 : static bool ExecParallelReInitializeDSM(PlanState *planstate,
136 : ParallelContext *pcxt);
137 : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
138 : SharedExecutorInstrumentation *instrumentation);
139 :
140 : /* Helper function that runs in the parallel worker. */
141 : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
142 :
143 : /*
144 : * Create a serialized representation of the plan to be sent to each worker.
145 : */
146 : static char *
147 650 : ExecSerializePlan(Plan *plan, EState *estate)
148 : {
149 : PlannedStmt *pstmt;
150 : ListCell *lc;
151 :
152 : /* We can't scribble on the original plan, so make a copy. */
153 650 : plan = copyObject(plan);
154 :
155 : /*
156 : * The worker will start its own copy of the executor, and that copy will
157 : * insert a junk filter if the toplevel node has any resjunk entries. We
158 : * don't want that to happen, because while resjunk columns shouldn't be
159 : * sent back to the user, here the tuples are coming back to another
160 : * backend which may very well need them. So mutate the target list
161 : * accordingly. This is sort of a hack; there might be better ways to do
162 : * this...
163 : */
164 1782 : foreach(lc, plan->targetlist)
165 : {
166 1132 : TargetEntry *tle = lfirst_node(TargetEntry, lc);
167 :
168 1132 : tle->resjunk = false;
169 : }
170 :
171 : /*
172 : * Create a dummy PlannedStmt. Most of the fields don't need to be valid
173 : * for our purposes, but the worker will need at least a minimal
174 : * PlannedStmt to start the executor.
175 : */
176 650 : pstmt = makeNode(PlannedStmt);
177 650 : pstmt->commandType = CMD_SELECT;
178 650 : pstmt->queryId = pgstat_get_my_query_id();
179 650 : pstmt->hasReturning = false;
180 650 : pstmt->hasModifyingCTE = false;
181 650 : pstmt->canSetTag = true;
182 650 : pstmt->transientPlan = false;
183 650 : pstmt->dependsOnRole = false;
184 650 : pstmt->parallelModeNeeded = false;
185 650 : pstmt->planTree = plan;
186 650 : pstmt->rtable = estate->es_range_table;
187 650 : pstmt->permInfos = estate->es_rteperminfos;
188 650 : pstmt->resultRelations = NIL;
189 650 : pstmt->appendRelations = NIL;
190 :
191 : /*
192 : * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
193 : * for unsafe ones (so that the list indexes of the safe ones are
194 : * preserved). This positively ensures that the worker won't try to run,
195 : * or even do ExecInitNode on, an unsafe subplan. That's important to
196 : * protect, eg, non-parallel-aware FDWs from getting into trouble.
197 : */
198 650 : pstmt->subplans = NIL;
199 710 : foreach(lc, estate->es_plannedstmt->subplans)
200 : {
201 60 : Plan *subplan = (Plan *) lfirst(lc);
202 :
203 60 : if (subplan && !subplan->parallel_safe)
204 54 : subplan = NULL;
205 60 : pstmt->subplans = lappend(pstmt->subplans, subplan);
206 : }
207 :
208 650 : pstmt->rewindPlanIDs = NULL;
209 650 : pstmt->rowMarks = NIL;
210 650 : pstmt->relationOids = NIL;
211 650 : pstmt->invalItems = NIL; /* workers can't replan anyway... */
212 650 : pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
213 650 : pstmt->utilityStmt = NULL;
214 650 : pstmt->stmt_location = -1;
215 650 : pstmt->stmt_len = -1;
216 :
217 : /* Return serialized copy of our dummy PlannedStmt. */
218 650 : return nodeToString(pstmt);
219 : }
220 :
221 : /*
222 : * Parallel-aware plan nodes (and occasionally others) may need some state
223 : * which is shared across all parallel workers. Before we size the DSM, give
224 : * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
225 : * &pcxt->estimator.
226 : *
227 : * While we're at it, count the number of PlanState nodes in the tree, so
228 : * we know how many Instrumentation structures we need.
229 : */
230 : static bool
231 2854 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
232 : {
233 2854 : if (planstate == NULL)
234 0 : return false;
235 :
236 : /* Count this node. */
237 2854 : e->nnodes++;
238 :
239 2854 : switch (nodeTag(planstate))
240 : {
241 1130 : case T_SeqScanState:
242 1130 : if (planstate->plan->parallel_aware)
243 892 : ExecSeqScanEstimate((SeqScanState *) planstate,
244 : e->pcxt);
245 1130 : break;
246 288 : case T_IndexScanState:
247 288 : if (planstate->plan->parallel_aware)
248 12 : ExecIndexScanEstimate((IndexScanState *) planstate,
249 : e->pcxt);
250 288 : break;
251 52 : case T_IndexOnlyScanState:
252 52 : if (planstate->plan->parallel_aware)
253 40 : ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
254 : e->pcxt);
255 52 : break;
256 0 : case T_ForeignScanState:
257 0 : if (planstate->plan->parallel_aware)
258 0 : ExecForeignScanEstimate((ForeignScanState *) planstate,
259 : e->pcxt);
260 0 : break;
261 180 : case T_AppendState:
262 180 : if (planstate->plan->parallel_aware)
263 132 : ExecAppendEstimate((AppendState *) planstate,
264 : e->pcxt);
265 180 : break;
266 0 : case T_CustomScanState:
267 0 : if (planstate->plan->parallel_aware)
268 0 : ExecCustomScanEstimate((CustomScanState *) planstate,
269 : e->pcxt);
270 0 : break;
271 20 : case T_BitmapHeapScanState:
272 20 : if (planstate->plan->parallel_aware)
273 18 : ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
274 : e->pcxt);
275 20 : break;
276 192 : case T_HashJoinState:
277 192 : if (planstate->plan->parallel_aware)
278 120 : ExecHashJoinEstimate((HashJoinState *) planstate,
279 : e->pcxt);
280 192 : break;
281 192 : case T_HashState:
282 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
283 192 : ExecHashEstimate((HashState *) planstate, e->pcxt);
284 192 : break;
285 140 : case T_SortState:
286 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
287 140 : ExecSortEstimate((SortState *) planstate, e->pcxt);
288 140 : break;
289 0 : case T_IncrementalSortState:
290 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
291 0 : ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
292 0 : break;
293 558 : case T_AggState:
294 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
295 558 : ExecAggEstimate((AggState *) planstate, e->pcxt);
296 558 : break;
297 6 : case T_MemoizeState:
298 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
299 6 : ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
300 6 : break;
301 96 : default:
302 96 : break;
303 : }
304 :
305 2854 : return planstate_tree_walker(planstate, ExecParallelEstimate, e);
306 : }
307 :
308 : /*
309 : * Estimate the amount of space required to serialize the indicated parameters.
310 : */
311 : static Size
312 30 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
313 : {
314 : int paramid;
315 30 : Size sz = sizeof(int);
316 :
317 30 : paramid = -1;
318 66 : while ((paramid = bms_next_member(params, paramid)) >= 0)
319 : {
320 : Oid typeOid;
321 : int16 typLen;
322 : bool typByVal;
323 : ParamExecData *prm;
324 :
325 36 : prm = &(estate->es_param_exec_vals[paramid]);
326 36 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
327 : paramid);
328 :
329 36 : sz = add_size(sz, sizeof(int)); /* space for paramid */
330 :
331 : /* space for datum/isnull */
332 36 : if (OidIsValid(typeOid))
333 36 : get_typlenbyval(typeOid, &typLen, &typByVal);
334 : else
335 : {
336 : /* If no type OID, assume by-value, like copyParamList does. */
337 0 : typLen = sizeof(Datum);
338 0 : typByVal = true;
339 : }
340 36 : sz = add_size(sz,
341 36 : datumEstimateSpace(prm->value, prm->isnull,
342 : typByVal, typLen));
343 : }
344 30 : return sz;
345 : }
346 :
347 : /*
348 : * Serialize specified PARAM_EXEC parameters.
349 : *
350 : * We write the number of parameters first, as a 4-byte integer, and then
351 : * write details for each parameter in turn. The details for each parameter
352 : * consist of a 4-byte paramid (location of param in execution time internal
353 : * parameter array) and then the datum as serialized by datumSerialize().
354 : */
355 : static dsa_pointer
356 30 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
357 : {
358 : Size size;
359 : int nparams;
360 : int paramid;
361 : ParamExecData *prm;
362 : dsa_pointer handle;
363 : char *start_address;
364 :
365 : /* Allocate enough space for the current parameter values. */
366 30 : size = EstimateParamExecSpace(estate, params);
367 30 : handle = dsa_allocate(area, size);
368 30 : start_address = dsa_get_address(area, handle);
369 :
370 : /* First write the number of parameters as a 4-byte integer. */
371 30 : nparams = bms_num_members(params);
372 30 : memcpy(start_address, &nparams, sizeof(int));
373 30 : start_address += sizeof(int);
374 :
375 : /* Write details for each parameter in turn. */
376 30 : paramid = -1;
377 66 : while ((paramid = bms_next_member(params, paramid)) >= 0)
378 : {
379 : Oid typeOid;
380 : int16 typLen;
381 : bool typByVal;
382 :
383 36 : prm = &(estate->es_param_exec_vals[paramid]);
384 36 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
385 : paramid);
386 :
387 : /* Write paramid. */
388 36 : memcpy(start_address, ¶mid, sizeof(int));
389 36 : start_address += sizeof(int);
390 :
391 : /* Write datum/isnull */
392 36 : if (OidIsValid(typeOid))
393 36 : get_typlenbyval(typeOid, &typLen, &typByVal);
394 : else
395 : {
396 : /* If no type OID, assume by-value, like copyParamList does. */
397 0 : typLen = sizeof(Datum);
398 0 : typByVal = true;
399 : }
400 36 : datumSerialize(prm->value, prm->isnull, typByVal, typLen,
401 : &start_address);
402 : }
403 :
404 30 : return handle;
405 : }
406 :
407 : /*
408 : * Restore specified PARAM_EXEC parameters.
409 : */
410 : static void
411 72 : RestoreParamExecParams(char *start_address, EState *estate)
412 : {
413 : int nparams;
414 : int i;
415 : int paramid;
416 :
417 72 : memcpy(&nparams, start_address, sizeof(int));
418 72 : start_address += sizeof(int);
419 :
420 156 : for (i = 0; i < nparams; i++)
421 : {
422 : ParamExecData *prm;
423 :
424 : /* Read paramid */
425 84 : memcpy(&paramid, start_address, sizeof(int));
426 84 : start_address += sizeof(int);
427 84 : prm = &(estate->es_param_exec_vals[paramid]);
428 :
429 : /* Read datum/isnull. */
430 84 : prm->value = datumRestore(&start_address, &prm->isnull);
431 84 : prm->execPlan = NULL;
432 : }
433 72 : }
434 :
435 : /*
436 : * Initialize the dynamic shared memory segment that will be used to control
437 : * parallel execution.
438 : */
439 : static bool
440 2854 : ExecParallelInitializeDSM(PlanState *planstate,
441 : ExecParallelInitializeDSMContext *d)
442 : {
443 2854 : if (planstate == NULL)
444 0 : return false;
445 :
446 : /* If instrumentation is enabled, initialize slot for this node. */
447 2854 : if (d->instrumentation != NULL)
448 1014 : d->instrumentation->plan_node_id[d->nnodes] =
449 1014 : planstate->plan->plan_node_id;
450 :
451 : /* Count this node. */
452 2854 : d->nnodes++;
453 :
454 : /*
455 : * Call initializers for DSM-using plan nodes.
456 : *
457 : * Most plan nodes won't do anything here, but plan nodes that allocated
458 : * DSM may need to initialize shared state in the DSM before parallel
459 : * workers are launched. They can allocate the space they previously
460 : * estimated using shm_toc_allocate, and add the keys they previously
461 : * estimated using shm_toc_insert, in each case targeting pcxt->toc.
462 : */
463 2854 : switch (nodeTag(planstate))
464 : {
465 1130 : case T_SeqScanState:
466 1130 : if (planstate->plan->parallel_aware)
467 892 : ExecSeqScanInitializeDSM((SeqScanState *) planstate,
468 : d->pcxt);
469 1130 : break;
470 288 : case T_IndexScanState:
471 288 : if (planstate->plan->parallel_aware)
472 12 : ExecIndexScanInitializeDSM((IndexScanState *) planstate,
473 : d->pcxt);
474 288 : break;
475 52 : case T_IndexOnlyScanState:
476 52 : if (planstate->plan->parallel_aware)
477 40 : ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
478 : d->pcxt);
479 52 : break;
480 0 : case T_ForeignScanState:
481 0 : if (planstate->plan->parallel_aware)
482 0 : ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
483 : d->pcxt);
484 0 : break;
485 180 : case T_AppendState:
486 180 : if (planstate->plan->parallel_aware)
487 132 : ExecAppendInitializeDSM((AppendState *) planstate,
488 : d->pcxt);
489 180 : break;
490 0 : case T_CustomScanState:
491 0 : if (planstate->plan->parallel_aware)
492 0 : ExecCustomScanInitializeDSM((CustomScanState *) planstate,
493 : d->pcxt);
494 0 : break;
495 20 : case T_BitmapHeapScanState:
496 20 : if (planstate->plan->parallel_aware)
497 18 : ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
498 : d->pcxt);
499 20 : break;
500 192 : case T_HashJoinState:
501 192 : if (planstate->plan->parallel_aware)
502 120 : ExecHashJoinInitializeDSM((HashJoinState *) planstate,
503 : d->pcxt);
504 192 : break;
505 192 : case T_HashState:
506 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
507 192 : ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
508 192 : break;
509 140 : case T_SortState:
510 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
511 140 : ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
512 140 : break;
513 0 : case T_IncrementalSortState:
514 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
515 0 : ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
516 0 : break;
517 558 : case T_AggState:
518 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
519 558 : ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
520 558 : break;
521 6 : case T_MemoizeState:
522 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
523 6 : ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
524 6 : break;
525 96 : default:
526 96 : break;
527 : }
528 :
529 2854 : return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
530 : }
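
/*
 * To make the estimate/initialize contract concrete, here is a minimal
 * sketch of the pair of callbacks a parallel-aware node provides.
 * NodeSharedState is a placeholder for whatever node-specific struct is
 * shared; real nodes vary.
 *
 *		-- in the node's Estimate callback, reached via ExecParallelEstimate
 *		shm_toc_estimate_chunk(&pcxt->estimator, sizeof(NodeSharedState));
 *		shm_toc_estimate_keys(&pcxt->estimator, 1);
 *
 *		-- in the node's InitializeDSM callback, reached from here
 *		state = shm_toc_allocate(pcxt->toc, sizeof(NodeSharedState));
 *		shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, state);
 */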
531 :
532 : /*
533 : * Set up the tuple queues that connect the parallel workers to the
534 : * main backend.
535 : */
536 : static shm_mq_handle **
537 908 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
538 : {
539 : shm_mq_handle **responseq;
540 : char *tqueuespace;
541 : int i;
542 :
543 : /* Skip this if no workers. */
544 908 : if (pcxt->nworkers == 0)
545 0 : return NULL;
546 :
547 : /* Allocate memory for shared memory queue handles. */
548 : responseq = (shm_mq_handle **)
549 908 : palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
550 :
551 : /*
552 : * If not reinitializing, allocate space from the DSM for the queues;
553 : * otherwise, find the already allocated space.
554 : */
555 908 : if (!reinitialize)
556 : tqueuespace =
557 650 : shm_toc_allocate(pcxt->toc,
558 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
559 650 : pcxt->nworkers));
560 : else
561 258 : tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
562 :
563 : /* Create the queues, and become the receiver for each. */
564 3414 : for (i = 0; i < pcxt->nworkers; ++i)
565 : {
566 : shm_mq *mq;
567 :
568 2506 : mq = shm_mq_create(tqueuespace +
569 2506 : ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
570 : (Size) PARALLEL_TUPLE_QUEUE_SIZE);
571 :
572 2506 : shm_mq_set_receiver(mq, MyProc);
573 2506 : responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
574 : }
575 :
576 : /* Add array of queues to shm_toc, so others can find it. */
577 908 : if (!reinitialize)
578 650 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
579 :
580 : /* Return array of handles. */
581 908 : return responseq;
582 : }
583 :
584 : /*
585 : * Sets up the required infrastructure for backend workers to perform
586 : * execution and return results to the main backend.
587 : */
588 : ParallelExecutorInfo *
589 650 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
590 : Bitmapset *sendParams, int nworkers,
591 : int64 tuples_needed)
592 : {
593 : ParallelExecutorInfo *pei;
594 : ParallelContext *pcxt;
595 : ExecParallelEstimateContext e;
596 : ExecParallelInitializeDSMContext d;
597 : FixedParallelExecutorState *fpes;
598 : char *pstmt_data;
599 : char *pstmt_space;
600 : char *paramlistinfo_space;
601 : BufferUsage *bufusage_space;
602 : WalUsage *walusage_space;
603 650 : SharedExecutorInstrumentation *instrumentation = NULL;
604 650 : SharedJitInstrumentation *jit_instrumentation = NULL;
605 : int pstmt_len;
606 : int paramlistinfo_len;
607 650 : int instrumentation_len = 0;
608 650 : int jit_instrumentation_len = 0;
609 650 : int instrument_offset = 0;
610 650 : Size dsa_minsize = dsa_minimum_size();
611 : char *query_string;
612 : int query_len;
613 :
614 : /*
615 : * Force any initplan outputs that we're going to pass to workers to be
616 : * evaluated, if they weren't already.
617 : *
618 : * For simplicity, we use the EState's per-output-tuple ExprContext here.
619 : * That risks intra-query memory leakage, since we might pass through here
620 : * many times before that ExprContext gets reset; but ExecSetParamPlan
621 : * doesn't normally leak any memory in the context (see its comments), so
622 : * it doesn't seem worth complicating this function's API to pass it a
623 : * shorter-lived ExprContext. This might need to change someday.
624 : */
625 650 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
626 :
627 : /* Allocate object for return value. */
628 650 : pei = palloc0(sizeof(ParallelExecutorInfo));
629 650 : pei->finished = false;
630 650 : pei->planstate = planstate;
631 :
632 : /* Fix up and serialize plan to be sent to workers. */
633 650 : pstmt_data = ExecSerializePlan(planstate->plan, estate);
634 :
635 : /* Create a parallel context. */
636 650 : pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
637 650 : pei->pcxt = pcxt;
638 :
639 : /*
640 : * Before telling the parallel context to create a dynamic shared memory
641 : * segment, we need to figure out how big it should be. Estimate space
642 : * for the various things we need to store.
643 : */
644 :
645 : /* Estimate space for fixed-size state. */
646 650 : shm_toc_estimate_chunk(&pcxt->estimator,
647 : sizeof(FixedParallelExecutorState));
648 650 : shm_toc_estimate_keys(&pcxt->estimator, 1);
649 :
650 : /* Estimate space for query text. */
651 650 : query_len = strlen(estate->es_sourceText);
652 650 : shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
653 650 : shm_toc_estimate_keys(&pcxt->estimator, 1);
654 :
655 : /* Estimate space for serialized PlannedStmt. */
656 650 : pstmt_len = strlen(pstmt_data) + 1;
657 650 : shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
658 650 : shm_toc_estimate_keys(&pcxt->estimator, 1);
659 :
660 : /* Estimate space for serialized ParamListInfo. */
661 650 : paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
662 650 : shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
663 650 : shm_toc_estimate_keys(&pcxt->estimator, 1);
664 :
665 : /*
666 : * Estimate space for BufferUsage.
667 : *
668 : * If EXPLAIN is not in use and there are no extensions loaded that care,
669 : * we could skip this. But we have no way of knowing whether anyone's
670 : * looking at pgBufferUsage, so do it unconditionally.
671 : */
672 650 : shm_toc_estimate_chunk(&pcxt->estimator,
673 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
674 650 : shm_toc_estimate_keys(&pcxt->estimator, 1);
675 :
676 : /*
677 : * Same thing for WalUsage.
678 : */
679 650 : shm_toc_estimate_chunk(&pcxt->estimator,
680 : mul_size(sizeof(WalUsage), pcxt->nworkers));
681 650 : shm_toc_estimate_keys(&pcxt->estimator, 1);
682 :
683 : /* Estimate space for tuple queues. */
684 650 : shm_toc_estimate_chunk(&pcxt->estimator,
685 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
686 650 : shm_toc_estimate_keys(&pcxt->estimator, 1);
687 :
688 : /*
689 : * Give parallel-aware nodes a chance to add to the estimates, and get a
690 : * count of how many PlanState nodes there are.
691 : */
692 650 : e.pcxt = pcxt;
693 650 : e.nnodes = 0;
694 650 : ExecParallelEstimate(planstate, &e);
695 :
696 : /* Estimate space for instrumentation, if required. */
697 650 : if (estate->es_instrument)
698 : {
699 186 : instrumentation_len =
700 : offsetof(SharedExecutorInstrumentation, plan_node_id) +
701 186 : sizeof(int) * e.nnodes;
702 186 : instrumentation_len = MAXALIGN(instrumentation_len);
703 186 : instrument_offset = instrumentation_len;
704 186 : instrumentation_len +=
705 186 : mul_size(sizeof(Instrumentation),
706 186 : mul_size(e.nnodes, nworkers));
707 186 : shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
708 186 : shm_toc_estimate_keys(&pcxt->estimator, 1);
709 :
710 : /* Estimate space for JIT instrumentation, if required. */
711 186 : if (estate->es_jit_flags != PGJIT_NONE)
712 : {
713 24 : jit_instrumentation_len =
714 24 : offsetof(SharedJitInstrumentation, jit_instr) +
715 : sizeof(JitInstrumentation) * nworkers;
716 24 : shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
717 24 : shm_toc_estimate_keys(&pcxt->estimator, 1);
718 : }
719 : }
720 :
721 : /* Estimate space for DSA area. */
722 650 : shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
723 650 : shm_toc_estimate_keys(&pcxt->estimator, 1);
724 :
725 : /* Everyone's had a chance to ask for space, so now create the DSM. */
726 650 : InitializeParallelDSM(pcxt);
727 :
728 : /*
729 : * OK, now we have a dynamic shared memory segment, and it should be big
730 : * enough to store all of the data we estimated we would want to put into
731 : * it, plus whatever general stuff (not specifically executor-related) the
732 : * ParallelContext itself needs to store there. None of the space we
733 : * asked for has been allocated or initialized yet, though, so do that.
734 : */
735 :
736 : /* Store fixed-size state. */
737 650 : fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
738 650 : fpes->tuples_needed = tuples_needed;
739 650 : fpes->param_exec = InvalidDsaPointer;
740 650 : fpes->eflags = estate->es_top_eflags;
741 650 : fpes->jit_flags = estate->es_jit_flags;
742 650 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
743 :
744 : /* Store query string */
745 650 : query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
746 650 : memcpy(query_string, estate->es_sourceText, query_len + 1);
747 650 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
748 :
749 : /* Store serialized PlannedStmt. */
750 650 : pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
751 650 : memcpy(pstmt_space, pstmt_data, pstmt_len);
752 650 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
753 :
754 : /* Store serialized ParamListInfo. */
755 650 : paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
756 650 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
757 650 : SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
758 :
759 : /* Allocate space for each worker's BufferUsage; no need to initialize. */
760 650 : bufusage_space = shm_toc_allocate(pcxt->toc,
761 650 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
762 650 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
763 650 : pei->buffer_usage = bufusage_space;
764 :
765 : /* Same for WalUsage. */
766 650 : walusage_space = shm_toc_allocate(pcxt->toc,
767 650 : mul_size(sizeof(WalUsage), pcxt->nworkers));
768 650 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
769 650 : pei->wal_usage = walusage_space;
770 :
771 : /* Set up the tuple queues that the workers will write into. */
772 650 : pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
773 :
774 : /* We don't need the TupleQueueReaders yet, though. */
775 650 : pei->reader = NULL;
776 :
777 : /*
778 : * If instrumentation options were supplied, allocate space for the data.
779 : * It only gets partially initialized here; the rest happens during
780 : * ExecParallelInitializeDSM.
781 : */
782 650 : if (estate->es_instrument)
783 : {
784 : Instrumentation *instrument;
785 : int i;
786 :
787 186 : instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
788 186 : instrumentation->instrument_options = estate->es_instrument;
789 186 : instrumentation->instrument_offset = instrument_offset;
790 186 : instrumentation->num_workers = nworkers;
791 186 : instrumentation->num_plan_nodes = e.nnodes;
792 186 : instrument = GetInstrumentationArray(instrumentation);
793 1836 : for (i = 0; i < nworkers * e.nnodes; ++i)
794 1650 : InstrInit(&instrument[i], estate->es_instrument);
795 186 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
796 : instrumentation);
797 186 : pei->instrumentation = instrumentation;
798 :
799 186 : if (estate->es_jit_flags != PGJIT_NONE)
800 : {
801 24 : jit_instrumentation = shm_toc_allocate(pcxt->toc,
802 : jit_instrumentation_len);
803 24 : jit_instrumentation->num_workers = nworkers;
804 24 : memset(jit_instrumentation->jit_instr, 0,
805 : sizeof(JitInstrumentation) * nworkers);
806 24 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
807 : jit_instrumentation);
808 24 : pei->jit_instrumentation = jit_instrumentation;
809 : }
810 : }
811 :
812 : /*
813 : * Create a DSA area that can be used by the leader and all workers.
814 : * (However, if we failed to create a DSM and are using private memory
815 : * instead, then skip this.)
816 : */
817 650 : if (pcxt->seg != NULL)
818 : {
819 : char *area_space;
820 :
821 650 : area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
822 650 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
823 650 : pei->area = dsa_create_in_place(area_space, dsa_minsize,
824 : LWTRANCHE_PARALLEL_QUERY_DSA,
825 : pcxt->seg);
826 :
827 : /*
828 : * Serialize parameters, if any, using DSA storage. We don't dare use
829 : * the main parallel query DSM for this because we might relaunch
830 : * workers after the values have changed (and thus the amount of
831 : * storage required has changed).
832 : */
833 650 : if (!bms_is_empty(sendParams))
834 : {
835 30 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
836 : pei->area);
837 30 : fpes->param_exec = pei->param_exec;
838 : }
839 : }
840 :
841 : /*
842 : * Give parallel-aware nodes a chance to initialize their shared data.
843 : * This also fills in the plan_node_id array of the shared
844 : * instrumentation structure, if instrumentation is in use.
845 : */
846 650 : d.pcxt = pcxt;
847 650 : d.instrumentation = instrumentation;
848 650 : d.nnodes = 0;
849 :
850 : /* Install our DSA area while initializing the plan. */
851 650 : estate->es_query_dsa = pei->area;
852 650 : ExecParallelInitializeDSM(planstate, &d);
853 650 : estate->es_query_dsa = NULL;
854 :
855 : /*
856 : * Make sure that the world hasn't shifted under our feet. This could
857 : * probably just be an Assert(), but let's be conservative for now.
858 : */
859 650 : if (e.nnodes != d.nnodes)
860 0 : elog(ERROR, "inconsistent count of PlanState nodes");
861 :
862 : /* OK, we're ready to rock and roll. */
863 650 : return pei;
864 : }
865 :
866 : /*
867 : * Set up tuple queue readers to read the results of a parallel subplan.
868 : *
869 : * This is separate from ExecInitParallelPlan() because we can launch the
870 : * worker processes and let them start doing something before we do this.
871 : */
872 : void
873 890 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
874 : {
875 890 : int nworkers = pei->pcxt->nworkers_launched;
876 : int i;
877 :
878 : Assert(pei->reader == NULL);
879 :
880 890 : if (nworkers > 0)
881 : {
882 890 : pei->reader = (TupleQueueReader **)
883 890 : palloc(nworkers * sizeof(TupleQueueReader *));
884 :
885 3322 : for (i = 0; i < nworkers; i++)
886 : {
887 2432 : shm_mq_set_handle(pei->tqueue[i],
888 2432 : pei->pcxt->worker[i].bgwhandle);
889 2432 : pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
890 : }
891 : }
892 890 : }
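
/*
 * Putting the leader-side pieces together: a caller such as Gather
 * (nodeGather.c) drives this module roughly as follows.  This is a
 * simplified sketch, and the argument names are placeholders.
 *
 *		pei = ExecInitParallelPlan(outerPlanState(node), estate,
 *								   initParam, nworkers, tuples_needed);
 *		LaunchParallelWorkers(pei->pcxt);
 *		if (pei->pcxt->nworkers_launched > 0)
 *			ExecParallelCreateReaders(pei);
 *		... read tuples from the workers' queues ...
 *		ExecParallelFinish(pei);
 *		ExecParallelCleanup(pei);
 *
 * For a rescan, ExecParallelFinish() is instead followed by
 * ExecParallelReinitialize() and a fresh LaunchParallelWorkers(), rather
 * than ExecParallelCleanup().
 */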
893 :
894 : /*
895 : * Re-initialize the parallel executor shared memory state before launching
896 : * a fresh batch of workers.
897 : */
898 : void
899 258 : ExecParallelReinitialize(PlanState *planstate,
900 : ParallelExecutorInfo *pei,
901 : Bitmapset *sendParams)
902 : {
903 258 : EState *estate = planstate->state;
904 : FixedParallelExecutorState *fpes;
905 :
906 : /* Old workers must already be shut down */
907 : Assert(pei->finished);
908 :
909 : /*
910 : * Force any initplan outputs that we're going to pass to workers to be
911 : * evaluated, if they weren't already (see comments in
912 : * ExecInitParallelPlan).
913 : */
914 258 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
915 :
916 258 : ReinitializeParallelDSM(pei->pcxt);
917 258 : pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
918 258 : pei->reader = NULL;
919 258 : pei->finished = false;
920 :
921 258 : fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
922 :
923 : /* Free any serialized parameters from the last round. */
924 258 : if (DsaPointerIsValid(fpes->param_exec))
925 : {
926 0 : dsa_free(pei->area, fpes->param_exec);
927 0 : fpes->param_exec = InvalidDsaPointer;
928 : }
929 :
930 : /* Serialize current parameter values if required. */
931 258 : if (!bms_is_empty(sendParams))
932 : {
933 0 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
934 : pei->area);
935 0 : fpes->param_exec = pei->param_exec;
936 : }
937 :
938 : /* Traverse plan tree and let each child node reset associated state. */
939 258 : estate->es_query_dsa = pei->area;
940 258 : ExecParallelReInitializeDSM(planstate, pei->pcxt);
941 258 : estate->es_query_dsa = NULL;
942 258 : }
943 :
944 : /*
945 : * Traverse plan tree to reinitialize per-node dynamic shared memory state
946 : */
947 : static bool
948 666 : ExecParallelReInitializeDSM(PlanState *planstate,
949 : ParallelContext *pcxt)
950 : {
951 666 : if (planstate == NULL)
952 0 : return false;
953 :
954 : /*
955 : * Call reinitializers for DSM-using plan nodes.
956 : */
957 666 : switch (nodeTag(planstate))
958 : {
959 276 : case T_SeqScanState:
960 276 : if (planstate->plan->parallel_aware)
961 228 : ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
962 : pcxt);
963 276 : break;
964 12 : case T_IndexScanState:
965 12 : if (planstate->plan->parallel_aware)
966 12 : ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
967 : pcxt);
968 12 : break;
969 12 : case T_IndexOnlyScanState:
970 12 : if (planstate->plan->parallel_aware)
971 12 : ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
972 : pcxt);
973 12 : break;
974 0 : case T_ForeignScanState:
975 0 : if (planstate->plan->parallel_aware)
976 0 : ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
977 : pcxt);
978 0 : break;
979 0 : case T_AppendState:
980 0 : if (planstate->plan->parallel_aware)
981 0 : ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
982 0 : break;
983 0 : case T_CustomScanState:
984 0 : if (planstate->plan->parallel_aware)
985 0 : ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
986 : pcxt);
987 0 : break;
988 54 : case T_BitmapHeapScanState:
989 54 : if (planstate->plan->parallel_aware)
990 54 : ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
991 : pcxt);
992 54 : break;
993 96 : case T_HashJoinState:
994 96 : if (planstate->plan->parallel_aware)
995 48 : ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
996 : pcxt);
997 96 : break;
998 126 : case T_HashState:
999 : case T_SortState:
1000 : case T_IncrementalSortState:
1001 : case T_MemoizeState:
1002 : /* these nodes have DSM state, but no reinitialization is required */
1003 126 : break;
1004 :
1005 90 : default:
1006 90 : break;
1007 : }
1008 :
1009 666 : return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1010 : }
1011 :
1012 : /*
1013 : * Copy instrumentation information about this node and its descendants from
1014 : * dynamic shared memory.
1015 : */
1016 : static bool
1017 1014 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
1018 : SharedExecutorInstrumentation *instrumentation)
1019 : {
1020 : Instrumentation *instrument;
1021 : int i;
1022 : int n;
1023 : int ibytes;
1024 1014 : int plan_node_id = planstate->plan->plan_node_id;
1025 : MemoryContext oldcontext;
1026 :
1027 : /* Find the instrumentation for this node. */
1028 4470 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1029 4470 : if (instrumentation->plan_node_id[i] == plan_node_id)
1030 1014 : break;
1031 1014 : if (i >= instrumentation->num_plan_nodes)
1032 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1033 :
1034 : /* Accumulate the statistics from all workers. */
1035 1014 : instrument = GetInstrumentationArray(instrumentation);
1036 1014 : instrument += i * instrumentation->num_workers;
1037 2664 : for (n = 0; n < instrumentation->num_workers; ++n)
1038 1650 : InstrAggNode(planstate->instrument, &instrument[n]);
1039 :
1040 : /*
1041 : * Also store the per-worker detail.
1042 : *
1043 : * Worker instrumentation should be allocated in the same context as the
1044 : * regular instrumentation information, which is the per-query context.
1045 : * Switch into per-query memory context.
1046 : */
1047 1014 : oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1048 1014 : ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1049 1014 : planstate->worker_instrument =
1050 1014 : palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1051 1014 : MemoryContextSwitchTo(oldcontext);
1052 :
1053 1014 : planstate->worker_instrument->num_workers = instrumentation->num_workers;
1054 1014 : memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1055 :
1056 : /* Perform any node-type-specific work that needs to be done. */
1057 1014 : switch (nodeTag(planstate))
1058 : {
1059 12 : case T_SortState:
1060 12 : ExecSortRetrieveInstrumentation((SortState *) planstate);
1061 12 : break;
1062 0 : case T_IncrementalSortState:
1063 0 : ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1064 0 : break;
1065 84 : case T_HashState:
1066 84 : ExecHashRetrieveInstrumentation((HashState *) planstate);
1067 84 : break;
1068 108 : case T_AggState:
1069 108 : ExecAggRetrieveInstrumentation((AggState *) planstate);
1070 108 : break;
1071 0 : case T_MemoizeState:
1072 0 : ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
1073 0 : break;
1074 810 : default:
1075 810 : break;
1076 : }
1077 :
1078 1014 : return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1079 : instrumentation);
1080 : }
1081 :
1082 : /*
1083 : * Add up the workers' JIT instrumentation from dynamic shared memory.
1084 : */
1085 : static void
1086 24 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1087 : SharedJitInstrumentation *shared_jit)
1088 : {
1089 : JitInstrumentation *combined;
1090 : int ibytes;
1091 :
1092 : int n;
1093 :
1094 : /*
1095 : * Accumulate worker JIT instrumentation into the combined JIT
1096 : * instrumentation, allocating it if required.
1097 : */
1098 24 : if (!planstate->state->es_jit_worker_instr)
1099 24 : planstate->state->es_jit_worker_instr =
1100 24 : MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
1101 24 : combined = planstate->state->es_jit_worker_instr;
1102 :
1103 : /* Accumulate all the workers' instrumentations. */
1104 72 : for (n = 0; n < shared_jit->num_workers; ++n)
1105 48 : InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1106 :
1107 : /*
1108 : * Store the per-worker detail.
1109 : *
1110 : * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1111 : * instrumentation in per-query context.
1112 : */
1113 24 : ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1114 24 : + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1115 24 : planstate->worker_jit_instrument =
1116 24 : MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1117 :
1118 24 : memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1119 24 : }
1120 :
1121 : /*
1122 : * Finish parallel execution. We wait for parallel workers to finish, and
1123 : * accumulate their buffer/WAL usage.
1124 : */
1125 : void
1126 1654 : ExecParallelFinish(ParallelExecutorInfo *pei)
1127 : {
1128 1654 : int nworkers = pei->pcxt->nworkers_launched;
1129 : int i;
1130 :
1131 : /* Make this be a no-op if called twice in a row. */
1132 1654 : if (pei->finished)
1133 752 : return;
1134 :
1135 : /*
1136 : * Detach from tuple queues ASAP, so that any still-active workers will
1137 : * notice that no further results are wanted.
1138 : */
1139 902 : if (pei->tqueue != NULL)
1140 : {
1141 3328 : for (i = 0; i < nworkers; i++)
1142 2426 : shm_mq_detach(pei->tqueue[i]);
1143 902 : pfree(pei->tqueue);
1144 902 : pei->tqueue = NULL;
1145 : }
1146 :
1147 : /*
1148 : * While we're waiting for the workers to finish, let's get rid of the
1149 : * tuple queue readers. (Any other local cleanup could be done here too.)
1150 : */
1151 902 : if (pei->reader != NULL)
1152 : {
1153 3310 : for (i = 0; i < nworkers; i++)
1154 2426 : DestroyTupleQueueReader(pei->reader[i]);
1155 884 : pfree(pei->reader);
1156 884 : pei->reader = NULL;
1157 : }
1158 :
1159 : /* Now wait for the workers to finish. */
1160 902 : WaitForParallelWorkersToFinish(pei->pcxt);
1161 :
1162 : /*
1163 : * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1164 : * finish, or we might get incomplete data.)
1165 : */
1166 3328 : for (i = 0; i < nworkers; i++)
1167 2426 : InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1168 :
1169 902 : pei->finished = true;
1170 : }
1171 :
1172 : /*
1173 : * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1174 : * resources still exist after ExecParallelFinish. We separate these
1175 : * routines because someone might want to examine the contents of the DSM
1176 : * after ExecParallelFinish and before calling this routine.
1177 : */
1178 : void
1179 644 : ExecParallelCleanup(ParallelExecutorInfo *pei)
1180 : {
1181 : /* Accumulate instrumentation, if any. */
1182 644 : if (pei->instrumentation)
1183 186 : ExecParallelRetrieveInstrumentation(pei->planstate,
1184 : pei->instrumentation);
1185 :
1186 : /* Accumulate JIT instrumentation, if any. */
1187 644 : if (pei->jit_instrumentation)
1188 24 : ExecParallelRetrieveJitInstrumentation(pei->planstate,
1189 24 : pei->jit_instrumentation);
1190 :
1191 : /* Free any serialized parameters. */
1192 644 : if (DsaPointerIsValid(pei->param_exec))
1193 : {
1194 30 : dsa_free(pei->area, pei->param_exec);
1195 30 : pei->param_exec = InvalidDsaPointer;
1196 : }
1197 644 : if (pei->area != NULL)
1198 : {
1199 644 : dsa_detach(pei->area);
1200 644 : pei->area = NULL;
1201 : }
1202 644 : if (pei->pcxt != NULL)
1203 : {
1204 644 : DestroyParallelContext(pei->pcxt);
1205 644 : pei->pcxt = NULL;
1206 : }
1207 644 : pfree(pei);
1208 644 : }
1209 :
1210 : /*
1211 : * Create a DestReceiver to write tuples we produce to the shm_mq designated
1212 : * for that purpose.
1213 : */
1214 : static DestReceiver *
1215 2432 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1216 : {
1217 : char *mqspace;
1218 : shm_mq *mq;
1219 :
1220 2432 : mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1221 2432 : mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1222 2432 : mq = (shm_mq *) mqspace;
1223 2432 : shm_mq_set_sender(mq, MyProc);
1224 2432 : return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1225 : }
1226 :
1227 : /*
1228 : * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1229 : */
1230 : static QueryDesc *
1231 2432 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1232 : int instrument_options)
1233 : {
1234 : char *pstmtspace;
1235 : char *paramspace;
1236 : PlannedStmt *pstmt;
1237 : ParamListInfo paramLI;
1238 : char *queryString;
1239 :
1240 : /* Get the query string from shared memory */
1241 2432 : queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1242 :
1243 : /* Reconstruct leader-supplied PlannedStmt. */
1244 2432 : pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1245 2432 : pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1246 :
1247 : /* Reconstruct ParamListInfo. */
1248 2432 : paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1249 2432 : paramLI = RestoreParamList(&paramspace);
1250 :
1251 : /* Create a QueryDesc for the query. */
1252 2432 : return CreateQueryDesc(pstmt,
1253 : queryString,
1254 : GetActiveSnapshot(), InvalidSnapshot,
1255 : receiver, paramLI, NULL, instrument_options);
1256 : }
1257 :
1258 : /*
1259 : * Copy instrumentation information from this node and its descendants into
1260 : * dynamic shared memory, so that the parallel leader can retrieve it.
1261 : */
1262 : static bool
1263 2346 : ExecParallelReportInstrumentation(PlanState *planstate,
1264 : SharedExecutorInstrumentation *instrumentation)
1265 : {
1266 : int i;
1267 2346 : int plan_node_id = planstate->plan->plan_node_id;
1268 : Instrumentation *instrument;
1269 :
1270 2346 : InstrEndLoop(planstate->instrument);
1271 :
1272 : /*
1273 : * If we kept the plan_node_id array in sorted order, we could use
1274 : * binary search here. This might matter someday if
1275 : * we're pushing down sufficiently large plan trees. For now, do it the
1276 : * slow, dumb way.
1277 : */
1278 7446 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1279 7446 : if (instrumentation->plan_node_id[i] == plan_node_id)
1280 2346 : break;
1281 2346 : if (i >= instrumentation->num_plan_nodes)
1282 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1283 :
1284 : /*
1285 : * Add our statistics to the per-node, per-worker totals. It's possible
1286 : * that this could happen more than once if we relaunched workers.
1287 : */
1288 2346 : instrument = GetInstrumentationArray(instrumentation);
1289 2346 : instrument += i * instrumentation->num_workers;
1290 : Assert(IsParallelWorker());
1291 : Assert(ParallelWorkerNumber < instrumentation->num_workers);
1292 2346 : InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1293 :
1294 2346 : return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1295 : instrumentation);
1296 : }
1297 :
1298 : /*
1299 : * Initialize the PlanState and its descendants with the information
1300 : * retrieved from shared memory. This has to be done once the PlanState
1301 : * is allocated and initialized by the executor; that is, after ExecutorStart().
1302 : */
1303 : static bool
1304 7984 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1305 : {
1306 7984 : if (planstate == NULL)
1307 0 : return false;
1308 :
1309 7984 : switch (nodeTag(planstate))
1310 : {
1311 3282 : case T_SeqScanState:
1312 3282 : if (planstate->plan->parallel_aware)
1313 2654 : ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1314 3282 : break;
1315 372 : case T_IndexScanState:
1316 372 : if (planstate->plan->parallel_aware)
1317 96 : ExecIndexScanInitializeWorker((IndexScanState *) planstate,
1318 : pwcxt);
1319 372 : break;
1320 236 : case T_IndexOnlyScanState:
1321 236 : if (planstate->plan->parallel_aware)
1322 200 : ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1323 : pwcxt);
1324 236 : break;
1325 0 : case T_ForeignScanState:
1326 0 : if (planstate->plan->parallel_aware)
1327 0 : ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1328 : pwcxt);
1329 0 : break;
1330 366 : case T_AppendState:
1331 366 : if (planstate->plan->parallel_aware)
1332 306 : ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1333 366 : break;
1334 0 : case T_CustomScanState:
1335 0 : if (planstate->plan->parallel_aware)
1336 0 : ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1337 : pwcxt);
1338 0 : break;
1339 278 : case T_BitmapHeapScanState:
1340 278 : if (planstate->plan->parallel_aware)
1341 276 : ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1342 : pwcxt);
1343 278 : break;
1344 548 : case T_HashJoinState:
1345 548 : if (planstate->plan->parallel_aware)
1346 308 : ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1347 : pwcxt);
1348 548 : break;
1349 548 : case T_HashState:
1350 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1351 548 : ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1352 548 : break;
1353 428 : case T_SortState:
1354 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1355 428 : ExecSortInitializeWorker((SortState *) planstate, pwcxt);
1356 428 : break;
1357 0 : case T_IncrementalSortState:
1358 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1359 0 : ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1360 : pwcxt);
1361 0 : break;
1362 1532 : case T_AggState:
1363 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1364 1532 : ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1365 1532 : break;
1366 12 : case T_MemoizeState:
1367 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1368 12 : ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
1369 12 : break;
1370 382 : default:
1371 382 : break;
1372 : }
1373 :
1374 7984 : return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1375 : pwcxt);
1376 : }
1377 :
1378 : /*
1379 : * Main entrypoint for parallel query worker processes.
1380 : *
1381 : * We reach this function from ParallelWorkerMain, so the setup necessary to
1382 : * create a sensible parallel environment has already been done;
1383 : * ParallelWorkerMain worries about stuff like the transaction state, combo
1384 : * CID mappings, and GUC values, so we don't need to deal with any of that
1385 : * here.
1386 : *
1387 : * Our job is to deal with concerns specific to the executor. The parallel
1388 : * group leader will have stored a serialized PlannedStmt, and it's our job
1389 : * to execute that plan and write the resulting tuples to the appropriate
1390 : * tuple queue. Various bits of supporting information that we need in order
1391 : * to do this are also stored in the dsm_segment and can be accessed through
1392 : * the shm_toc.
1393 : */
1394 : void
1395 2432 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1396 : {
1397 : FixedParallelExecutorState *fpes;
1398 : BufferUsage *buffer_usage;
1399 : WalUsage *wal_usage;
1400 : DestReceiver *receiver;
1401 : QueryDesc *queryDesc;
1402 : SharedExecutorInstrumentation *instrumentation;
1403 : SharedJitInstrumentation *jit_instrumentation;
1404 2432 : int instrument_options = 0;
1405 : void *area_space;
1406 : dsa_area *area;
1407 : ParallelWorkerContext pwcxt;
1408 :
1409 : /* Get fixed-size state. */
1410 2432 : fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1411 :
1412 : /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
1413 2432 : receiver = ExecParallelGetReceiver(seg, toc);
1414 2432 : instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1415 2432 : if (instrumentation != NULL)
1416 738 : instrument_options = instrumentation->instrument_options;
1417 2432 : jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1418 : true);
1419 2432 : queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1420 :
1421 : /* Setting debug_query_string for individual workers */
1422 : /* Set debug_query_string for this individual worker */
1423 :
1424 : /* Report workers' query for monitoring purposes */
1425 2432 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1426 :
1427 : /* Attach to the dynamic shared memory area. */
1428 2432 : area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1429 2432 : area = dsa_attach_in_place(area_space, seg);
1430 :
1431 : /* Start up the executor */
1432 2432 : queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1433 2432 : ExecutorStart(queryDesc, fpes->eflags);
1434 :
1435 : /* Special executor initialization steps for parallel workers */
1436 2432 : queryDesc->planstate->state->es_query_dsa = area;
1437 2432 : if (DsaPointerIsValid(fpes->param_exec))
1438 : {
1439 : char *paramexec_space;
1440 :
1441 72 : paramexec_space = dsa_get_address(area, fpes->param_exec);
1442 72 : RestoreParamExecParams(paramexec_space, queryDesc->estate);
1443 : }
1444 2432 : pwcxt.toc = toc;
1445 2432 : pwcxt.seg = seg;
1446 2432 : ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1447 :
1448 : /* Pass down any tuple bound */
1449 2432 : ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1450 :
1451 : /*
1452 : * Prepare to track buffer/WAL usage during query execution.
1453 : *
1454 : * We do this after starting up the executor to match what happens in the
1455 : * leader, which also doesn't count buffer accesses and WAL activity that
1456 : * occur during executor startup.
1457 : */
1458 2432 : InstrStartParallelQuery();
1459 :
1460 : /*
1461 : * Run the plan. If we specified a tuple bound, be careful not to demand
1462 : * more tuples than that.
1463 : */
1464 2432 : ExecutorRun(queryDesc,
1465 : ForwardScanDirection,
1466 2432 : fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
1467 : true);
1468 :
1469 : /* Shut down the executor */
1470 2426 : ExecutorFinish(queryDesc);
1471 :
1472 : /* Report buffer/WAL usage during parallel execution. */
1473 2426 : buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1474 2426 : wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1475 2426 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1476 2426 : &wal_usage[ParallelWorkerNumber]);
1477 :
1478 : /* Report instrumentation data if any instrumentation options are set. */
1479 2426 : if (instrumentation != NULL)
1480 738 : ExecParallelReportInstrumentation(queryDesc->planstate,
1481 : instrumentation);
1482 :
1483 : /* Report JIT instrumentation data if any */
1484 2426 : if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1485 : {
1486 : Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1487 144 : jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1488 144 : queryDesc->estate->es_jit->instr;
1489 : }
1490 :
1491 : /* Must do this after capturing instrumentation. */
1492 2426 : ExecutorEnd(queryDesc);
1493 :
1494 : /* Cleanup. */
1495 2426 : dsa_detach(area);
1496 2426 : FreeQueryDesc(queryDesc);
1497 2426 : receiver->rDestroy(receiver);
1498 2426 : }