Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execParallel.c
4 : * Support routines for parallel execution.
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * This file contains routines that are intended to support setting up,
10 : * using, and tearing down a ParallelContext from within the PostgreSQL
11 : * executor. The ParallelContext machinery will handle starting the
12 : * workers and ensuring that their state generally matches that of the
13 : * leader; see src/backend/access/transam/README.parallel for details.
14 : * However, we must save and restore relevant executor state, such as
15 : * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 : * the actual plan to be passed down to the worker.
17 : *
18 : * IDENTIFICATION
19 : * src/backend/executor/execParallel.c
20 : *
21 : *-------------------------------------------------------------------------
22 : */
23 :
24 : #include "postgres.h"
25 :
26 : #include "executor/execParallel.h"
27 : #include "executor/executor.h"
28 : #include "executor/nodeAgg.h"
29 : #include "executor/nodeAppend.h"
30 : #include "executor/nodeBitmapHeapscan.h"
31 : #include "executor/nodeBitmapIndexscan.h"
32 : #include "executor/nodeCustom.h"
33 : #include "executor/nodeForeignscan.h"
34 : #include "executor/nodeHash.h"
35 : #include "executor/nodeHashjoin.h"
36 : #include "executor/nodeIncrementalSort.h"
37 : #include "executor/nodeIndexonlyscan.h"
38 : #include "executor/nodeIndexscan.h"
39 : #include "executor/nodeMemoize.h"
40 : #include "executor/nodeSeqscan.h"
41 : #include "executor/nodeSort.h"
42 : #include "executor/nodeSubplan.h"
43 : #include "executor/tqueue.h"
44 : #include "jit/jit.h"
45 : #include "nodes/nodeFuncs.h"
46 : #include "pgstat.h"
47 : #include "tcop/tcopprot.h"
48 : #include "utils/datum.h"
49 : #include "utils/dsa.h"
50 : #include "utils/lsyscache.h"
51 : #include "utils/snapmgr.h"
52 :
53 : /*
54 : * Magic numbers for parallel executor communication. We use constants
55 : * greater than any 32-bit integer here so that values < 2^32 can be used
56 : * by individual parallel nodes to store their own state.
57 : */
58 : #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
59 : #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
60 : #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
61 : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
62 : #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
63 : #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
64 : #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
65 : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
66 : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
67 : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
68 :
69 : #define PARALLEL_TUPLE_QUEUE_SIZE 65536
70 :
71 : /*
72 : * Fixed-size random stuff that we need to pass to parallel workers.
73 : */
74 : typedef struct FixedParallelExecutorState
75 : {
76 : int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
77 : dsa_pointer param_exec;
78 : int eflags;
79 : int jit_flags;
80 : } FixedParallelExecutorState;
81 :
82 : /*
83 : * DSM structure for accumulating per-PlanState instrumentation.
84 : *
85 : * instrument_options: Same meaning here as in instrument.c.
86 : *
87 : * instrument_offset: Offset, relative to the start of this structure,
88 : * of the first Instrumentation object. This will depend on the length of
89 : * the plan_node_id array.
90 : *
91 : * num_workers: Number of workers.
92 : *
93 : * num_plan_nodes: Number of plan nodes.
94 : *
95 : * plan_node_id: Array of plan nodes for which we are gathering instrumentation
96 : * from parallel workers. The length of this array is given by num_plan_nodes.
97 : */
98 : struct SharedExecutorInstrumentation
99 : {
100 : int instrument_options;
101 : int instrument_offset;
102 : int num_workers;
103 : int num_plan_nodes;
104 : int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
105 : /* array of num_plan_nodes * num_workers Instrumentation objects follows */
106 : };
107 : #define GetInstrumentationArray(sei) \
108 : (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
109 : (Instrumentation *) (((char *) sei) + sei->instrument_offset))
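
/*
 * A minimal sketch (not part of the original file; the helper name is
 * hypothetical): the flexible array above is addressed as a
 * num_plan_nodes x num_workers matrix, so the Instrumentation for
 * plan-node index i and worker n sits at offset i * num_workers + n.
 * This is the same arithmetic used by ExecParallelRetrieveInstrumentation
 * and ExecParallelReportInstrumentation below.
 */
static inline Instrumentation *
GetWorkerInstrumentationSketch(SharedExecutorInstrumentation *sei,
							   int plan_node_index, int worker_number)
{
	Instrumentation *array = GetInstrumentationArray(sei);

	return &array[plan_node_index * sei->num_workers + worker_number];
}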
110 :
111 : /* Context object for ExecParallelEstimate. */
112 : typedef struct ExecParallelEstimateContext
113 : {
114 : ParallelContext *pcxt;
115 : int nnodes;
116 : } ExecParallelEstimateContext;
117 :
118 : /* Context object for ExecParallelInitializeDSM. */
119 : typedef struct ExecParallelInitializeDSMContext
120 : {
121 : ParallelContext *pcxt;
122 : SharedExecutorInstrumentation *instrumentation;
123 : int nnodes;
124 : } ExecParallelInitializeDSMContext;
125 :
126 : /* Helper functions that run in the parallel leader. */
127 : static char *ExecSerializePlan(Plan *plan, EState *estate);
128 : static bool ExecParallelEstimate(PlanState *planstate,
129 : ExecParallelEstimateContext *e);
130 : static bool ExecParallelInitializeDSM(PlanState *planstate,
131 : ExecParallelInitializeDSMContext *d);
132 : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
133 : bool reinitialize);
134 : static bool ExecParallelReInitializeDSM(PlanState *planstate,
135 : ParallelContext *pcxt);
136 : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
137 : SharedExecutorInstrumentation *instrumentation);
138 :
139 : /* Helper function that runs in the parallel worker. */
140 : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
141 :
142 : /*
143 : * Create a serialized representation of the plan to be sent to each worker.
144 : */
145 : static char *
146 718 : ExecSerializePlan(Plan *plan, EState *estate)
147 : {
148 : PlannedStmt *pstmt;
149 : ListCell *lc;
150 :
151 : /* We can't scribble on the original plan, so make a copy. */
152 718 : plan = copyObject(plan);
153 :
154 : /*
155 : * The worker will start its own copy of the executor, and that copy will
156 : * insert a junk filter if the toplevel node has any resjunk entries. We
157 : * don't want that to happen, because while resjunk columns shouldn't be
158 : * sent back to the user, here the tuples are coming back to another
159 : * backend which may very well need them. So mutate the target list
160 : * accordingly. This is sort of a hack; there might be better ways to do
161 : * this...
162 : */
163 1978 : foreach(lc, plan->targetlist)
164 : {
165 1260 : TargetEntry *tle = lfirst_node(TargetEntry, lc);
166 :
167 1260 : tle->resjunk = false;
168 : }
169 :
170 : /*
171 : * Create a dummy PlannedStmt. Most of the fields don't need to be valid
172 : * for our purposes, but the worker will need at least a minimal
173 : * PlannedStmt to start the executor.
174 : */
175 718 : pstmt = makeNode(PlannedStmt);
176 718 : pstmt->commandType = CMD_SELECT;
177 718 : pstmt->queryId = pgstat_get_my_query_id();
178 718 : pstmt->planId = pgstat_get_my_plan_id();
179 718 : pstmt->hasReturning = false;
180 718 : pstmt->hasModifyingCTE = false;
181 718 : pstmt->canSetTag = true;
182 718 : pstmt->transientPlan = false;
183 718 : pstmt->dependsOnRole = false;
184 718 : pstmt->parallelModeNeeded = false;
185 718 : pstmt->planTree = plan;
186 718 : pstmt->partPruneInfos = estate->es_part_prune_infos;
187 718 : pstmt->rtable = estate->es_range_table;
188 718 : pstmt->unprunableRelids = estate->es_unpruned_relids;
189 718 : pstmt->permInfos = estate->es_rteperminfos;
190 718 : pstmt->resultRelations = NIL;
191 718 : pstmt->appendRelations = NIL;
192 :
193 : /*
194 : * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
195 : * for unsafe ones (so that the list indexes of the safe ones are
196 : * preserved). This positively ensures that the worker won't try to run,
197 : * or even do ExecInitNode on, an unsafe subplan. That's important to
198 : * protect, eg, non-parallel-aware FDWs from getting into trouble.
199 : */
200 718 : pstmt->subplans = NIL;
201 772 : foreach(lc, estate->es_plannedstmt->subplans)
202 : {
203 54 : Plan *subplan = (Plan *) lfirst(lc);
204 :
205 54 : if (subplan && !subplan->parallel_safe)
206 12 : subplan = NULL;
207 54 : pstmt->subplans = lappend(pstmt->subplans, subplan);
208 : }
209 :
210 718 : pstmt->rewindPlanIDs = NULL;
211 718 : pstmt->rowMarks = NIL;
212 718 : pstmt->relationOids = NIL;
213 718 : pstmt->invalItems = NIL; /* workers can't replan anyway... */
214 718 : pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
215 718 : pstmt->utilityStmt = NULL;
216 718 : pstmt->stmt_location = -1;
217 718 : pstmt->stmt_len = -1;
218 :
219 : /* Return serialized copy of our dummy PlannedStmt. */
220 718 : return nodeToString(pstmt);
221 : }
222 :
223 : /*
224 : * Parallel-aware plan nodes (and occasionally others) may need some state
225 : * which is shared across all parallel workers. Before we size the DSM, give
226 : * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
227 : * &pcxt->estimator.
228 : *
229 : * While we're at it, count the number of PlanState nodes in the tree, so
230 : * we know how many Instrumentation structures we need.
231 : */
232 : static bool
233 2960 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
234 : {
235 2960 : if (planstate == NULL)
236 0 : return false;
237 :
238 : /* Count this node. */
239 2960 : e->nnodes++;
240 :
241 2960 : switch (nodeTag(planstate))
242 : {
243 1144 : case T_SeqScanState:
244 1144 : if (planstate->plan->parallel_aware)
245 906 : ExecSeqScanEstimate((SeqScanState *) planstate,
246 : e->pcxt);
247 1144 : break;
248 294 : case T_IndexScanState:
249 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
250 294 : ExecIndexScanEstimate((IndexScanState *) planstate,
251 : e->pcxt);
252 294 : break;
253 58 : case T_IndexOnlyScanState:
254 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
255 58 : ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
256 : e->pcxt);
257 58 : break;
258 20 : case T_BitmapIndexScanState:
259 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
260 20 : ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
261 : e->pcxt);
262 20 : break;
263 0 : case T_ForeignScanState:
264 0 : if (planstate->plan->parallel_aware)
265 0 : ExecForeignScanEstimate((ForeignScanState *) planstate,
266 : e->pcxt);
267 0 : break;
268 186 : case T_AppendState:
269 186 : if (planstate->plan->parallel_aware)
270 138 : ExecAppendEstimate((AppendState *) planstate,
271 : e->pcxt);
272 186 : break;
273 0 : case T_CustomScanState:
274 0 : if (planstate->plan->parallel_aware)
275 0 : ExecCustomScanEstimate((CustomScanState *) planstate,
276 : e->pcxt);
277 0 : break;
278 20 : case T_BitmapHeapScanState:
279 20 : if (planstate->plan->parallel_aware)
280 18 : ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
281 : e->pcxt);
282 20 : break;
283 192 : case T_HashJoinState:
284 192 : if (planstate->plan->parallel_aware)
285 120 : ExecHashJoinEstimate((HashJoinState *) planstate,
286 : e->pcxt);
287 192 : break;
288 192 : case T_HashState:
289 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
290 192 : ExecHashEstimate((HashState *) planstate, e->pcxt);
291 192 : break;
292 152 : case T_SortState:
293 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
294 152 : ExecSortEstimate((SortState *) planstate, e->pcxt);
295 152 : break;
296 0 : case T_IncrementalSortState:
297 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
298 0 : ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
299 0 : break;
300 554 : case T_AggState:
301 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
302 554 : ExecAggEstimate((AggState *) planstate, e->pcxt);
303 554 : break;
304 6 : case T_MemoizeState:
305 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
306 6 : ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
307 6 : break;
308 142 : default:
309 142 : break;
310 : }
311 :
312 2960 : return planstate_tree_walker(planstate, ExecParallelEstimate, e);
313 : }
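
/*
 * A minimal sketch, not part of the original file, of the per-node
 * "Estimate" hook dispatched above.  The node type and the size of its
 * shared state are hypothetical; the point is that a parallel-aware node
 * reserves DSM space with shm_toc_estimate_chunk and reserves one toc key
 * per entry it will later insert with shm_toc_estimate_keys.
 */
static void
ExecExampleScanEstimateSketch(PlanState *planstate, ParallelContext *pcxt)
{
	Size		shared_size = sizeof(int);	/* whatever the node shares */

	shm_toc_estimate_chunk(&pcxt->estimator, shared_size);
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}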
314 :
315 : /*
316 : * Estimate the amount of space required to serialize the indicated parameters.
317 : */
318 : static Size
319 24 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
320 : {
321 : int paramid;
322 24 : Size sz = sizeof(int);
323 :
324 24 : paramid = -1;
325 54 : while ((paramid = bms_next_member(params, paramid)) >= 0)
326 : {
327 : Oid typeOid;
328 : int16 typLen;
329 : bool typByVal;
330 : ParamExecData *prm;
331 :
332 30 : prm = &(estate->es_param_exec_vals[paramid]);
333 30 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
334 : paramid);
335 :
336 30 : sz = add_size(sz, sizeof(int)); /* space for paramid */
337 :
338 : /* space for datum/isnull */
339 30 : if (OidIsValid(typeOid))
340 30 : get_typlenbyval(typeOid, &typLen, &typByVal);
341 : else
342 : {
343 : /* If no type OID, assume by-value, like copyParamList does. */
344 0 : typLen = sizeof(Datum);
345 0 : typByVal = true;
346 : }
347 30 : sz = add_size(sz,
348 30 : datumEstimateSpace(prm->value, prm->isnull,
349 : typByVal, typLen));
350 : }
351 24 : return sz;
352 : }
353 :
354 : /*
355 : * Serialize specified PARAM_EXEC parameters.
356 : *
357 : * We write the number of parameters first, as a 4-byte integer, and then
358 : * write details for each parameter in turn. The details for each parameter
359 : * consist of a 4-byte paramid (location of param in execution time internal
360 : * parameter array) and then the datum as serialized by datumSerialize().
361 : */
362 : static dsa_pointer
363 24 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
364 : {
365 : Size size;
366 : int nparams;
367 : int paramid;
368 : ParamExecData *prm;
369 : dsa_pointer handle;
370 : char *start_address;
371 :
372 : /* Allocate enough space for the current parameter values. */
373 24 : size = EstimateParamExecSpace(estate, params);
374 24 : handle = dsa_allocate(area, size);
375 24 : start_address = dsa_get_address(area, handle);
376 :
377 : /* First write the number of parameters as a 4-byte integer. */
378 24 : nparams = bms_num_members(params);
379 24 : memcpy(start_address, &nparams, sizeof(int));
380 24 : start_address += sizeof(int);
381 :
382 : /* Write details for each parameter in turn. */
383 24 : paramid = -1;
384 54 : while ((paramid = bms_next_member(params, paramid)) >= 0)
385 : {
386 : Oid typeOid;
387 : int16 typLen;
388 : bool typByVal;
389 :
390 30 : prm = &(estate->es_param_exec_vals[paramid]);
391 30 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
392 : paramid);
393 :
394 : /* Write paramid. */
395 30 : memcpy(start_address, &paramid, sizeof(int));
396 30 : start_address += sizeof(int);
397 :
398 : /* Write datum/isnull */
399 30 : if (OidIsValid(typeOid))
400 30 : get_typlenbyval(typeOid, &typLen, &typByVal);
401 : else
402 : {
403 : /* If no type OID, assume by-value, like copyParamList does. */
404 0 : typLen = sizeof(Datum);
405 0 : typByVal = true;
406 : }
407 30 : datumSerialize(prm->value, prm->isnull, typByVal, typLen,
408 : &start_address);
409 : }
410 :
411 24 : return handle;
412 : }
413 :
414 : /*
415 : * Restore specified PARAM_EXEC parameters.
416 : */
417 : static void
418 72 : RestoreParamExecParams(char *start_address, EState *estate)
419 : {
420 : int nparams;
421 : int i;
422 : int paramid;
423 :
424 72 : memcpy(&nparams, start_address, sizeof(int));
425 72 : start_address += sizeof(int);
426 :
427 156 : for (i = 0; i < nparams; i++)
428 : {
429 : ParamExecData *prm;
430 :
431 : /* Read paramid */
432 84 : memcpy(&paramid, start_address, sizeof(int));
433 84 : start_address += sizeof(int);
434 84 : prm = &(estate->es_param_exec_vals[paramid]);
435 :
436 : /* Read datum/isnull. */
437 84 : prm->value = datumRestore(&start_address, &prm->isnull);
438 84 : prm->execPlan = NULL;
439 : }
440 72 : }
441 :
442 : /*
443 : * Initialize the dynamic shared memory segment that will be used to control
444 : * parallel execution.
445 : */
446 : static bool
447 2960 : ExecParallelInitializeDSM(PlanState *planstate,
448 : ExecParallelInitializeDSMContext *d)
449 : {
450 2960 : if (planstate == NULL)
451 0 : return false;
452 :
453 : /* If instrumentation is enabled, initialize slot for this node. */
454 2960 : if (d->instrumentation != NULL)
455 1026 : d->instrumentation->plan_node_id[d->nnodes] =
456 1026 : planstate->plan->plan_node_id;
457 :
458 : /* Count this node. */
459 2960 : d->nnodes++;
460 :
461 : /*
462 : * Call initializers for DSM-using plan nodes.
463 : *
464 : * Most plan nodes won't do anything here, but plan nodes that allocated
465 : * DSM may need to initialize shared state in the DSM before parallel
466 : * workers are launched. They can allocate the space they previously
467 : * estimated using shm_toc_allocate, and add the keys they previously
468 : * estimated using shm_toc_insert, in each case targeting pcxt->toc.
469 : */
470 2960 : switch (nodeTag(planstate))
471 : {
472 1144 : case T_SeqScanState:
473 1144 : if (planstate->plan->parallel_aware)
474 906 : ExecSeqScanInitializeDSM((SeqScanState *) planstate,
475 : d->pcxt);
476 1144 : break;
477 294 : case T_IndexScanState:
478 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
479 294 : ExecIndexScanInitializeDSM((IndexScanState *) planstate, d->pcxt);
480 294 : break;
481 58 : case T_IndexOnlyScanState:
482 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
483 58 : ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
484 : d->pcxt);
485 58 : break;
486 20 : case T_BitmapIndexScanState:
487 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
488 20 : ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
489 20 : break;
490 0 : case T_ForeignScanState:
491 0 : if (planstate->plan->parallel_aware)
492 0 : ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
493 : d->pcxt);
494 0 : break;
495 186 : case T_AppendState:
496 186 : if (planstate->plan->parallel_aware)
497 138 : ExecAppendInitializeDSM((AppendState *) planstate,
498 : d->pcxt);
499 186 : break;
500 0 : case T_CustomScanState:
501 0 : if (planstate->plan->parallel_aware)
502 0 : ExecCustomScanInitializeDSM((CustomScanState *) planstate,
503 : d->pcxt);
504 0 : break;
505 20 : case T_BitmapHeapScanState:
506 20 : if (planstate->plan->parallel_aware)
507 18 : ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
508 : d->pcxt);
509 20 : break;
510 192 : case T_HashJoinState:
511 192 : if (planstate->plan->parallel_aware)
512 120 : ExecHashJoinInitializeDSM((HashJoinState *) planstate,
513 : d->pcxt);
514 192 : break;
515 192 : case T_HashState:
516 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
517 192 : ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
518 192 : break;
519 152 : case T_SortState:
520 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
521 152 : ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
522 152 : break;
523 0 : case T_IncrementalSortState:
524 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
525 0 : ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
526 0 : break;
527 554 : case T_AggState:
528 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
529 554 : ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
530 554 : break;
531 6 : case T_MemoizeState:
532 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
533 6 : ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
534 6 : break;
535 142 : default:
536 142 : break;
537 : }
538 :
539 2960 : return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
540 : }
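
/*
 * A minimal sketch, not part of the original file, of the matching
 * per-node "InitializeDSM" hook (hypothetical node, hypothetical shared
 * state).  The node allocates the space it estimated earlier and
 * publishes it in the toc; following the key-space convention at the top
 * of this file, per-node entries use a key below 2^32, conventionally the
 * node's plan_node_id.
 */
static void
ExecExampleScanInitializeDSMSketch(PlanState *planstate, ParallelContext *pcxt)
{
	int		   *shared_counter;

	shared_counter = shm_toc_allocate(pcxt->toc, sizeof(int));
	*shared_counter = 0;
	shm_toc_insert(pcxt->toc, planstate->plan->plan_node_id, shared_counter);
}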
541 :
542 : /*
543 : * Set up the response queues that parallel workers will use to return
544 : * tuples to the main backend.
545 : */
546 : static shm_mq_handle **
547 976 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
548 : {
549 : shm_mq_handle **responseq;
550 : char *tqueuespace;
551 : int i;
552 :
553 : /* Skip this if no workers. */
554 976 : if (pcxt->nworkers == 0)
555 0 : return NULL;
556 :
557 : /* Allocate memory for shared memory queue handles. */
558 : responseq = (shm_mq_handle **)
559 976 : palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
560 :
561 : /*
562 : * If not reinitializing, allocate space from the DSM for the queues;
563 : * otherwise, find the already allocated space.
564 : */
565 976 : if (!reinitialize)
566 : tqueuespace =
567 718 : shm_toc_allocate(pcxt->toc,
568 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
569 718 : pcxt->nworkers));
570 : else
571 258 : tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
572 :
573 : /* Create the queues, and become the receiver for each. */
574 3586 : for (i = 0; i < pcxt->nworkers; ++i)
575 : {
576 : shm_mq *mq;
577 :
578 2610 : mq = shm_mq_create(tqueuespace +
579 2610 : ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
580 : (Size) PARALLEL_TUPLE_QUEUE_SIZE);
581 :
582 2610 : shm_mq_set_receiver(mq, MyProc);
583 2610 : responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
584 : }
585 :
586 : /* Add array of queues to shm_toc, so others can find it. */
587 976 : if (!reinitialize)
588 718 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
589 :
590 : /* Return array of handles. */
591 976 : return responseq;
592 : }
593 :
594 : /*
595 : * Sets up the required infrastructure for backend workers to perform
596 : * execution and return results to the main backend.
597 : */
598 : ParallelExecutorInfo *
599 718 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
600 : Bitmapset *sendParams, int nworkers,
601 : int64 tuples_needed)
602 : {
603 : ParallelExecutorInfo *pei;
604 : ParallelContext *pcxt;
605 : ExecParallelEstimateContext e;
606 : ExecParallelInitializeDSMContext d;
607 : FixedParallelExecutorState *fpes;
608 : char *pstmt_data;
609 : char *pstmt_space;
610 : char *paramlistinfo_space;
611 : BufferUsage *bufusage_space;
612 : WalUsage *walusage_space;
613 718 : SharedExecutorInstrumentation *instrumentation = NULL;
614 718 : SharedJitInstrumentation *jit_instrumentation = NULL;
615 : int pstmt_len;
616 : int paramlistinfo_len;
617 718 : int instrumentation_len = 0;
618 718 : int jit_instrumentation_len = 0;
619 718 : int instrument_offset = 0;
620 718 : Size dsa_minsize = dsa_minimum_size();
621 : char *query_string;
622 : int query_len;
623 :
624 : /*
625 : * Force any initplan outputs that we're going to pass to workers to be
626 : * evaluated, if they weren't already.
627 : *
628 : * For simplicity, we use the EState's per-output-tuple ExprContext here.
629 : * That risks intra-query memory leakage, since we might pass through here
630 : * many times before that ExprContext gets reset; but ExecSetParamPlan
631 : * doesn't normally leak any memory in the context (see its comments), so
632 : * it doesn't seem worth complicating this function's API to pass it a
633 : * shorter-lived ExprContext. This might need to change someday.
634 : */
635 718 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
636 :
637 : /* Allocate object for return value. */
638 718 : pei = palloc0(sizeof(ParallelExecutorInfo));
639 718 : pei->finished = false;
640 718 : pei->planstate = planstate;
641 :
642 : /* Fix up and serialize plan to be sent to workers. */
643 718 : pstmt_data = ExecSerializePlan(planstate->plan, estate);
644 :
645 : /* Create a parallel context. */
646 718 : pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
647 718 : pei->pcxt = pcxt;
648 :
649 : /*
650 : * Before telling the parallel context to create a dynamic shared memory
651 : * segment, we need to figure out how big it should be. Estimate space
652 : * for the various things we need to store.
653 : */
654 :
655 : /* Estimate space for fixed-size state. */
656 718 : shm_toc_estimate_chunk(&pcxt->estimator,
657 : sizeof(FixedParallelExecutorState));
658 718 : shm_toc_estimate_keys(&pcxt->estimator, 1);
659 :
660 : /* Estimate space for query text. */
661 718 : query_len = strlen(estate->es_sourceText);
662 718 : shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
663 718 : shm_toc_estimate_keys(&pcxt->estimator, 1);
664 :
665 : /* Estimate space for serialized PlannedStmt. */
666 718 : pstmt_len = strlen(pstmt_data) + 1;
667 718 : shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
668 718 : shm_toc_estimate_keys(&pcxt->estimator, 1);
669 :
670 : /* Estimate space for serialized ParamListInfo. */
671 718 : paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
672 718 : shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
673 718 : shm_toc_estimate_keys(&pcxt->estimator, 1);
674 :
675 : /*
676 : * Estimate space for BufferUsage.
677 : *
678 : * If EXPLAIN is not in use and there are no extensions loaded that care,
679 : * we could skip this. But we have no way of knowing whether anyone's
680 : * looking at pgBufferUsage, so do it unconditionally.
681 : */
682 718 : shm_toc_estimate_chunk(&pcxt->estimator,
683 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
684 718 : shm_toc_estimate_keys(&pcxt->estimator, 1);
685 :
686 : /*
687 : * Same thing for WalUsage.
688 : */
689 718 : shm_toc_estimate_chunk(&pcxt->estimator,
690 : mul_size(sizeof(WalUsage), pcxt->nworkers));
691 718 : shm_toc_estimate_keys(&pcxt->estimator, 1);
692 :
693 : /* Estimate space for tuple queues. */
694 718 : shm_toc_estimate_chunk(&pcxt->estimator,
695 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
696 718 : shm_toc_estimate_keys(&pcxt->estimator, 1);
697 :
698 : /*
699 : * Give parallel-aware nodes a chance to add to the estimates, and get a
700 : * count of how many PlanState nodes there are.
701 : */
702 718 : e.pcxt = pcxt;
703 718 : e.nnodes = 0;
704 718 : ExecParallelEstimate(planstate, &e);
705 :
706 : /* Estimate space for instrumentation, if required. */
707 718 : if (estate->es_instrument)
708 : {
709 180 : instrumentation_len =
710 : offsetof(SharedExecutorInstrumentation, plan_node_id) +
711 180 : sizeof(int) * e.nnodes;
712 180 : instrumentation_len = MAXALIGN(instrumentation_len);
713 180 : instrument_offset = instrumentation_len;
714 180 : instrumentation_len +=
715 180 : mul_size(sizeof(Instrumentation),
716 180 : mul_size(e.nnodes, nworkers));
717 180 : shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
718 180 : shm_toc_estimate_keys(&pcxt->estimator, 1);
719 :
720 : /* Estimate space for JIT instrumentation, if required. */
721 180 : if (estate->es_jit_flags != PGJIT_NONE)
722 : {
723 24 : jit_instrumentation_len =
724 24 : offsetof(SharedJitInstrumentation, jit_instr) +
725 : sizeof(JitInstrumentation) * nworkers;
726 24 : shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
727 24 : shm_toc_estimate_keys(&pcxt->estimator, 1);
728 : }
729 : }
730 :
731 : /* Estimate space for DSA area. */
732 718 : shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
733 718 : shm_toc_estimate_keys(&pcxt->estimator, 1);
734 :
735 : /*
736 : * InitializeParallelDSM() passes the active snapshot to the parallel
737 : * worker, which uses it to set es_snapshot. Make sure we don't set
738 : * es_snapshot differently in the child.
739 : */
740 : Assert(GetActiveSnapshot() == estate->es_snapshot);
741 :
742 : /* Everyone's had a chance to ask for space, so now create the DSM. */
743 718 : InitializeParallelDSM(pcxt);
744 :
745 : /*
746 : * OK, now we have a dynamic shared memory segment, and it should be big
747 : * enough to store all of the data we estimated we would want to put into
748 : * it, plus whatever general stuff (not specifically executor-related) the
749 : * ParallelContext itself needs to store there. None of the space we
750 : * asked for has been allocated or initialized yet, though, so do that.
751 : */
752 :
753 : /* Store fixed-size state. */
754 718 : fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
755 718 : fpes->tuples_needed = tuples_needed;
756 718 : fpes->param_exec = InvalidDsaPointer;
757 718 : fpes->eflags = estate->es_top_eflags;
758 718 : fpes->jit_flags = estate->es_jit_flags;
759 718 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
760 :
761 : /* Store query string */
762 718 : query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
763 718 : memcpy(query_string, estate->es_sourceText, query_len + 1);
764 718 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
765 :
766 : /* Store serialized PlannedStmt. */
767 718 : pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
768 718 : memcpy(pstmt_space, pstmt_data, pstmt_len);
769 718 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
770 :
771 : /* Store serialized ParamListInfo. */
772 718 : paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
773 718 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
774 718 : SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
775 :
776 : /* Allocate space for each worker's BufferUsage; no need to initialize. */
777 718 : bufusage_space = shm_toc_allocate(pcxt->toc,
778 718 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
779 718 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
780 718 : pei->buffer_usage = bufusage_space;
781 :
782 : /* Same for WalUsage. */
783 718 : walusage_space = shm_toc_allocate(pcxt->toc,
784 718 : mul_size(sizeof(WalUsage), pcxt->nworkers));
785 718 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
786 718 : pei->wal_usage = walusage_space;
787 :
788 : /* Set up the tuple queues that the workers will write into. */
789 718 : pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
790 :
791 : /* We don't need the TupleQueueReaders yet, though. */
792 718 : pei->reader = NULL;
793 :
794 : /*
795 : * If instrumentation options were supplied, allocate space for the data.
796 : * It only gets partially initialized here; the rest happens during
797 : * ExecParallelInitializeDSM.
798 : */
799 718 : if (estate->es_instrument)
800 : {
801 : Instrumentation *instrument;
802 : int i;
803 :
804 180 : instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
805 180 : instrumentation->instrument_options = estate->es_instrument;
806 180 : instrumentation->instrument_offset = instrument_offset;
807 180 : instrumentation->num_workers = nworkers;
808 180 : instrumentation->num_plan_nodes = e.nnodes;
809 180 : instrument = GetInstrumentationArray(instrumentation);
810 1860 : for (i = 0; i < nworkers * e.nnodes; ++i)
811 1680 : InstrInit(&instrument[i], estate->es_instrument);
812 180 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
813 : instrumentation);
814 180 : pei->instrumentation = instrumentation;
815 :
816 180 : if (estate->es_jit_flags != PGJIT_NONE)
817 : {
818 24 : jit_instrumentation = shm_toc_allocate(pcxt->toc,
819 : jit_instrumentation_len);
820 24 : jit_instrumentation->num_workers = nworkers;
821 24 : memset(jit_instrumentation->jit_instr, 0,
822 : sizeof(JitInstrumentation) * nworkers);
823 24 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
824 : jit_instrumentation);
825 24 : pei->jit_instrumentation = jit_instrumentation;
826 : }
827 : }
828 :
829 : /*
830 : * Create a DSA area that can be used by the leader and all workers.
831 : * (However, if we failed to create a DSM and are using private memory
832 : * instead, then skip this.)
833 : */
834 718 : if (pcxt->seg != NULL)
835 : {
836 : char *area_space;
837 :
838 718 : area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
839 718 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
840 718 : pei->area = dsa_create_in_place(area_space, dsa_minsize,
841 : LWTRANCHE_PARALLEL_QUERY_DSA,
842 : pcxt->seg);
843 :
844 : /*
845 : * Serialize parameters, if any, using DSA storage. We don't dare use
846 : * the main parallel query DSM for this because we might relaunch
847 : * workers after the values have changed (and thus the amount of
848 : * storage required has changed).
849 : */
850 718 : if (!bms_is_empty(sendParams))
851 : {
852 24 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
853 : pei->area);
854 24 : fpes->param_exec = pei->param_exec;
855 : }
856 : }
857 :
858 : /*
859 : * Give parallel-aware nodes a chance to initialize their shared data.
860 : * This also fills in the per-node entries in *instrumentation, if that
861 : * exists.
862 : */
863 718 : d.pcxt = pcxt;
864 718 : d.instrumentation = instrumentation;
865 718 : d.nnodes = 0;
866 :
867 : /* Install our DSA area while initializing the plan. */
868 718 : estate->es_query_dsa = pei->area;
869 718 : ExecParallelInitializeDSM(planstate, &d);
870 718 : estate->es_query_dsa = NULL;
871 :
872 : /*
873 : * Make sure that the world hasn't shifted under our feet. This could
874 : * probably just be an Assert(), but let's be conservative for now.
875 : */
876 718 : if (e.nnodes != d.nnodes)
877 0 : elog(ERROR, "inconsistent count of PlanState nodes");
878 :
879 : /* OK, we're ready to rock and roll. */
880 718 : return pei;
881 : }
882 :
883 : /*
884 : * Set up tuple queue readers to read the results of a parallel subplan.
885 : *
886 : * This is separate from ExecInitParallelPlan() because we can launch the
887 : * worker processes and let them start doing something before we do this.
888 : */
889 : void
890 958 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
891 : {
892 958 : int nworkers = pei->pcxt->nworkers_launched;
893 : int i;
894 :
895 : Assert(pei->reader == NULL);
896 :
897 958 : if (nworkers > 0)
898 : {
899 958 : pei->reader = (TupleQueueReader **)
900 958 : palloc(nworkers * sizeof(TupleQueueReader *));
901 :
902 3486 : for (i = 0; i < nworkers; i++)
903 : {
904 2528 : shm_mq_set_handle(pei->tqueue[i],
905 2528 : pei->pcxt->worker[i].bgwhandle);
906 2528 : pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
907 : }
908 : }
909 958 : }
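
/*
 * A rough sketch, not part of the original file, of the order in which a
 * Gather-like caller uses this module.  The parameter names are
 * placeholders; error handling and the no-workers fallback are omitted.
 */
static void
ExecParallelUsageSketch(PlanState *outer_planstate, EState *estate,
						Bitmapset *initplan_params, int nworkers,
						int64 tuples_needed)
{
	ParallelExecutorInfo *pei;

	pei = ExecInitParallelPlan(outer_planstate, estate,
							   initplan_params, nworkers, tuples_needed);
	LaunchParallelWorkers(pei->pcxt);
	if (pei->pcxt->nworkers_launched > 0)
		ExecParallelCreateReaders(pei);
	/* ... pull tuples from the queues via pei->reader[] ... */
	ExecParallelFinish(pei);	/* wait for workers, collect buffer/WAL usage */
	ExecParallelCleanup(pei);	/* accumulate instrumentation, destroy DSM */
}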
910 :
911 : /*
912 : * Re-initialize the parallel executor shared memory state before launching
913 : * a fresh batch of workers.
914 : */
915 : void
916 258 : ExecParallelReinitialize(PlanState *planstate,
917 : ParallelExecutorInfo *pei,
918 : Bitmapset *sendParams)
919 : {
920 258 : EState *estate = planstate->state;
921 : FixedParallelExecutorState *fpes;
922 :
923 : /* Old workers must already be shut down */
924 : Assert(pei->finished);
925 :
926 : /*
927 : * Force any initplan outputs that we're going to pass to workers to be
928 : * evaluated, if they weren't already (see comments in
929 : * ExecInitParallelPlan).
930 : */
931 258 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
932 :
933 258 : ReinitializeParallelDSM(pei->pcxt);
934 258 : pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
935 258 : pei->reader = NULL;
936 258 : pei->finished = false;
937 :
938 258 : fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
939 :
940 : /* Free any serialized parameters from the last round. */
941 258 : if (DsaPointerIsValid(fpes->param_exec))
942 : {
943 0 : dsa_free(pei->area, fpes->param_exec);
944 0 : fpes->param_exec = InvalidDsaPointer;
945 : }
946 :
947 : /* Serialize current parameter values if required. */
948 258 : if (!bms_is_empty(sendParams))
949 : {
950 0 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
951 : pei->area);
952 0 : fpes->param_exec = pei->param_exec;
953 : }
954 :
955 : /* Traverse plan tree and let each child node reset associated state. */
956 258 : estate->es_query_dsa = pei->area;
957 258 : ExecParallelReInitializeDSM(planstate, pei->pcxt);
958 258 : estate->es_query_dsa = NULL;
959 258 : }
960 :
961 : /*
962 : * Traverse plan tree to reinitialize per-node dynamic shared memory state
963 : */
964 : static bool
965 666 : ExecParallelReInitializeDSM(PlanState *planstate,
966 : ParallelContext *pcxt)
967 : {
968 666 : if (planstate == NULL)
969 0 : return false;
970 :
971 : /*
972 : * Call reinitializers for DSM-using plan nodes.
973 : */
974 666 : switch (nodeTag(planstate))
975 : {
976 276 : case T_SeqScanState:
977 276 : if (planstate->plan->parallel_aware)
978 228 : ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
979 : pcxt);
980 276 : break;
981 12 : case T_IndexScanState:
982 12 : if (planstate->plan->parallel_aware)
983 12 : ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
984 : pcxt);
985 12 : break;
986 12 : case T_IndexOnlyScanState:
987 12 : if (planstate->plan->parallel_aware)
988 12 : ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
989 : pcxt);
990 12 : break;
991 0 : case T_ForeignScanState:
992 0 : if (planstate->plan->parallel_aware)
993 0 : ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
994 : pcxt);
995 0 : break;
996 0 : case T_AppendState:
997 0 : if (planstate->plan->parallel_aware)
998 0 : ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
999 0 : break;
1000 0 : case T_CustomScanState:
1001 0 : if (planstate->plan->parallel_aware)
1002 0 : ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
1003 : pcxt);
1004 0 : break;
1005 54 : case T_BitmapHeapScanState:
1006 54 : if (planstate->plan->parallel_aware)
1007 54 : ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
1008 : pcxt);
1009 54 : break;
1010 96 : case T_HashJoinState:
1011 96 : if (planstate->plan->parallel_aware)
1012 48 : ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
1013 : pcxt);
1014 96 : break;
1015 180 : case T_BitmapIndexScanState:
1016 : case T_HashState:
1017 : case T_SortState:
1018 : case T_IncrementalSortState:
1019 : case T_MemoizeState:
1020 : /* these nodes have DSM state, but no reinitialization is required */
1021 180 : break;
1022 :
1023 36 : default:
1024 36 : break;
1025 : }
1026 :
1027 666 : return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1028 : }
1029 :
1030 : /*
1031 : * Copy instrumentation information about this node and its descendants from
1032 : * dynamic shared memory.
1033 : */
1034 : static bool
1035 1026 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
1036 : SharedExecutorInstrumentation *instrumentation)
1037 : {
1038 : Instrumentation *instrument;
1039 : int i;
1040 : int n;
1041 : int ibytes;
1042 1026 : int plan_node_id = planstate->plan->plan_node_id;
1043 : MemoryContext oldcontext;
1044 :
1045 : /* Find the instrumentation for this node. */
1046 4638 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1047 4638 : if (instrumentation->plan_node_id[i] == plan_node_id)
1048 1026 : break;
1049 1026 : if (i >= instrumentation->num_plan_nodes)
1050 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1051 :
1052 : /* Accumulate the statistics from all workers. */
1053 1026 : instrument = GetInstrumentationArray(instrumentation);
1054 1026 : instrument += i * instrumentation->num_workers;
1055 2706 : for (n = 0; n < instrumentation->num_workers; ++n)
1056 1680 : InstrAggNode(planstate->instrument, &instrument[n]);
1057 :
1058 : /*
1059 : * Also store the per-worker detail.
1060 : *
1061 : * Worker instrumentation should be allocated in the same context as the
1062 : * regular instrumentation information, which is the per-query context.
1063 : * Switch into per-query memory context.
1064 : */
1065 1026 : oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1066 1026 : ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1067 1026 : planstate->worker_instrument =
1068 1026 : palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1069 1026 : MemoryContextSwitchTo(oldcontext);
1070 :
1071 1026 : planstate->worker_instrument->num_workers = instrumentation->num_workers;
1072 1026 : memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1073 :
1074 : /* Perform any node-type-specific work that needs to be done. */
1075 1026 : switch (nodeTag(planstate))
1076 : {
1077 270 : case T_IndexScanState:
1078 270 : ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
1079 270 : break;
1080 0 : case T_IndexOnlyScanState:
1081 0 : ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
1082 0 : break;
1083 0 : case T_BitmapIndexScanState:
1084 0 : ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
1085 0 : break;
1086 12 : case T_SortState:
1087 12 : ExecSortRetrieveInstrumentation((SortState *) planstate);
1088 12 : break;
1089 0 : case T_IncrementalSortState:
1090 0 : ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1091 0 : break;
1092 84 : case T_HashState:
1093 84 : ExecHashRetrieveInstrumentation((HashState *) planstate);
1094 84 : break;
1095 102 : case T_AggState:
1096 102 : ExecAggRetrieveInstrumentation((AggState *) planstate);
1097 102 : break;
1098 0 : case T_MemoizeState:
1099 0 : ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
1100 0 : break;
1101 0 : case T_BitmapHeapScanState:
1102 0 : ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
1103 0 : break;
1104 558 : default:
1105 558 : break;
1106 : }
1107 :
1108 1026 : return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1109 : instrumentation);
1110 : }
1111 :
1112 : /*
1113 : * Add up the workers' JIT instrumentation from dynamic shared memory.
1114 : */
1115 : static void
1116 24 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1117 : SharedJitInstrumentation *shared_jit)
1118 : {
1119 : JitInstrumentation *combined;
1120 : int ibytes;
1121 :
1122 : int n;
1123 :
1124 : /*
1125 : * Accumulate worker JIT instrumentation into the combined JIT
1126 : * instrumentation, allocating it if required.
1127 : */
1128 24 : if (!planstate->state->es_jit_worker_instr)
1129 24 : planstate->state->es_jit_worker_instr =
1130 24 : MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
1131 24 : combined = planstate->state->es_jit_worker_instr;
1132 :
1133 : /* Accumulate all the workers' instrumentations. */
1134 72 : for (n = 0; n < shared_jit->num_workers; ++n)
1135 48 : InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1136 :
1137 : /*
1138 : * Store the per-worker detail.
1139 : *
1140 : * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1141 : * instrumentation in per-query context.
1142 : */
1143 24 : ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1144 24 : + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1145 24 : planstate->worker_jit_instrument =
1146 24 : MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1147 :
1148 24 : memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1149 24 : }
1150 :
1151 : /*
1152 : * Finish parallel execution. We wait for parallel workers to finish, and
1153 : * accumulate their buffer/WAL usage.
1154 : */
1155 : void
1156 1760 : ExecParallelFinish(ParallelExecutorInfo *pei)
1157 : {
1158 1760 : int nworkers = pei->pcxt->nworkers_launched;
1159 : int i;
1160 :
1161 : /* Make this be a no-op if called twice in a row. */
1162 1760 : if (pei->finished)
1163 796 : return;
1164 :
1165 : /*
1166 : * Detach from tuple queues ASAP, so that any still-active workers will
1167 : * notice that no further results are wanted.
1168 : */
1169 964 : if (pei->tqueue != NULL)
1170 : {
1171 3480 : for (i = 0; i < nworkers; i++)
1172 2516 : shm_mq_detach(pei->tqueue[i]);
1173 964 : pfree(pei->tqueue);
1174 964 : pei->tqueue = NULL;
1175 : }
1176 :
1177 : /*
1178 : * While we're waiting for the workers to finish, let's get rid of the
1179 : * tuple queue readers. (Any other local cleanup could be done here too.)
1180 : */
1181 964 : if (pei->reader != NULL)
1182 : {
1183 3462 : for (i = 0; i < nworkers; i++)
1184 2516 : DestroyTupleQueueReader(pei->reader[i]);
1185 946 : pfree(pei->reader);
1186 946 : pei->reader = NULL;
1187 : }
1188 :
1189 : /* Now wait for the workers to finish. */
1190 964 : WaitForParallelWorkersToFinish(pei->pcxt);
1191 :
1192 : /*
1193 : * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1194 : * finish, or we might get incomplete data.)
1195 : */
1196 3480 : for (i = 0; i < nworkers; i++)
1197 2516 : InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1198 :
1199 964 : pei->finished = true;
1200 : }
1201 :
1202 : /*
1203 : * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1204 : * resources still exist after ExecParallelFinish. We separate these
1205 : * routines because someone might want to examine the contents of the DSM
1206 : * after ExecParallelFinish and before calling this routine.
1207 : */
1208 : void
1209 706 : ExecParallelCleanup(ParallelExecutorInfo *pei)
1210 : {
1211 : /* Accumulate instrumentation, if any. */
1212 706 : if (pei->instrumentation)
1213 180 : ExecParallelRetrieveInstrumentation(pei->planstate,
1214 : pei->instrumentation);
1215 :
1216 : /* Accumulate JIT instrumentation, if any. */
1217 706 : if (pei->jit_instrumentation)
1218 24 : ExecParallelRetrieveJitInstrumentation(pei->planstate,
1219 24 : pei->jit_instrumentation);
1220 :
1221 : /* Free any serialized parameters. */
1222 706 : if (DsaPointerIsValid(pei->param_exec))
1223 : {
1224 24 : dsa_free(pei->area, pei->param_exec);
1225 24 : pei->param_exec = InvalidDsaPointer;
1226 : }
1227 706 : if (pei->area != NULL)
1228 : {
1229 706 : dsa_detach(pei->area);
1230 706 : pei->area = NULL;
1231 : }
1232 706 : if (pei->pcxt != NULL)
1233 : {
1234 706 : DestroyParallelContext(pei->pcxt);
1235 706 : pei->pcxt = NULL;
1236 : }
1237 706 : pfree(pei);
1238 706 : }
1239 :
1240 : /*
1241 : * Create a DestReceiver to write tuples we produce to the shm_mq designated
1242 : * for that purpose.
1243 : */
1244 : static DestReceiver *
1245 2528 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1246 : {
1247 : char *mqspace;
1248 : shm_mq *mq;
1249 :
1250 2528 : mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1251 2528 : mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1252 2528 : mq = (shm_mq *) mqspace;
1253 2528 : shm_mq_set_sender(mq, MyProc);
1254 2528 : return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1255 : }
1256 :
1257 : /*
1258 : * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1259 : */
1260 : static QueryDesc *
1261 2528 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1262 : int instrument_options)
1263 : {
1264 : char *pstmtspace;
1265 : char *paramspace;
1266 : PlannedStmt *pstmt;
1267 : ParamListInfo paramLI;
1268 : char *queryString;
1269 :
1270 : /* Get the query string from shared memory */
1271 2528 : queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1272 :
1273 : /* Reconstruct leader-supplied PlannedStmt. */
1274 2528 : pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1275 2528 : pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1276 :
1277 : /* Reconstruct ParamListInfo. */
1278 2528 : paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1279 2528 : paramLI = RestoreParamList(&paramspace);
1280 :
1281 : /*
1282 : * Create a QueryDesc for the query. We pass NULL for cachedplan, because
1283 : * we don't have a pointer to the CachedPlan in the leader's process. It's
1284 : * fine because the only reason the executor needs to see it is to decide
1285 : * if it should take locks on certain relations, but parallel workers
1286 : * always take locks anyway.
1287 : */
1288 2528 : return CreateQueryDesc(pstmt,
1289 : NULL,
1290 : queryString,
1291 : GetActiveSnapshot(), InvalidSnapshot,
1292 : receiver, paramLI, NULL, instrument_options);
1293 : }
1294 :
1295 : /*
1296 : * Copy instrumentation information from this node and its descendants into
1297 : * dynamic shared memory, so that the parallel leader can retrieve it.
1298 : */
1299 : static bool
1300 2366 : ExecParallelReportInstrumentation(PlanState *planstate,
1301 : SharedExecutorInstrumentation *instrumentation)
1302 : {
1303 : int i;
1304 2366 : int plan_node_id = planstate->plan->plan_node_id;
1305 : Instrumentation *instrument;
1306 :
1307 2366 : InstrEndLoop(planstate->instrument);
1308 :
1309 : /*
1310 : * If we shuffled the plan_node_id values in ps_instrument into sorted
1311 : * order, we could use binary search here. This might matter someday if
1312 : * we're pushing down sufficiently large plan trees. For now, do it the
1313 : * slow, dumb way.
1314 : */
1315 7782 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1316 7782 : if (instrumentation->plan_node_id[i] == plan_node_id)
1317 2366 : break;
1318 2366 : if (i >= instrumentation->num_plan_nodes)
1319 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1320 :
1321 : /*
1322 : * Add our statistics to the per-node, per-worker totals. It's possible
1323 : * that this could happen more than once if we relaunched workers.
1324 : */
1325 2366 : instrument = GetInstrumentationArray(instrumentation);
1326 2366 : instrument += i * instrumentation->num_workers;
1327 : Assert(IsParallelWorker());
1328 : Assert(ParallelWorkerNumber < instrumentation->num_workers);
1329 2366 : InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1330 :
1331 2366 : return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1332 : instrumentation);
1333 : }
1334 :
1335 : /*
1336 : * Initialize the PlanState and its descendants with the information
1337 : * retrieved from shared memory. This has to be done once the PlanState
1338 : * is allocated and initialized by executor; that is, after ExecutorStart().
1339 : */
1340 : static bool
1341 8150 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1342 : {
1343 8150 : if (planstate == NULL)
1344 0 : return false;
1345 :
1346 8150 : switch (nodeTag(planstate))
1347 : {
1348 3312 : case T_SeqScanState:
1349 3312 : if (planstate->plan->parallel_aware)
1350 2684 : ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1351 3312 : break;
1352 396 : case T_IndexScanState:
1353 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1354 396 : ExecIndexScanInitializeWorker((IndexScanState *) planstate, pwcxt);
1355 396 : break;
1356 242 : case T_IndexOnlyScanState:
1357 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1358 242 : ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1359 : pwcxt);
1360 242 : break;
1361 272 : case T_BitmapIndexScanState:
1362 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1363 272 : ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
1364 : pwcxt);
1365 272 : break;
1366 0 : case T_ForeignScanState:
1367 0 : if (planstate->plan->parallel_aware)
1368 0 : ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1369 : pwcxt);
1370 0 : break;
1371 376 : case T_AppendState:
1372 376 : if (planstate->plan->parallel_aware)
1373 316 : ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1374 376 : break;
1375 0 : case T_CustomScanState:
1376 0 : if (planstate->plan->parallel_aware)
1377 0 : ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1378 : pwcxt);
1379 0 : break;
1380 272 : case T_BitmapHeapScanState:
1381 272 : if (planstate->plan->parallel_aware)
1382 270 : ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1383 : pwcxt);
1384 272 : break;
1385 546 : case T_HashJoinState:
1386 546 : if (planstate->plan->parallel_aware)
1387 306 : ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1388 : pwcxt);
1389 546 : break;
1390 546 : case T_HashState:
1391 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1392 546 : ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1393 546 : break;
1394 452 : case T_SortState:
1395 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1396 452 : ExecSortInitializeWorker((SortState *) planstate, pwcxt);
1397 452 : break;
1398 0 : case T_IncrementalSortState:
1399 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1400 0 : ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1401 : pwcxt);
1402 0 : break;
1403 1546 : case T_AggState:
1404 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1405 1546 : ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1406 1546 : break;
1407 12 : case T_MemoizeState:
1408 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1409 12 : ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
1410 12 : break;
1411 178 : default:
1412 178 : break;
1413 : }
1414 :
1415 8150 : return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1416 : pwcxt);
1417 : }
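
/*
 * A minimal sketch, not part of the original file, of the matching
 * per-node "InitializeWorker" hook (same hypothetical node as in the DSM
 * sketches earlier).  In the worker, the node looks up the shared state
 * that the leader published under its plan_node_id.
 */
static void
ExecExampleScanInitializeWorkerSketch(PlanState *planstate,
									  ParallelWorkerContext *pwcxt)
{
	int		   *shared_counter;

	shared_counter = shm_toc_lookup(pwcxt->toc,
									planstate->plan->plan_node_id, false);
	/* the node would stash this pointer in its per-node state */
	(void) shared_counter;
}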
1418 :
1419 : /*
1420 : * Main entrypoint for parallel query worker processes.
1421 : *
1422 : * We reach this function from ParallelWorkerMain, so the setup necessary to
1423 : * create a sensible parallel environment has already been done;
1424 : * ParallelWorkerMain worries about stuff like the transaction state, combo
1425 : * CID mappings, and GUC values, so we don't need to deal with any of that
1426 : * here.
1427 : *
1428 : * Our job is to deal with concerns specific to the executor. The parallel
1429 : * group leader will have stored a serialized PlannedStmt, and it's our job
1430 : * to execute that plan and write the resulting tuples to the appropriate
1431 : * tuple queue. Various bits of supporting information that we need in order
1432 : * to do this are also stored in the dsm_segment and can be accessed through
1433 : * the shm_toc.
1434 : */
1435 : void
1436 2528 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1437 : {
1438 : FixedParallelExecutorState *fpes;
1439 : BufferUsage *buffer_usage;
1440 : WalUsage *wal_usage;
1441 : DestReceiver *receiver;
1442 : QueryDesc *queryDesc;
1443 : SharedExecutorInstrumentation *instrumentation;
1444 : SharedJitInstrumentation *jit_instrumentation;
1445 2528 : int instrument_options = 0;
1446 : void *area_space;
1447 : dsa_area *area;
1448 : ParallelWorkerContext pwcxt;
1449 :
1450 : /* Get fixed-size state. */
1451 2528 : fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1452 :
1453 : /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
1454 2528 : receiver = ExecParallelGetReceiver(seg, toc);
1455 2528 : instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1456 2528 : if (instrumentation != NULL)
1457 724 : instrument_options = instrumentation->instrument_options;
1458 2528 : jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1459 : true);
1460 2528 : queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1461 :
1462 : /* Setting debug_query_string for individual workers */
1463 2528 : debug_query_string = queryDesc->sourceText;
1464 :
1465 : /* Report workers' query for monitoring purposes */
1466 2528 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1467 :
1468 : /* Attach to the dynamic shared memory area. */
1469 2528 : area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1470 2528 : area = dsa_attach_in_place(area_space, seg);
1471 :
1472 : /* Start up the executor */
1473 2528 : queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1474 2528 : if (!ExecutorStart(queryDesc, fpes->eflags))
1475 0 : elog(ERROR, "ExecutorStart() failed unexpectedly");
1476 :
1477 : /* Special executor initialization steps for parallel workers */
1478 2528 : queryDesc->planstate->state->es_query_dsa = area;
1479 2528 : if (DsaPointerIsValid(fpes->param_exec))
1480 : {
1481 : char *paramexec_space;
1482 :
1483 72 : paramexec_space = dsa_get_address(area, fpes->param_exec);
1484 72 : RestoreParamExecParams(paramexec_space, queryDesc->estate);
1485 : }
1486 2528 : pwcxt.toc = toc;
1487 2528 : pwcxt.seg = seg;
1488 2528 : ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1489 :
1490 : /* Pass down any tuple bound */
1491 2528 : ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1492 :
1493 : /*
1494 : * Prepare to track buffer/WAL usage during query execution.
1495 : *
1496 : * We do this after starting up the executor to match what happens in the
1497 : * leader, which also doesn't count buffer accesses and WAL activity that
1498 : * occur during executor startup.
1499 : */
1500 2528 : InstrStartParallelQuery();
1501 :
1502 : /*
1503 : * Run the plan. If we specified a tuple bound, be careful not to demand
1504 : * more tuples than that.
1505 : */
1506 2528 : ExecutorRun(queryDesc,
1507 : ForwardScanDirection,
1508 2528 : fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
1509 :
1510 : /* Shut down the executor */
1511 2516 : ExecutorFinish(queryDesc);
1512 :
1513 : /* Report buffer/WAL usage during parallel execution. */
1514 2516 : buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1515 2516 : wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1516 2516 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1517 2516 : &wal_usage[ParallelWorkerNumber]);
1518 :
1519 : /* Report instrumentation data if any instrumentation options are set. */
1520 2516 : if (instrumentation != NULL)
1521 724 : ExecParallelReportInstrumentation(queryDesc->planstate,
1522 : instrumentation);
1523 :
1524 : /* Report JIT instrumentation data if any */
1525 2516 : if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1526 : {
1527 : Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1528 144 : jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1529 144 : queryDesc->estate->es_jit->instr;
1530 : }
1531 :
1532 : /* Must do this after capturing instrumentation. */
1533 2516 : ExecutorEnd(queryDesc);
1534 :
1535 : /* Cleanup. */
1536 2516 : dsa_detach(area);
1537 2516 : FreeQueryDesc(queryDesc);
1538 2516 : receiver->rDestroy(receiver);
1539 2516 : }