Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execParallel.c
4 : * Support routines for parallel execution.
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * This file contains routines that are intended to support setting up,
10 : * using, and tearing down a ParallelContext from within the PostgreSQL
11 : * executor. The ParallelContext machinery will handle starting the
12 : * workers and ensuring that their state generally matches that of the
13 : * leader; see src/backend/access/transam/README.parallel for details.
14 : * However, we must save and restore relevant executor state, such as
15 : * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 : * the actual plan to be passed down to the worker.
17 : *
18 : * IDENTIFICATION
19 : * src/backend/executor/execParallel.c
20 : *
21 : *-------------------------------------------------------------------------
22 : */
23 :
24 : #include "postgres.h"
25 :
26 : #include "executor/execParallel.h"
27 : #include "executor/executor.h"
28 : #include "executor/nodeAgg.h"
29 : #include "executor/nodeAppend.h"
30 : #include "executor/nodeBitmapHeapscan.h"
31 : #include "executor/nodeBitmapIndexscan.h"
32 : #include "executor/nodeCustom.h"
33 : #include "executor/nodeForeignscan.h"
34 : #include "executor/nodeHash.h"
35 : #include "executor/nodeHashjoin.h"
36 : #include "executor/nodeIncrementalSort.h"
37 : #include "executor/nodeIndexonlyscan.h"
38 : #include "executor/nodeIndexscan.h"
39 : #include "executor/nodeMemoize.h"
40 : #include "executor/nodeSeqscan.h"
41 : #include "executor/nodeSort.h"
42 : #include "executor/nodeSubplan.h"
43 : #include "executor/nodeTidrangescan.h"
44 : #include "executor/tqueue.h"
45 : #include "jit/jit.h"
46 : #include "nodes/nodeFuncs.h"
47 : #include "pgstat.h"
48 : #include "tcop/tcopprot.h"
49 : #include "utils/datum.h"
50 : #include "utils/dsa.h"
51 : #include "utils/lsyscache.h"
52 : #include "utils/snapmgr.h"
53 :
54 : /*
55 : * Magic numbers for parallel executor communication. We use constants
56 : * greater than any 32-bit integer here so that values < 2^32 can be used
57 : * by individual parallel nodes to store their own state.
58 : */
59 : #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
60 : #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
61 : #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
62 : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
63 : #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
64 : #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
65 : #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
66 : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
67 : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
68 : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
69 :
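/*
 * Illustrative sketch, not part of this file: a parallel-aware scan node
 * publishes its private shared state under its plan_node_id, e.g. roughly
 * what the scan nodes do:
 *
 *     shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 *
 * Plan node IDs are small integers, so they can never collide with the
 * executor-level keys above.
 */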
70 : #define PARALLEL_TUPLE_QUEUE_SIZE 65536
71 :
72 : /*
73 : * Fixed-size state that we need to pass to parallel workers.
74 : */
75 : typedef struct FixedParallelExecutorState
76 : {
77 : int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
78 : dsa_pointer param_exec;
79 : int eflags;
80 : int jit_flags;
81 : } FixedParallelExecutorState;
82 :
83 : /*
84 : * DSM structure for accumulating per-PlanState instrumentation.
85 : *
86 : * instrument_options: Same meaning here as in instrument.c.
87 : *
88 : * instrument_offset: Offset, relative to the start of this structure,
89 : * of the first Instrumentation object. This will depend on the length of
90 : * the plan_node_id array.
91 : *
92 : * num_workers: Number of workers.
93 : *
94 : * num_plan_nodes: Number of plan nodes.
95 : *
96 : * plan_node_id: Array of plan nodes for which we are gathering instrumentation
97 : * from parallel workers. The length of this array is given by num_plan_nodes.
98 : */
99 : struct SharedExecutorInstrumentation
100 : {
101 : int instrument_options;
102 : int instrument_offset;
103 : int num_workers;
104 : int num_plan_nodes;
105 : int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
106 : /* array of num_plan_nodes * num_workers Instrumentation objects follows */
107 : };
108 : #define GetInstrumentationArray(sei) \
109 : (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
110 : (Instrumentation *) (((char *) sei) + sei->instrument_offset))
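/*
 * Illustrative sketch, not part of this file: given the layout above, the
 * Instrumentation slot for the plan node stored at index i and for worker n
 * is found as
 *
 *     Instrumentation *instrument = GetInstrumentationArray(sei);
 *     Instrumentation *slot = &instrument[i * sei->num_workers + n];
 *
 * which is the indexing used by ExecParallelRetrieveInstrumentation() and
 * ExecParallelReportInstrumentation() below.
 */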
111 :
112 : /* Context object for ExecParallelEstimate. */
113 : typedef struct ExecParallelEstimateContext
114 : {
115 : ParallelContext *pcxt;
116 : int nnodes;
117 : } ExecParallelEstimateContext;
118 :
119 : /* Context object for ExecParallelInitializeDSM. */
120 : typedef struct ExecParallelInitializeDSMContext
121 : {
122 : ParallelContext *pcxt;
123 : SharedExecutorInstrumentation *instrumentation;
124 : int nnodes;
125 : } ExecParallelInitializeDSMContext;
126 :
127 : /* Helper functions that run in the parallel leader. */
128 : static char *ExecSerializePlan(Plan *plan, EState *estate);
129 : static bool ExecParallelEstimate(PlanState *planstate,
130 : ExecParallelEstimateContext *e);
131 : static bool ExecParallelInitializeDSM(PlanState *planstate,
132 : ExecParallelInitializeDSMContext *d);
133 : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
134 : bool reinitialize);
135 : static bool ExecParallelReInitializeDSM(PlanState *planstate,
136 : ParallelContext *pcxt);
137 : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
138 : SharedExecutorInstrumentation *instrumentation);
139 :
140 : /* Helper function that runs in the parallel worker. */
141 : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
142 :
143 : /*
144 : * Create a serialized representation of the plan to be sent to each worker.
145 : */
146 : static char *
147 754 : ExecSerializePlan(Plan *plan, EState *estate)
148 : {
149 : PlannedStmt *pstmt;
150 : ListCell *lc;
151 :
152 : /* We can't scribble on the original plan, so make a copy. */
153 754 : plan = copyObject(plan);
154 :
155 : /*
156 : * The worker will start its own copy of the executor, and that copy will
157 : * insert a junk filter if the toplevel node has any resjunk entries. We
158 : * don't want that to happen, because while resjunk columns shouldn't be
159 : * sent back to the user, here the tuples are coming back to another
160 : * backend which may very well need them. So mutate the target list
161 : * accordingly. This is sort of a hack; there might be better ways to do
162 : * this...
163 : */
164 2062 : foreach(lc, plan->targetlist)
165 : {
166 1308 : TargetEntry *tle = lfirst_node(TargetEntry, lc);
167 :
168 1308 : tle->resjunk = false;
169 : }
170 :
171 : /*
172 : * Create a dummy PlannedStmt. Most of the fields don't need to be valid
173 : * for our purposes, but the worker will need at least a minimal
174 : * PlannedStmt to start the executor.
175 : */
176 754 : pstmt = makeNode(PlannedStmt);
177 754 : pstmt->commandType = CMD_SELECT;
178 754 : pstmt->queryId = pgstat_get_my_query_id();
179 754 : pstmt->planId = pgstat_get_my_plan_id();
180 754 : pstmt->hasReturning = false;
181 754 : pstmt->hasModifyingCTE = false;
182 754 : pstmt->canSetTag = true;
183 754 : pstmt->transientPlan = false;
184 754 : pstmt->dependsOnRole = false;
185 754 : pstmt->parallelModeNeeded = false;
186 754 : pstmt->planTree = plan;
187 754 : pstmt->partPruneInfos = estate->es_part_prune_infos;
188 754 : pstmt->rtable = estate->es_range_table;
189 754 : pstmt->unprunableRelids = estate->es_unpruned_relids;
190 754 : pstmt->permInfos = estate->es_rteperminfos;
191 754 : pstmt->resultRelations = NIL;
192 754 : pstmt->appendRelations = NIL;
193 754 : pstmt->planOrigin = PLAN_STMT_INTERNAL;
194 :
195 : /*
196 : * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
197 : * for unsafe ones (so that the list indexes of the safe ones are
198 : * preserved). This positively ensures that the worker won't try to run,
199 : * or even do ExecInitNode on, an unsafe subplan. That's important to
200 : * protect, e.g., non-parallel-aware FDWs from getting into trouble.
201 : */
202 754 : pstmt->subplans = NIL;
203 808 : foreach(lc, estate->es_plannedstmt->subplans)
204 : {
205 54 : Plan *subplan = (Plan *) lfirst(lc);
206 :
207 54 : if (subplan && !subplan->parallel_safe)
208 12 : subplan = NULL;
209 54 : pstmt->subplans = lappend(pstmt->subplans, subplan);
210 : }
211 :
212 754 : pstmt->rewindPlanIDs = NULL;
213 754 : pstmt->rowMarks = NIL;
214 754 : pstmt->relationOids = NIL;
215 754 : pstmt->invalItems = NIL; /* workers can't replan anyway... */
216 754 : pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
217 754 : pstmt->utilityStmt = NULL;
218 754 : pstmt->stmt_location = -1;
219 754 : pstmt->stmt_len = -1;
220 :
221 : /* Return serialized copy of our dummy PlannedStmt. */
222 754 : return nodeToString(pstmt);
223 : }
224 :
225 : /*
226 : * Parallel-aware plan nodes (and occasionally others) may need some state
227 : * which is shared across all parallel workers. Before we size the DSM, give
228 : * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
229 : * &pcxt->estimator.
230 : *
231 : * While we're at it, count the number of PlanState nodes in the tree, so
232 : * we know how many Instrumentation structures we need.
233 : */
234 : static bool
235 3050 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
236 : {
237 3050 : if (planstate == NULL)
238 0 : return false;
239 :
240 : /* Count this node. */
241 3050 : e->nnodes++;
242 :
243 3050 : switch (nodeTag(planstate))
244 : {
245 1162 : case T_SeqScanState:
246 1162 : if (planstate->plan->parallel_aware)
247 924 : ExecSeqScanEstimate((SeqScanState *) planstate,
248 : e->pcxt);
249 1162 : break;
250 294 : case T_IndexScanState:
251 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
252 294 : ExecIndexScanEstimate((IndexScanState *) planstate,
253 : e->pcxt);
254 294 : break;
255 58 : case T_IndexOnlyScanState:
256 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
257 58 : ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
258 : e->pcxt);
259 58 : break;
260 20 : case T_BitmapIndexScanState:
261 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
262 20 : ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
263 : e->pcxt);
264 20 : break;
265 0 : case T_ForeignScanState:
266 0 : if (planstate->plan->parallel_aware)
267 0 : ExecForeignScanEstimate((ForeignScanState *) planstate,
268 : e->pcxt);
269 0 : break;
270 24 : case T_TidRangeScanState:
271 24 : if (planstate->plan->parallel_aware)
272 24 : ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
273 : e->pcxt);
274 24 : break;
275 186 : case T_AppendState:
276 186 : if (planstate->plan->parallel_aware)
277 138 : ExecAppendEstimate((AppendState *) planstate,
278 : e->pcxt);
279 186 : break;
280 0 : case T_CustomScanState:
281 0 : if (planstate->plan->parallel_aware)
282 0 : ExecCustomScanEstimate((CustomScanState *) planstate,
283 : e->pcxt);
284 0 : break;
285 20 : case T_BitmapHeapScanState:
286 20 : if (planstate->plan->parallel_aware)
287 18 : ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
288 : e->pcxt);
289 20 : break;
290 198 : case T_HashJoinState:
291 198 : if (planstate->plan->parallel_aware)
292 126 : ExecHashJoinEstimate((HashJoinState *) planstate,
293 : e->pcxt);
294 198 : break;
295 198 : case T_HashState:
296 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
297 198 : ExecHashEstimate((HashState *) planstate, e->pcxt);
298 198 : break;
299 158 : case T_SortState:
300 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
301 158 : ExecSortEstimate((SortState *) planstate, e->pcxt);
302 158 : break;
303 0 : case T_IncrementalSortState:
304 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
305 0 : ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
306 0 : break;
307 584 : case T_AggState:
308 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
309 584 : ExecAggEstimate((AggState *) planstate, e->pcxt);
310 584 : break;
311 6 : case T_MemoizeState:
312 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
313 6 : ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
314 6 : break;
315 142 : default:
316 142 : break;
317 : }
318 :
319 3050 : return planstate_tree_walker(planstate, ExecParallelEstimate, e);
320 : }
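/*
 * Illustrative sketch, not part of this file: the per-node estimate hooks
 * called above typically just reserve a chunk and a key in the estimator,
 * roughly as the scan nodes do:
 *
 *     node->pscan_len = table_parallelscan_estimate(rel, estate->es_snapshot);
 *     shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
 *     shm_toc_estimate_keys(&pcxt->estimator, 1);
 *
 * The matching allocation happens later in the node's InitializeDSM hook,
 * invoked from ExecParallelInitializeDSM() below.
 */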
321 :
322 : /*
323 : * Estimate the amount of space required to serialize the indicated parameters.
324 : */
325 : static Size
326 24 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
327 : {
328 : int paramid;
329 24 : Size sz = sizeof(int);
330 :
331 24 : paramid = -1;
332 54 : while ((paramid = bms_next_member(params, paramid)) >= 0)
333 : {
334 : Oid typeOid;
335 : int16 typLen;
336 : bool typByVal;
337 : ParamExecData *prm;
338 :
339 30 : prm = &(estate->es_param_exec_vals[paramid]);
340 30 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
341 : paramid);
342 :
343 30 : sz = add_size(sz, sizeof(int)); /* space for paramid */
344 :
345 : /* space for datum/isnull */
346 30 : if (OidIsValid(typeOid))
347 30 : get_typlenbyval(typeOid, &typLen, &typByVal);
348 : else
349 : {
350 : /* If no type OID, assume by-value, like copyParamList does. */
351 0 : typLen = sizeof(Datum);
352 0 : typByVal = true;
353 : }
354 30 : sz = add_size(sz,
355 30 : datumEstimateSpace(prm->value, prm->isnull,
356 : typByVal, typLen));
357 : }
358 24 : return sz;
359 : }
360 :
361 : /*
362 : * Serialize specified PARAM_EXEC parameters.
363 : *
364 : * We write the number of parameters first, as a 4-byte integer, and then
365 : * write details for each parameter in turn. The details for each parameter
366 : * consist of a 4-byte paramid (location of param in execution time internal
367 : * parameter array) and then the datum as serialized by datumSerialize().
368 : */
369 : static dsa_pointer
370 24 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
371 : {
372 : Size size;
373 : int nparams;
374 : int paramid;
375 : ParamExecData *prm;
376 : dsa_pointer handle;
377 : char *start_address;
378 :
379 : /* Allocate enough space for the current parameter values. */
380 24 : size = EstimateParamExecSpace(estate, params);
381 24 : handle = dsa_allocate(area, size);
382 24 : start_address = dsa_get_address(area, handle);
383 :
384 : /* First write the number of parameters as a 4-byte integer. */
385 24 : nparams = bms_num_members(params);
386 24 : memcpy(start_address, &nparams, sizeof(int));
387 24 : start_address += sizeof(int);
388 :
389 : /* Write details for each parameter in turn. */
390 24 : paramid = -1;
391 54 : while ((paramid = bms_next_member(params, paramid)) >= 0)
392 : {
393 : Oid typeOid;
394 : int16 typLen;
395 : bool typByVal;
396 :
397 30 : prm = &(estate->es_param_exec_vals[paramid]);
398 30 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
399 : paramid);
400 :
401 : /* Write paramid. */
402 30 : memcpy(start_address, &paramid, sizeof(int));
403 30 : start_address += sizeof(int);
404 :
405 : /* Write datum/isnull */
406 30 : if (OidIsValid(typeOid))
407 30 : get_typlenbyval(typeOid, &typLen, &typByVal);
408 : else
409 : {
410 : /* If no type OID, assume by-value, like copyParamList does. */
411 0 : typLen = sizeof(Datum);
412 0 : typByVal = true;
413 : }
414 30 : datumSerialize(prm->value, prm->isnull, typByVal, typLen,
415 : &start_address);
416 : }
417 :
418 24 : return handle;
419 : }
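/*
 * Illustrative layout of the block written above, derived from the code
 * (sizes in bytes):
 *
 *     int  nparams                                     -- 4
 *     repeated nparams times:
 *         int  paramid                                 -- 4
 *         the datum as written by datumSerialize()     -- variable
 *
 * RestoreParamExecParams() below reads the block back in the same order.
 */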
420 :
421 : /*
422 : * Restore specified PARAM_EXEC parameters.
423 : */
424 : static void
425 72 : RestoreParamExecParams(char *start_address, EState *estate)
426 : {
427 : int nparams;
428 : int i;
429 : int paramid;
430 :
431 72 : memcpy(&nparams, start_address, sizeof(int));
432 72 : start_address += sizeof(int);
433 :
434 156 : for (i = 0; i < nparams; i++)
435 : {
436 : ParamExecData *prm;
437 :
438 : /* Read paramid */
439 84 : memcpy(&paramid, start_address, sizeof(int));
440 84 : start_address += sizeof(int);
441 84 : prm = &(estate->es_param_exec_vals[paramid]);
442 :
443 : /* Read datum/isnull. */
444 84 : prm->value = datumRestore(&start_address, &prm->isnull);
445 84 : prm->execPlan = NULL;
446 : }
447 72 : }
448 :
449 : /*
450 : * Initialize the dynamic shared memory segment that will be used to control
451 : * parallel execution.
452 : */
453 : static bool
454 3050 : ExecParallelInitializeDSM(PlanState *planstate,
455 : ExecParallelInitializeDSMContext *d)
456 : {
457 3050 : if (planstate == NULL)
458 0 : return false;
459 :
460 : /* If instrumentation is enabled, initialize slot for this node. */
461 3050 : if (d->instrumentation != NULL)
462 1026 : d->instrumentation->plan_node_id[d->nnodes] =
463 1026 : planstate->plan->plan_node_id;
464 :
465 : /* Count this node. */
466 3050 : d->nnodes++;
467 :
468 : /*
469 : * Call initializers for DSM-using plan nodes.
470 : *
471 : * Most plan nodes won't do anything here, but plan nodes that allocated
472 : * DSM may need to initialize shared state in the DSM before parallel
473 : * workers are launched. They can allocate the space they previously
474 : * estimated using shm_toc_allocate, and add the keys they previously
475 : * estimated using shm_toc_insert, in each case targeting pcxt->toc.
476 : */
477 3050 : switch (nodeTag(planstate))
478 : {
479 1162 : case T_SeqScanState:
480 1162 : if (planstate->plan->parallel_aware)
481 924 : ExecSeqScanInitializeDSM((SeqScanState *) planstate,
482 : d->pcxt);
483 1162 : break;
484 294 : case T_IndexScanState:
485 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
486 294 : ExecIndexScanInitializeDSM((IndexScanState *) planstate, d->pcxt);
487 294 : break;
488 58 : case T_IndexOnlyScanState:
489 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
490 58 : ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
491 : d->pcxt);
492 58 : break;
493 20 : case T_BitmapIndexScanState:
494 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
495 20 : ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
496 20 : break;
497 0 : case T_ForeignScanState:
498 0 : if (planstate->plan->parallel_aware)
499 0 : ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
500 : d->pcxt);
501 0 : break;
502 24 : case T_TidRangeScanState:
503 24 : if (planstate->plan->parallel_aware)
504 24 : ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
505 : d->pcxt);
506 24 : break;
507 186 : case T_AppendState:
508 186 : if (planstate->plan->parallel_aware)
509 138 : ExecAppendInitializeDSM((AppendState *) planstate,
510 : d->pcxt);
511 186 : break;
512 0 : case T_CustomScanState:
513 0 : if (planstate->plan->parallel_aware)
514 0 : ExecCustomScanInitializeDSM((CustomScanState *) planstate,
515 : d->pcxt);
516 0 : break;
517 20 : case T_BitmapHeapScanState:
518 20 : if (planstate->plan->parallel_aware)
519 18 : ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
520 : d->pcxt);
521 20 : break;
522 198 : case T_HashJoinState:
523 198 : if (planstate->plan->parallel_aware)
524 126 : ExecHashJoinInitializeDSM((HashJoinState *) planstate,
525 : d->pcxt);
526 198 : break;
527 198 : case T_HashState:
528 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
529 198 : ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
530 198 : break;
531 158 : case T_SortState:
532 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
533 158 : ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
534 158 : break;
535 0 : case T_IncrementalSortState:
536 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
537 0 : ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
538 0 : break;
539 584 : case T_AggState:
540 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
541 584 : ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
542 584 : break;
543 6 : case T_MemoizeState:
544 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
545 6 : ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
546 6 : break;
547 142 : default:
548 142 : break;
549 : }
550 :
551 3050 : return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
552 : }
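/*
 * Illustrative sketch, not part of this file, of the per-node DSM pattern
 * referred to above: the leader-side InitializeDSM hook allocates the space
 * it previously estimated and publishes it under its plan_node_id, while the
 * worker-side hook (see ExecParallelInitializeWorker() below) finds it again:
 *
 *     leader:  pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
 *              shm_toc_insert(pcxt->toc, plan_node_id, pscan);
 *     worker:  pscan = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
 */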
553 :
554 : /*
555 : * Set up the response queues that parallel workers will use to return
556 : * tuples to the leader backend, and become the receiver for each queue.
557 : */
558 : static shm_mq_handle **
559 1012 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
560 : {
561 : shm_mq_handle **responseq;
562 : char *tqueuespace;
563 : int i;
564 :
565 : /* Skip this if no workers. */
566 1012 : if (pcxt->nworkers == 0)
567 0 : return NULL;
568 :
569 : /* Allocate memory for shared memory queue handles. */
570 : responseq = (shm_mq_handle **)
571 1012 : palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
572 :
573 : /*
574 : * If not reinitializing, allocate space from the DSM for the queues;
575 : * otherwise, find the already allocated space.
576 : */
577 1012 : if (!reinitialize)
578 : tqueuespace =
579 754 : shm_toc_allocate(pcxt->toc,
580 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
581 754 : pcxt->nworkers));
582 : else
583 258 : tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
584 :
585 : /* Create the queues, and become the receiver for each. */
586 3754 : for (i = 0; i < pcxt->nworkers; ++i)
587 : {
588 : shm_mq *mq;
589 :
590 2742 : mq = shm_mq_create(tqueuespace +
591 2742 : ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
592 : (Size) PARALLEL_TUPLE_QUEUE_SIZE);
593 :
594 2742 : shm_mq_set_receiver(mq, MyProc);
595 2742 : responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
596 : }
597 :
598 : /* Add array of queues to shm_toc, so others can find it. */
599 1012 : if (!reinitialize)
600 754 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
601 :
602 : /* Return array of handles. */
603 1012 : return responseq;
604 : }
605 :
606 : /*
607 : * Sets up the required infrastructure for backend workers to perform
608 : * execution and return results to the main backend.
609 : */
610 : ParallelExecutorInfo *
611 754 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
612 : Bitmapset *sendParams, int nworkers,
613 : int64 tuples_needed)
614 : {
615 : ParallelExecutorInfo *pei;
616 : ParallelContext *pcxt;
617 : ExecParallelEstimateContext e;
618 : ExecParallelInitializeDSMContext d;
619 : FixedParallelExecutorState *fpes;
620 : char *pstmt_data;
621 : char *pstmt_space;
622 : char *paramlistinfo_space;
623 : BufferUsage *bufusage_space;
624 : WalUsage *walusage_space;
625 754 : SharedExecutorInstrumentation *instrumentation = NULL;
626 754 : SharedJitInstrumentation *jit_instrumentation = NULL;
627 : int pstmt_len;
628 : int paramlistinfo_len;
629 754 : int instrumentation_len = 0;
630 754 : int jit_instrumentation_len = 0;
631 754 : int instrument_offset = 0;
632 754 : Size dsa_minsize = dsa_minimum_size();
633 : char *query_string;
634 : int query_len;
635 :
636 : /*
637 : * Force any initplan outputs that we're going to pass to workers to be
638 : * evaluated, if they weren't already.
639 : *
640 : * For simplicity, we use the EState's per-output-tuple ExprContext here.
641 : * That risks intra-query memory leakage, since we might pass through here
642 : * many times before that ExprContext gets reset; but ExecSetParamPlan
643 : * doesn't normally leak any memory in the context (see its comments), so
644 : * it doesn't seem worth complicating this function's API to pass it a
645 : * shorter-lived ExprContext. This might need to change someday.
646 : */
647 754 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
648 :
649 : /* Allocate object for return value. */
650 754 : pei = palloc0(sizeof(ParallelExecutorInfo));
651 754 : pei->finished = false;
652 754 : pei->planstate = planstate;
653 :
654 : /* Fix up and serialize plan to be sent to workers. */
655 754 : pstmt_data = ExecSerializePlan(planstate->plan, estate);
656 :
657 : /* Create a parallel context. */
658 754 : pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
659 754 : pei->pcxt = pcxt;
660 :
661 : /*
662 : * Before telling the parallel context to create a dynamic shared memory
663 : * segment, we need to figure out how big it should be. Estimate space
664 : * for the various things we need to store.
665 : */
666 :
667 : /* Estimate space for fixed-size state. */
668 754 : shm_toc_estimate_chunk(&pcxt->estimator,
669 : sizeof(FixedParallelExecutorState));
670 754 : shm_toc_estimate_keys(&pcxt->estimator, 1);
671 :
672 : /* Estimate space for query text. */
673 754 : query_len = strlen(estate->es_sourceText);
674 754 : shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
675 754 : shm_toc_estimate_keys(&pcxt->estimator, 1);
676 :
677 : /* Estimate space for serialized PlannedStmt. */
678 754 : pstmt_len = strlen(pstmt_data) + 1;
679 754 : shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
680 754 : shm_toc_estimate_keys(&pcxt->estimator, 1);
681 :
682 : /* Estimate space for serialized ParamListInfo. */
683 754 : paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
684 754 : shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
685 754 : shm_toc_estimate_keys(&pcxt->estimator, 1);
686 :
687 : /*
688 : * Estimate space for BufferUsage.
689 : *
690 : * If EXPLAIN is not in use and there are no extensions loaded that care,
691 : * we could skip this. But we have no way of knowing whether anyone's
692 : * looking at pgBufferUsage, so do it unconditionally.
693 : */
694 754 : shm_toc_estimate_chunk(&pcxt->estimator,
695 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
696 754 : shm_toc_estimate_keys(&pcxt->estimator, 1);
697 :
698 : /*
699 : * Same thing for WalUsage.
700 : */
701 754 : shm_toc_estimate_chunk(&pcxt->estimator,
702 : mul_size(sizeof(WalUsage), pcxt->nworkers));
703 754 : shm_toc_estimate_keys(&pcxt->estimator, 1);
704 :
705 : /* Estimate space for tuple queues. */
706 754 : shm_toc_estimate_chunk(&pcxt->estimator,
707 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
708 754 : shm_toc_estimate_keys(&pcxt->estimator, 1);
709 :
710 : /*
711 : * Give parallel-aware nodes a chance to add to the estimates, and get a
712 : * count of how many PlanState nodes there are.
713 : */
714 754 : e.pcxt = pcxt;
715 754 : e.nnodes = 0;
716 754 : ExecParallelEstimate(planstate, &e);
717 :
718 : /* Estimate space for instrumentation, if required. */
719 754 : if (estate->es_instrument)
720 : {
721 180 : instrumentation_len =
722 : offsetof(SharedExecutorInstrumentation, plan_node_id) +
723 180 : sizeof(int) * e.nnodes;
724 180 : instrumentation_len = MAXALIGN(instrumentation_len);
725 180 : instrument_offset = instrumentation_len;
726 180 : instrumentation_len +=
727 180 : mul_size(sizeof(Instrumentation),
728 180 : mul_size(e.nnodes, nworkers));
729 180 : shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
730 180 : shm_toc_estimate_keys(&pcxt->estimator, 1);
731 :
732 : /* Estimate space for JIT instrumentation, if required. */
733 180 : if (estate->es_jit_flags != PGJIT_NONE)
734 : {
735 24 : jit_instrumentation_len =
736 24 : offsetof(SharedJitInstrumentation, jit_instr) +
737 : sizeof(JitInstrumentation) * nworkers;
738 24 : shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
739 24 : shm_toc_estimate_keys(&pcxt->estimator, 1);
740 : }
741 : }
742 :
743 : /* Estimate space for DSA area. */
744 754 : shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
745 754 : shm_toc_estimate_keys(&pcxt->estimator, 1);
746 :
747 : /*
748 : * InitializeParallelDSM() passes the active snapshot to the parallel
749 : * worker, which uses it to set es_snapshot. Make sure we don't set
750 : * es_snapshot differently in the child.
751 : */
752 : Assert(GetActiveSnapshot() == estate->es_snapshot);
753 :
754 : /* Everyone's had a chance to ask for space, so now create the DSM. */
755 754 : InitializeParallelDSM(pcxt);
756 :
757 : /*
758 : * OK, now we have a dynamic shared memory segment, and it should be big
759 : * enough to store all of the data we estimated we would want to put into
760 : * it, plus whatever general stuff (not specifically executor-related) the
761 : * ParallelContext itself needs to store there. None of the space we
762 : * asked for has been allocated or initialized yet, though, so do that.
763 : */
764 :
765 : /* Store fixed-size state. */
766 754 : fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
767 754 : fpes->tuples_needed = tuples_needed;
768 754 : fpes->param_exec = InvalidDsaPointer;
769 754 : fpes->eflags = estate->es_top_eflags;
770 754 : fpes->jit_flags = estate->es_jit_flags;
771 754 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
772 :
773 : /* Store query string */
774 754 : query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
775 754 : memcpy(query_string, estate->es_sourceText, query_len + 1);
776 754 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
777 :
778 : /* Store serialized PlannedStmt. */
779 754 : pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
780 754 : memcpy(pstmt_space, pstmt_data, pstmt_len);
781 754 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
782 :
783 : /* Store serialized ParamListInfo. */
784 754 : paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
785 754 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
786 754 : SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
787 :
788 : /* Allocate space for each worker's BufferUsage; no need to initialize. */
789 754 : bufusage_space = shm_toc_allocate(pcxt->toc,
790 754 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
791 754 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
792 754 : pei->buffer_usage = bufusage_space;
793 :
794 : /* Same for WalUsage. */
795 754 : walusage_space = shm_toc_allocate(pcxt->toc,
796 754 : mul_size(sizeof(WalUsage), pcxt->nworkers));
797 754 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
798 754 : pei->wal_usage = walusage_space;
799 :
800 : /* Set up the tuple queues that the workers will write into. */
801 754 : pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
802 :
803 : /* We don't need the TupleQueueReaders yet, though. */
804 754 : pei->reader = NULL;
805 :
806 : /*
807 : * If instrumentation options were supplied, allocate space for the data.
808 : * It only gets partially initialized here; the rest happens during
809 : * ExecParallelInitializeDSM.
810 : */
811 754 : if (estate->es_instrument)
812 : {
813 : Instrumentation *instrument;
814 : int i;
815 :
816 180 : instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
817 180 : instrumentation->instrument_options = estate->es_instrument;
818 180 : instrumentation->instrument_offset = instrument_offset;
819 180 : instrumentation->num_workers = nworkers;
820 180 : instrumentation->num_plan_nodes = e.nnodes;
821 180 : instrument = GetInstrumentationArray(instrumentation);
822 1860 : for (i = 0; i < nworkers * e.nnodes; ++i)
823 1680 : InstrInit(&instrument[i], estate->es_instrument);
824 180 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
825 : instrumentation);
826 180 : pei->instrumentation = instrumentation;
827 :
828 180 : if (estate->es_jit_flags != PGJIT_NONE)
829 : {
830 24 : jit_instrumentation = shm_toc_allocate(pcxt->toc,
831 : jit_instrumentation_len);
832 24 : jit_instrumentation->num_workers = nworkers;
833 24 : memset(jit_instrumentation->jit_instr, 0,
834 : sizeof(JitInstrumentation) * nworkers);
835 24 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
836 : jit_instrumentation);
837 24 : pei->jit_instrumentation = jit_instrumentation;
838 : }
839 : }
840 :
841 : /*
842 : * Create a DSA area that can be used by the leader and all workers.
843 : * (However, if we failed to create a DSM and are using private memory
844 : * instead, then skip this.)
845 : */
846 754 : if (pcxt->seg != NULL)
847 : {
848 : char *area_space;
849 :
850 754 : area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
851 754 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
852 754 : pei->area = dsa_create_in_place(area_space, dsa_minsize,
853 : LWTRANCHE_PARALLEL_QUERY_DSA,
854 : pcxt->seg);
855 :
856 : /*
857 : * Serialize parameters, if any, using DSA storage. We don't dare use
858 : * the main parallel query DSM for this because we might relaunch
859 : * workers after the values have changed (and thus the amount of
860 : * storage required has changed).
861 : */
862 754 : if (!bms_is_empty(sendParams))
863 : {
864 24 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
865 : pei->area);
866 24 : fpes->param_exec = pei->param_exec;
867 : }
868 : }
869 :
870 : /*
871 : * Give parallel-aware nodes a chance to initialize their shared data.
872 : * This also initializes the plan_node_id array in *instrumentation,
873 : * if it exists.
874 : */
875 754 : d.pcxt = pcxt;
876 754 : d.instrumentation = instrumentation;
877 754 : d.nnodes = 0;
878 :
879 : /* Install our DSA area while initializing the plan. */
880 754 : estate->es_query_dsa = pei->area;
881 754 : ExecParallelInitializeDSM(planstate, &d);
882 754 : estate->es_query_dsa = NULL;
883 :
884 : /*
885 : * Make sure that the world hasn't shifted under our feet. This could
886 : * probably just be an Assert(), but let's be conservative for now.
887 : */
888 754 : if (e.nnodes != d.nnodes)
889 0 : elog(ERROR, "inconsistent count of PlanState nodes");
890 :
891 : /* OK, we're ready to rock and roll. */
892 754 : return pei;
893 : }
894 :
895 : /*
896 : * Set up tuple queue readers to read the results of a parallel subplan.
897 : *
898 : * This is separate from ExecInitParallelPlan() because we can launch the
899 : * worker processes and let them start doing something before we do this.
900 : */
901 : void
902 994 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
903 : {
904 994 : int nworkers = pei->pcxt->nworkers_launched;
905 : int i;
906 :
907 : Assert(pei->reader == NULL);
908 :
909 994 : if (nworkers > 0)
910 : {
911 994 : pei->reader = (TupleQueueReader **)
912 994 : palloc(nworkers * sizeof(TupleQueueReader *));
913 :
914 3656 : for (i = 0; i < nworkers; i++)
915 : {
916 2662 : shm_mq_set_handle(pei->tqueue[i],
917 2662 : pei->pcxt->worker[i].bgwhandle);
918 2662 : pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
919 : }
920 : }
921 994 : }
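/*
 * Illustrative sketch, not part of this file: the typical leader-side call
 * sequence, roughly as used by Gather and Gather Merge (nodeGather.c,
 * nodeGatherMerge.c), is:
 *
 *     pei = ExecInitParallelPlan(planstate, estate, sendParams,
 *                                nworkers, tuples_needed);
 *     LaunchParallelWorkers(pei->pcxt);
 *     ExecParallelCreateReaders(pei);
 *     ... read tuples from pei->reader[] ...
 *     ExecParallelFinish(pei);
 *     ExecParallelReinitialize(planstate, pei, sendParams);   -- for rescans
 *     ExecParallelCleanup(pei);                               -- when done
 */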
922 :
923 : /*
924 : * Re-initialize the parallel executor shared memory state before launching
925 : * a fresh batch of workers.
926 : */
927 : void
928 258 : ExecParallelReinitialize(PlanState *planstate,
929 : ParallelExecutorInfo *pei,
930 : Bitmapset *sendParams)
931 : {
932 258 : EState *estate = planstate->state;
933 : FixedParallelExecutorState *fpes;
934 :
935 : /* Old workers must already be shut down */
936 : Assert(pei->finished);
937 :
938 : /*
939 : * Force any initplan outputs that we're going to pass to workers to be
940 : * evaluated, if they weren't already (see comments in
941 : * ExecInitParallelPlan).
942 : */
943 258 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
944 :
945 258 : ReinitializeParallelDSM(pei->pcxt);
946 258 : pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
947 258 : pei->reader = NULL;
948 258 : pei->finished = false;
949 :
950 258 : fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
951 :
952 : /* Free any serialized parameters from the last round. */
953 258 : if (DsaPointerIsValid(fpes->param_exec))
954 : {
955 0 : dsa_free(pei->area, fpes->param_exec);
956 0 : fpes->param_exec = InvalidDsaPointer;
957 : }
958 :
959 : /* Serialize current parameter values if required. */
960 258 : if (!bms_is_empty(sendParams))
961 : {
962 0 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
963 : pei->area);
964 0 : fpes->param_exec = pei->param_exec;
965 : }
966 :
967 : /* Traverse plan tree and let each child node reset associated state. */
968 258 : estate->es_query_dsa = pei->area;
969 258 : ExecParallelReInitializeDSM(planstate, pei->pcxt);
970 258 : estate->es_query_dsa = NULL;
971 258 : }
972 :
973 : /*
974 : * Traverse plan tree to reinitialize per-node dynamic shared memory state
975 : */
976 : static bool
977 666 : ExecParallelReInitializeDSM(PlanState *planstate,
978 : ParallelContext *pcxt)
979 : {
980 666 : if (planstate == NULL)
981 0 : return false;
982 :
983 : /*
984 : * Call reinitializers for DSM-using plan nodes.
985 : */
986 666 : switch (nodeTag(planstate))
987 : {
988 276 : case T_SeqScanState:
989 276 : if (planstate->plan->parallel_aware)
990 228 : ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
991 : pcxt);
992 276 : break;
993 12 : case T_IndexScanState:
994 12 : if (planstate->plan->parallel_aware)
995 12 : ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
996 : pcxt);
997 12 : break;
998 12 : case T_IndexOnlyScanState:
999 12 : if (planstate->plan->parallel_aware)
1000 12 : ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
1001 : pcxt);
1002 12 : break;
1003 0 : case T_ForeignScanState:
1004 0 : if (planstate->plan->parallel_aware)
1005 0 : ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
1006 : pcxt);
1007 0 : break;
1008 0 : case T_TidRangeScanState:
1009 0 : if (planstate->plan->parallel_aware)
1010 0 : ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
1011 : pcxt);
1012 0 : break;
1013 0 : case T_AppendState:
1014 0 : if (planstate->plan->parallel_aware)
1015 0 : ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
1016 0 : break;
1017 0 : case T_CustomScanState:
1018 0 : if (planstate->plan->parallel_aware)
1019 0 : ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
1020 : pcxt);
1021 0 : break;
1022 54 : case T_BitmapHeapScanState:
1023 54 : if (planstate->plan->parallel_aware)
1024 54 : ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
1025 : pcxt);
1026 54 : break;
1027 96 : case T_HashJoinState:
1028 96 : if (planstate->plan->parallel_aware)
1029 48 : ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
1030 : pcxt);
1031 96 : break;
1032 180 : case T_BitmapIndexScanState:
1033 : case T_HashState:
1034 : case T_SortState:
1035 : case T_IncrementalSortState:
1036 : case T_MemoizeState:
1037 : /* these nodes have DSM state, but no reinitialization is required */
1038 180 : break;
1039 :
1040 36 : default:
1041 36 : break;
1042 : }
1043 :
1044 666 : return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1045 : }
1046 :
1047 : /*
1048 : * Copy instrumentation information about this node and its descendants from
1049 : * dynamic shared memory.
1050 : */
1051 : static bool
1052 1026 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
1053 : SharedExecutorInstrumentation *instrumentation)
1054 : {
1055 : Instrumentation *instrument;
1056 : int i;
1057 : int n;
1058 : int ibytes;
1059 1026 : int plan_node_id = planstate->plan->plan_node_id;
1060 : MemoryContext oldcontext;
1061 :
1062 : /* Find the instrumentation for this node. */
1063 4638 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1064 4638 : if (instrumentation->plan_node_id[i] == plan_node_id)
1065 1026 : break;
1066 1026 : if (i >= instrumentation->num_plan_nodes)
1067 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1068 :
1069 : /* Accumulate the statistics from all workers. */
1070 1026 : instrument = GetInstrumentationArray(instrumentation);
1071 1026 : instrument += i * instrumentation->num_workers;
1072 2706 : for (n = 0; n < instrumentation->num_workers; ++n)
1073 1680 : InstrAggNode(planstate->instrument, &instrument[n]);
1074 :
1075 : /*
1076 : * Also store the per-worker detail.
1077 : *
1078 : * Worker instrumentation should be allocated in the same context as the
1079 : * regular instrumentation information, which is the per-query context.
1080 : * Switch into per-query memory context.
1081 : */
1082 1026 : oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1083 1026 : ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1084 1026 : planstate->worker_instrument =
1085 1026 : palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1086 1026 : MemoryContextSwitchTo(oldcontext);
1087 :
1088 1026 : planstate->worker_instrument->num_workers = instrumentation->num_workers;
1089 1026 : memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1090 :
1091 : /* Perform any node-type-specific work that needs to be done. */
1092 1026 : switch (nodeTag(planstate))
1093 : {
1094 270 : case T_IndexScanState:
1095 270 : ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
1096 270 : break;
1097 0 : case T_IndexOnlyScanState:
1098 0 : ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
1099 0 : break;
1100 0 : case T_BitmapIndexScanState:
1101 0 : ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
1102 0 : break;
1103 12 : case T_SortState:
1104 12 : ExecSortRetrieveInstrumentation((SortState *) planstate);
1105 12 : break;
1106 0 : case T_IncrementalSortState:
1107 0 : ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1108 0 : break;
1109 84 : case T_HashState:
1110 84 : ExecHashRetrieveInstrumentation((HashState *) planstate);
1111 84 : break;
1112 102 : case T_AggState:
1113 102 : ExecAggRetrieveInstrumentation((AggState *) planstate);
1114 102 : break;
1115 0 : case T_MemoizeState:
1116 0 : ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
1117 0 : break;
1118 0 : case T_BitmapHeapScanState:
1119 0 : ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
1120 0 : break;
1121 558 : default:
1122 558 : break;
1123 : }
1124 :
1125 1026 : return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1126 : instrumentation);
1127 : }
1128 :
1129 : /*
1130 : * Add up the workers' JIT instrumentation from dynamic shared memory.
1131 : */
1132 : static void
1133 24 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1134 : SharedJitInstrumentation *shared_jit)
1135 : {
1136 : JitInstrumentation *combined;
1137 : int ibytes;
1138 :
1139 : int n;
1140 :
1141 : /*
1142 : * Accumulate worker JIT instrumentation into the combined JIT
1143 : * instrumentation, allocating it if required.
1144 : */
1145 24 : if (!planstate->state->es_jit_worker_instr)
1146 24 : planstate->state->es_jit_worker_instr =
1147 24 : MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
1148 24 : combined = planstate->state->es_jit_worker_instr;
1149 :
1150 : /* Accumulate all the workers' instrumentations. */
1151 72 : for (n = 0; n < shared_jit->num_workers; ++n)
1152 48 : InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1153 :
1154 : /*
1155 : * Store the per-worker detail.
1156 : *
1157 : * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1158 : * instrumentation in per-query context.
1159 : */
1160 24 : ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1161 24 : + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1162 24 : planstate->worker_jit_instrument =
1163 24 : MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1164 :
1165 24 : memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1166 24 : }
1167 :
1168 : /*
1169 : * Finish parallel execution. We wait for parallel workers to finish, and
1170 : * accumulate their buffer/WAL usage.
1171 : */
1172 : void
1173 1826 : ExecParallelFinish(ParallelExecutorInfo *pei)
1174 : {
1175 1826 : int nworkers = pei->pcxt->nworkers_launched;
1176 : int i;
1177 :
1178 : /* Make this be a no-op if called twice in a row. */
1179 1826 : if (pei->finished)
1180 826 : return;
1181 :
1182 : /*
1183 : * Detach from tuple queues ASAP, so that any still-active workers will
1184 : * notice that no further results are wanted.
1185 : */
1186 1000 : if (pei->tqueue != NULL)
1187 : {
1188 3650 : for (i = 0; i < nworkers; i++)
1189 2650 : shm_mq_detach(pei->tqueue[i]);
1190 1000 : pfree(pei->tqueue);
1191 1000 : pei->tqueue = NULL;
1192 : }
1193 :
1194 : /*
1195 : * While we're waiting for the workers to finish, let's get rid of the
1196 : * tuple queue readers. (Any other local cleanup could be done here too.)
1197 : */
1198 1000 : if (pei->reader != NULL)
1199 : {
1200 3632 : for (i = 0; i < nworkers; i++)
1201 2650 : DestroyTupleQueueReader(pei->reader[i]);
1202 982 : pfree(pei->reader);
1203 982 : pei->reader = NULL;
1204 : }
1205 :
1206 : /* Now wait for the workers to finish. */
1207 1000 : WaitForParallelWorkersToFinish(pei->pcxt);
1208 :
1209 : /*
1210 : * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1211 : * finish, or we might get incomplete data.)
1212 : */
1213 3650 : for (i = 0; i < nworkers; i++)
1214 2650 : InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1215 :
1216 1000 : pei->finished = true;
1217 : }
1218 :
1219 : /*
1220 : * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1221 : * resources still exist after ExecParallelFinish. We separate these
1222 : * routines because someone might want to examine the contents of the DSM
1223 : * after ExecParallelFinish and before calling this routine.
1224 : */
1225 : void
1226 742 : ExecParallelCleanup(ParallelExecutorInfo *pei)
1227 : {
1228 : /* Accumulate instrumentation, if any. */
1229 742 : if (pei->instrumentation)
1230 180 : ExecParallelRetrieveInstrumentation(pei->planstate,
1231 : pei->instrumentation);
1232 :
1233 : /* Accumulate JIT instrumentation, if any. */
1234 742 : if (pei->jit_instrumentation)
1235 24 : ExecParallelRetrieveJitInstrumentation(pei->planstate,
1236 24 : pei->jit_instrumentation);
1237 :
1238 : /* Free any serialized parameters. */
1239 742 : if (DsaPointerIsValid(pei->param_exec))
1240 : {
1241 24 : dsa_free(pei->area, pei->param_exec);
1242 24 : pei->param_exec = InvalidDsaPointer;
1243 : }
1244 742 : if (pei->area != NULL)
1245 : {
1246 742 : dsa_detach(pei->area);
1247 742 : pei->area = NULL;
1248 : }
1249 742 : if (pei->pcxt != NULL)
1250 : {
1251 742 : DestroyParallelContext(pei->pcxt);
1252 742 : pei->pcxt = NULL;
1253 : }
1254 742 : pfree(pei);
1255 742 : }
1256 :
1257 : /*
1258 : * Create a DestReceiver to write tuples we produce to the shm_mq designated
1259 : * for that purpose.
1260 : */
1261 : static DestReceiver *
1262 2662 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1263 : {
1264 : char *mqspace;
1265 : shm_mq *mq;
1266 :
1267 2662 : mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1268 2662 : mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1269 2662 : mq = (shm_mq *) mqspace;
1270 2662 : shm_mq_set_sender(mq, MyProc);
1271 2662 : return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1272 : }
1273 :
1274 : /*
1275 : * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1276 : */
1277 : static QueryDesc *
1278 2662 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1279 : int instrument_options)
1280 : {
1281 : char *pstmtspace;
1282 : char *paramspace;
1283 : PlannedStmt *pstmt;
1284 : ParamListInfo paramLI;
1285 : char *queryString;
1286 :
1287 : /* Get the query string from shared memory */
1288 2662 : queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1289 :
1290 : /* Reconstruct leader-supplied PlannedStmt. */
1291 2662 : pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1292 2662 : pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1293 :
1294 : /* Reconstruct ParamListInfo. */
1295 2662 : paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1296 2662 : paramLI = RestoreParamList(&paramspace);
1297 :
1298 : /* Create a QueryDesc for the query. */
1299 2662 : return CreateQueryDesc(pstmt,
1300 : queryString,
1301 : GetActiveSnapshot(), InvalidSnapshot,
1302 : receiver, paramLI, NULL, instrument_options);
1303 : }
1304 :
1305 : /*
1306 : * Copy instrumentation information from this node and its descendants into
1307 : * dynamic shared memory, so that the parallel leader can retrieve it.
1308 : */
1309 : static bool
1310 2376 : ExecParallelReportInstrumentation(PlanState *planstate,
1311 : SharedExecutorInstrumentation *instrumentation)
1312 : {
1313 : int i;
1314 2376 : int plan_node_id = planstate->plan->plan_node_id;
1315 : Instrumentation *instrument;
1316 :
1317 2376 : InstrEndLoop(planstate->instrument);
1318 :
1319 : /*
1320 : * If we shuffled the plan_node_id values in ps_instrument into sorted
1321 : * order, we could use binary search here. This might matter someday if
1322 : * we're pushing down sufficiently large plan trees. For now, do it the
1323 : * slow, dumb way.
1324 : */
1325 7812 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1326 7812 : if (instrumentation->plan_node_id[i] == plan_node_id)
1327 2376 : break;
1328 2376 : if (i >= instrumentation->num_plan_nodes)
1329 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1330 :
1331 : /*
1332 : * Add our statistics to the per-node, per-worker totals. It's possible
1333 : * that this could happen more than once if we relaunched workers.
1334 : */
1335 2376 : instrument = GetInstrumentationArray(instrumentation);
1336 2376 : instrument += i * instrumentation->num_workers;
1337 : Assert(IsParallelWorker());
1338 : Assert(ParallelWorkerNumber < instrumentation->num_workers);
1339 2376 : InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1340 :
1341 2376 : return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1342 : instrumentation);
1343 : }
1344 :
1345 : /*
1346 : * Initialize the PlanState and its descendants with the information
1347 : * retrieved from shared memory. This has to be done once the PlanState
1348 : * is allocated and initialized by executor; that is, after ExecutorStart().
1349 : */
1350 : static bool
1351 8448 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1352 : {
1353 8448 : if (planstate == NULL)
1354 0 : return false;
1355 :
1356 8448 : switch (nodeTag(planstate))
1357 : {
1358 3366 : case T_SeqScanState:
1359 3366 : if (planstate->plan->parallel_aware)
1360 2738 : ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1361 3366 : break;
1362 396 : case T_IndexScanState:
1363 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1364 396 : ExecIndexScanInitializeWorker((IndexScanState *) planstate, pwcxt);
1365 396 : break;
1366 242 : case T_IndexOnlyScanState:
1367 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1368 242 : ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1369 : pwcxt);
1370 242 : break;
1371 272 : case T_BitmapIndexScanState:
1372 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1373 272 : ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
1374 : pwcxt);
1375 272 : break;
1376 0 : case T_ForeignScanState:
1377 0 : if (planstate->plan->parallel_aware)
1378 0 : ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1379 : pwcxt);
1380 0 : break;
1381 96 : case T_TidRangeScanState:
1382 96 : if (planstate->plan->parallel_aware)
1383 96 : ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
1384 : pwcxt);
1385 96 : break;
1386 378 : case T_AppendState:
1387 378 : if (planstate->plan->parallel_aware)
1388 318 : ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1389 378 : break;
1390 0 : case T_CustomScanState:
1391 0 : if (planstate->plan->parallel_aware)
1392 0 : ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1393 : pwcxt);
1394 0 : break;
1395 272 : case T_BitmapHeapScanState:
1396 272 : if (planstate->plan->parallel_aware)
1397 270 : ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1398 : pwcxt);
1399 272 : break;
1400 558 : case T_HashJoinState:
1401 558 : if (planstate->plan->parallel_aware)
1402 318 : ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1403 : pwcxt);
1404 558 : break;
1405 558 : case T_HashState:
1406 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1407 558 : ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1408 558 : break;
1409 464 : case T_SortState:
1410 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1411 464 : ExecSortInitializeWorker((SortState *) planstate, pwcxt);
1412 464 : break;
1413 0 : case T_IncrementalSortState:
1414 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1415 0 : ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1416 : pwcxt);
1417 0 : break;
1418 1656 : case T_AggState:
1419 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1420 1656 : ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1421 1656 : break;
1422 12 : case T_MemoizeState:
1423 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1424 12 : ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
1425 12 : break;
1426 178 : default:
1427 178 : break;
1428 : }
1429 :
1430 8448 : return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1431 : pwcxt);
1432 : }
1433 :
1434 : /*
1435 : * Main entrypoint for parallel query worker processes.
1436 : *
1437 : * We reach this function from ParallelWorkerMain, so the setup necessary to
1438 : * create a sensible parallel environment has already been done;
1439 : * ParallelWorkerMain worries about stuff like the transaction state, combo
1440 : * CID mappings, and GUC values, so we don't need to deal with any of that
1441 : * here.
1442 : *
1443 : * Our job is to deal with concerns specific to the executor. The parallel
1444 : * group leader will have stored a serialized PlannedStmt, and it's our job
1445 : * to execute that plan and write the resulting tuples to the appropriate
1446 : * tuple queue. Various bits of supporting information that we need in order
1447 : * to do this are also stored in the dsm_segment and can be accessed through
1448 : * the shm_toc.
1449 : */
1450 : void
1451 2662 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1452 : {
1453 : FixedParallelExecutorState *fpes;
1454 : BufferUsage *buffer_usage;
1455 : WalUsage *wal_usage;
1456 : DestReceiver *receiver;
1457 : QueryDesc *queryDesc;
1458 : SharedExecutorInstrumentation *instrumentation;
1459 : SharedJitInstrumentation *jit_instrumentation;
1460 2662 : int instrument_options = 0;
1461 : void *area_space;
1462 : dsa_area *area;
1463 : ParallelWorkerContext pwcxt;
1464 :
1465 : /* Get fixed-size state. */
1466 2662 : fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1467 :
1468 : /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
1469 2662 : receiver = ExecParallelGetReceiver(seg, toc);
1470 2662 : instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1471 2662 : if (instrumentation != NULL)
1472 726 : instrument_options = instrumentation->instrument_options;
1473 2662 : jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1474 : true);
1475 2662 : queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1476 :
1477 : /* Setting debug_query_string for individual workers */
1478 2662 : debug_query_string = queryDesc->sourceText;
1479 :
1480 : /* Report workers' query for monitoring purposes */
1481 2662 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1482 :
1483 : /* Attach to the dynamic shared memory area. */
1484 2662 : area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1485 2662 : area = dsa_attach_in_place(area_space, seg);
1486 :
1487 : /* Start up the executor */
1488 2662 : queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1489 2662 : ExecutorStart(queryDesc, fpes->eflags);
1490 :
1491 : /* Special executor initialization steps for parallel workers */
1492 2662 : queryDesc->planstate->state->es_query_dsa = area;
1493 2662 : if (DsaPointerIsValid(fpes->param_exec))
1494 : {
1495 : char *paramexec_space;
1496 :
1497 72 : paramexec_space = dsa_get_address(area, fpes->param_exec);
1498 72 : RestoreParamExecParams(paramexec_space, queryDesc->estate);
1499 : }
1500 2662 : pwcxt.toc = toc;
1501 2662 : pwcxt.seg = seg;
1502 2662 : ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1503 :
1504 : /* Pass down any tuple bound */
1505 2662 : ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1506 :
1507 : /*
1508 : * Prepare to track buffer/WAL usage during query execution.
1509 : *
1510 : * We do this after starting up the executor to match what happens in the
1511 : * leader, which also doesn't count buffer accesses and WAL activity that
1512 : * occur during executor startup.
1513 : */
1514 2662 : InstrStartParallelQuery();
1515 :
1516 : /*
1517 : * Run the plan. If we specified a tuple bound, be careful not to demand
1518 : * more tuples than that.
1519 : */
1520 2662 : ExecutorRun(queryDesc,
1521 : ForwardScanDirection,
1522 2662 : fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
1523 :
1524 : /* Shut down the executor */
1525 2650 : ExecutorFinish(queryDesc);
1526 :
1527 : /* Report buffer/WAL usage during parallel execution. */
1528 2650 : buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1529 2650 : wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1530 2650 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1531 2650 : &wal_usage[ParallelWorkerNumber]);
1532 :
1533 : /* Report instrumentation data if any instrumentation options are set. */
1534 2650 : if (instrumentation != NULL)
1535 726 : ExecParallelReportInstrumentation(queryDesc->planstate,
1536 : instrumentation);
1537 :
1538 : /* Report JIT instrumentation data if any */
1539 2650 : if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1540 : {
1541 : Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1542 144 : jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1543 144 : queryDesc->estate->es_jit->instr;
1544 : }
1545 :
1546 : /* Must do this after capturing instrumentation. */
1547 2650 : ExecutorEnd(queryDesc);
1548 :
1549 : /* Cleanup. */
1550 2650 : dsa_detach(area);
1551 2650 : FreeQueryDesc(queryDesc);
1552 2650 : receiver->rDestroy(receiver);
1553 2650 : }