Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execParallel.c
4 : * Support routines for parallel execution.
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * This file contains routines that are intended to support setting up,
10 : * using, and tearing down a ParallelContext from within the PostgreSQL
11 : * executor. The ParallelContext machinery will handle starting the
12 : * workers and ensuring that their state generally matches that of the
13 : * leader; see src/backend/access/transam/README.parallel for details.
14 : * However, we must save and restore relevant executor state, such as
15 : * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 : * the actual plan to be passed down to the worker.
17 : *
18 : * IDENTIFICATION
19 : * src/backend/executor/execParallel.c
20 : *
21 : *-------------------------------------------------------------------------
22 : */
23 :
24 : #include "postgres.h"
25 :
26 : #include "executor/execParallel.h"
27 : #include "executor/executor.h"
28 : #include "executor/nodeAgg.h"
29 : #include "executor/nodeAppend.h"
30 : #include "executor/nodeBitmapHeapscan.h"
31 : #include "executor/nodeBitmapIndexscan.h"
32 : #include "executor/nodeCustom.h"
33 : #include "executor/nodeForeignscan.h"
34 : #include "executor/nodeHash.h"
35 : #include "executor/nodeHashjoin.h"
36 : #include "executor/nodeIncrementalSort.h"
37 : #include "executor/nodeIndexonlyscan.h"
38 : #include "executor/nodeIndexscan.h"
39 : #include "executor/nodeMemoize.h"
40 : #include "executor/nodeSeqscan.h"
41 : #include "executor/nodeSort.h"
42 : #include "executor/nodeSubplan.h"
43 : #include "executor/nodeTidrangescan.h"
44 : #include "executor/tqueue.h"
45 : #include "jit/jit.h"
46 : #include "nodes/nodeFuncs.h"
47 : #include "pgstat.h"
48 : #include "storage/proc.h"
49 : #include "tcop/tcopprot.h"
50 : #include "utils/datum.h"
51 : #include "utils/dsa.h"
52 : #include "utils/lsyscache.h"
53 : #include "utils/snapmgr.h"
54 :
55 : /*
56 : * Magic numbers for parallel executor communication. We use constants
57 : * greater than any 32-bit integer here so that values < 2^32 can be used
58 : * by individual parallel nodes to store their own state.
59 : */
60 : #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
61 : #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
62 : #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
63 : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
64 : #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
65 : #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
66 : #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
67 : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
68 : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
69 : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
70 :
71 : #define PARALLEL_TUPLE_QUEUE_SIZE 65536
72 :
/*
 * Fixed-size random stuff that we need to pass to parallel workers.
 */
typedef struct FixedParallelExecutorState
{
	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
	dsa_pointer param_exec;		/* serialized PARAM_EXEC params, or
								 * InvalidDsaPointer if none */
	int			eflags;			/* executor flags (estate->es_top_eflags) */
	int			jit_flags;		/* JIT flags (estate->es_jit_flags) */
} FixedParallelExecutorState;
83 :
/*
 * DSM structure for accumulating per-PlanState instrumentation.
 *
 * instrument_options: Same meaning here as in instrument.c.
 *
 * instrument_offset: Offset, relative to the start of this structure,
 * of the first NodeInstrumentation object.  This will depend on the length of
 * the plan_node_id array.
 *
 * num_workers: Number of workers.
 *
 * num_plan_nodes: Number of plan nodes.
 *
 * plan_node_id: Array of plan nodes for which we are gathering instrumentation
 * from parallel workers.  The length of this array is given by num_plan_nodes.
 */
struct SharedExecutorInstrumentation
{
	int			instrument_options;
	int			instrument_offset;
	int			num_workers;
	int			num_plan_nodes;
	int			plan_node_id[FLEXIBLE_ARRAY_MEMBER];

	/*
	 * Array of num_plan_nodes * num_workers NodeInstrumentation objects
	 * follows.
	 */
};

/*
 * Locate the trailing NodeInstrumentation array within a
 * SharedExecutorInstrumentation, using the recorded instrument_offset.
 * The StaticAssert guards against passing a pointer of the wrong type.
 */
#define GetInstrumentationArray(sei) \
	(StaticAssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
	 (NodeInstrumentation *) (((char *) sei) + sei->instrument_offset))
116 :
/* Context object for ExecParallelEstimate. */
typedef struct ExecParallelEstimateContext
{
	ParallelContext *pcxt;		/* parallel context whose DSM is being sized */
	int			nnodes;			/* running count of PlanState nodes visited */
} ExecParallelEstimateContext;
123 :
/* Context object for ExecParallelInitializeDSM. */
typedef struct ExecParallelInitializeDSMContext
{
	ParallelContext *pcxt;		/* parallel context whose DSM we're filling */
	SharedExecutorInstrumentation *instrumentation; /* NULL if no
													 * instrumentation */
	int			nnodes;			/* running count; also next plan_node_id slot */
} ExecParallelInitializeDSMContext;
131 :
132 : /* Helper functions that run in the parallel leader. */
133 : static char *ExecSerializePlan(Plan *plan, EState *estate);
134 : static bool ExecParallelEstimate(PlanState *planstate,
135 : ExecParallelEstimateContext *e);
136 : static bool ExecParallelInitializeDSM(PlanState *planstate,
137 : ExecParallelInitializeDSMContext *d);
138 : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
139 : bool reinitialize);
140 : static bool ExecParallelReInitializeDSM(PlanState *planstate,
141 : ParallelContext *pcxt);
142 : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
143 : SharedExecutorInstrumentation *instrumentation);
144 :
145 : /* Helper function that runs in the parallel worker. */
146 : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
147 :
/*
 * Create a serialized representation of the plan to be sent to each worker.
 *
 * Returns a nodeToString() image of a dummy PlannedStmt wrapping a copy of
 * 'plan'.  'estate' supplies the range table, permission info, parameter
 * types, and other fields the worker's executor needs.  The result is
 * palloc'd in the current memory context.
 */
static char *
ExecSerializePlan(Plan *plan, EState *estate)
{
	PlannedStmt *pstmt;
	ListCell   *lc;

	/* We can't scribble on the original plan, so make a copy. */
	plan = copyObject(plan);

	/*
	 * The worker will start its own copy of the executor, and that copy will
	 * insert a junk filter if the toplevel node has any resjunk entries. We
	 * don't want that to happen, because while resjunk columns shouldn't be
	 * sent back to the user, here the tuples are coming back to another
	 * backend which may very well need them.  So mutate the target list
	 * accordingly.  This is sort of a hack; there might be better ways to do
	 * this...
	 */
	foreach(lc, plan->targetlist)
	{
		TargetEntry *tle = lfirst_node(TargetEntry, lc);

		tle->resjunk = false;
	}

	/*
	 * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
	 * for our purposes, but the worker will need at least a minimal
	 * PlannedStmt to start the executor.
	 */
	pstmt = makeNode(PlannedStmt);
	pstmt->commandType = CMD_SELECT;
	pstmt->queryId = pgstat_get_my_query_id();
	pstmt->planId = pgstat_get_my_plan_id();
	pstmt->hasReturning = false;
	pstmt->hasModifyingCTE = false;
	pstmt->canSetTag = true;
	pstmt->transientPlan = false;
	pstmt->dependsOnRole = false;
	pstmt->parallelModeNeeded = false;
	pstmt->planTree = plan;
	pstmt->partPruneInfos = estate->es_part_prune_infos;
	pstmt->rtable = estate->es_range_table;
	pstmt->unprunableRelids = estate->es_unpruned_relids;
	pstmt->permInfos = estate->es_rteperminfos;
	pstmt->appendRelations = NIL;
	pstmt->planOrigin = PLAN_STMT_INTERNAL;

	/*
	 * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
	 * for unsafe ones (so that the list indexes of the safe ones are
	 * preserved).  This positively ensures that the worker won't try to run,
	 * or even do ExecInitNode on, an unsafe subplan.  That's important to
	 * protect, eg, non-parallel-aware FDWs from getting into trouble.
	 */
	pstmt->subplans = NIL;
	foreach(lc, estate->es_plannedstmt->subplans)
	{
		Plan	   *subplan = (Plan *) lfirst(lc);

		if (subplan && !subplan->parallel_safe)
			subplan = NULL;		/* keep list index, drop unsafe subplan */
		pstmt->subplans = lappend(pstmt->subplans, subplan);
	}

	pstmt->rewindPlanIDs = NULL;
	pstmt->rowMarks = NIL;

	/*
	 * Pass the row mark and result relation relids to parallel workers. They
	 * may need to check them to inform heuristics.
	 */
	pstmt->rowMarkRelids = estate->es_plannedstmt->rowMarkRelids;
	pstmt->resultRelationRelids = estate->es_plannedstmt->resultRelationRelids;
	pstmt->relationOids = NIL;
	pstmt->invalItems = NIL;	/* workers can't replan anyway... */
	pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
	pstmt->utilityStmt = NULL;
	pstmt->stmt_location = -1;	/* location within source text not known */
	pstmt->stmt_len = -1;

	/* Return serialized copy of our dummy PlannedStmt. */
	return nodeToString(pstmt);
}
235 :
/*
 * Parallel-aware plan nodes (and occasionally others) may need some state
 * which is shared across all parallel workers.  Before we size the DSM, give
 * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
 * &pcxt->estimator.
 *
 * While we're at it, count the number of PlanState nodes in the tree, so
 * we know how many Instrumentation structures we need.
 *
 * Walker callback: invoked for every node in the PlanState tree via
 * planstate_tree_walker.  Per-node-type estimate functions run only for
 * parallel-aware instances, except where noted (some node types reserve
 * space unconditionally so EXPLAIN ANALYZE can collect worker stats).
 */
static bool
ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
{
	if (planstate == NULL)
		return false;

	/* Count this node. */
	e->nnodes++;

	switch (nodeTag(planstate))
	{
		case T_SeqScanState:
			if (planstate->plan->parallel_aware)
				ExecSeqScanEstimate((SeqScanState *) planstate,
									e->pcxt);
			break;
		case T_IndexScanState:
			if (planstate->plan->parallel_aware)
				ExecIndexScanEstimate((IndexScanState *) planstate,
									  e->pcxt);
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecIndexScanInstrumentEstimate((IndexScanState *) planstate,
											e->pcxt);
			break;
		case T_IndexOnlyScanState:
			if (planstate->plan->parallel_aware)
				ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
										  e->pcxt);
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecIndexOnlyScanInstrumentEstimate((IndexOnlyScanState *) planstate,
												e->pcxt);
			break;
		case T_BitmapIndexScanState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
										e->pcxt);
			break;
		case T_ForeignScanState:
			if (planstate->plan->parallel_aware)
				ExecForeignScanEstimate((ForeignScanState *) planstate,
										e->pcxt);
			break;
		case T_TidRangeScanState:
			if (planstate->plan->parallel_aware)
				ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
										 e->pcxt);
			break;
		case T_AppendState:
			if (planstate->plan->parallel_aware)
				ExecAppendEstimate((AppendState *) planstate,
								   e->pcxt);
			break;
		case T_CustomScanState:
			if (planstate->plan->parallel_aware)
				ExecCustomScanEstimate((CustomScanState *) planstate,
									   e->pcxt);
			break;
		case T_BitmapHeapScanState:
			if (planstate->plan->parallel_aware)
				ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
									   e->pcxt);
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecBitmapHeapInstrumentEstimate((BitmapHeapScanState *) planstate,
											 e->pcxt);
			break;
		case T_HashJoinState:
			if (planstate->plan->parallel_aware)
				ExecHashJoinEstimate((HashJoinState *) planstate,
									 e->pcxt);
			break;
		case T_HashState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecHashEstimate((HashState *) planstate, e->pcxt);
			break;
		case T_SortState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecSortEstimate((SortState *) planstate, e->pcxt);
			break;
		case T_IncrementalSortState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
			break;
		case T_AggState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecAggEstimate((AggState *) planstate, e->pcxt);
			break;
		case T_MemoizeState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
			break;
		default:
			/* other node types need no per-node shared memory */
			break;
	}

	/* Recurse to this node's children. */
	return planstate_tree_walker(planstate, ExecParallelEstimate, e);
}
341 :
342 : /*
343 : * Estimate the amount of space required to serialize the indicated parameters.
344 : */
345 : static Size
346 16 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
347 : {
348 : int paramid;
349 16 : Size sz = sizeof(int);
350 :
351 16 : paramid = -1;
352 36 : while ((paramid = bms_next_member(params, paramid)) >= 0)
353 : {
354 : Oid typeOid;
355 : int16 typLen;
356 : bool typByVal;
357 : ParamExecData *prm;
358 :
359 20 : prm = &(estate->es_param_exec_vals[paramid]);
360 20 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
361 : paramid);
362 :
363 20 : sz = add_size(sz, sizeof(int)); /* space for paramid */
364 :
365 : /* space for datum/isnull */
366 20 : if (OidIsValid(typeOid))
367 20 : get_typlenbyval(typeOid, &typLen, &typByVal);
368 : else
369 : {
370 : /* If no type OID, assume by-value, like copyParamList does. */
371 0 : typLen = sizeof(Datum);
372 0 : typByVal = true;
373 : }
374 20 : sz = add_size(sz,
375 20 : datumEstimateSpace(prm->value, prm->isnull,
376 : typByVal, typLen));
377 : }
378 16 : return sz;
379 : }
380 :
381 : /*
382 : * Serialize specified PARAM_EXEC parameters.
383 : *
384 : * We write the number of parameters first, as a 4-byte integer, and then
385 : * write details for each parameter in turn. The details for each parameter
386 : * consist of a 4-byte paramid (location of param in execution time internal
387 : * parameter array) and then the datum as serialized by datumSerialize().
388 : */
389 : static dsa_pointer
390 16 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
391 : {
392 : Size size;
393 : int nparams;
394 : int paramid;
395 : ParamExecData *prm;
396 : dsa_pointer handle;
397 : char *start_address;
398 :
399 : /* Allocate enough space for the current parameter values. */
400 16 : size = EstimateParamExecSpace(estate, params);
401 16 : handle = dsa_allocate(area, size);
402 16 : start_address = dsa_get_address(area, handle);
403 :
404 : /* First write the number of parameters as a 4-byte integer. */
405 16 : nparams = bms_num_members(params);
406 16 : memcpy(start_address, &nparams, sizeof(int));
407 16 : start_address += sizeof(int);
408 :
409 : /* Write details for each parameter in turn. */
410 16 : paramid = -1;
411 36 : while ((paramid = bms_next_member(params, paramid)) >= 0)
412 : {
413 : Oid typeOid;
414 : int16 typLen;
415 : bool typByVal;
416 :
417 20 : prm = &(estate->es_param_exec_vals[paramid]);
418 20 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
419 : paramid);
420 :
421 : /* Write paramid. */
422 20 : memcpy(start_address, ¶mid, sizeof(int));
423 20 : start_address += sizeof(int);
424 :
425 : /* Write datum/isnull */
426 20 : if (OidIsValid(typeOid))
427 20 : get_typlenbyval(typeOid, &typLen, &typByVal);
428 : else
429 : {
430 : /* If no type OID, assume by-value, like copyParamList does. */
431 0 : typLen = sizeof(Datum);
432 0 : typByVal = true;
433 : }
434 20 : datumSerialize(prm->value, prm->isnull, typByVal, typLen,
435 : &start_address);
436 : }
437 :
438 16 : return handle;
439 : }
440 :
441 : /*
442 : * Restore specified PARAM_EXEC parameters.
443 : */
444 : static void
445 48 : RestoreParamExecParams(char *start_address, EState *estate)
446 : {
447 : int nparams;
448 : int i;
449 : int paramid;
450 :
451 48 : memcpy(&nparams, start_address, sizeof(int));
452 48 : start_address += sizeof(int);
453 :
454 104 : for (i = 0; i < nparams; i++)
455 : {
456 : ParamExecData *prm;
457 :
458 : /* Read paramid */
459 56 : memcpy(¶mid, start_address, sizeof(int));
460 56 : start_address += sizeof(int);
461 56 : prm = &(estate->es_param_exec_vals[paramid]);
462 :
463 : /* Read datum/isnull. */
464 56 : prm->value = datumRestore(&start_address, &prm->isnull);
465 56 : prm->execPlan = NULL;
466 : }
467 48 : }
468 :
/*
 * Initialize the dynamic shared memory segment that will be used to control
 * parallel execution.
 *
 * Walker callback, mirroring ExecParallelEstimate: visits every PlanState
 * node, records its plan_node_id in the shared instrumentation array (when
 * instrumentation is enabled), and lets DSM-using node types fill in the
 * space they previously estimated.  d->nnodes must advance in the same
 * order as the estimate pass so instrumentation slots line up.
 */
static bool
ExecParallelInitializeDSM(PlanState *planstate,
						  ExecParallelInitializeDSMContext *d)
{
	if (planstate == NULL)
		return false;

	/* If instrumentation is enabled, initialize slot for this node. */
	if (d->instrumentation != NULL)
		d->instrumentation->plan_node_id[d->nnodes] =
			planstate->plan->plan_node_id;

	/* Count this node. */
	d->nnodes++;

	/*
	 * Call initializers for DSM-using plan nodes.
	 *
	 * Most plan nodes won't do anything here, but plan nodes that allocated
	 * DSM may need to initialize shared state in the DSM before parallel
	 * workers are launched.  They can allocate the space they previously
	 * estimated using shm_toc_allocate, and add the keys they previously
	 * estimated using shm_toc_insert, in each case targeting pcxt->toc.
	 */
	switch (nodeTag(planstate))
	{
		case T_SeqScanState:
			if (planstate->plan->parallel_aware)
				ExecSeqScanInitializeDSM((SeqScanState *) planstate,
										 d->pcxt);
			break;
		case T_IndexScanState:
			if (planstate->plan->parallel_aware)
				ExecIndexScanInitializeDSM((IndexScanState *) planstate,
										   d->pcxt);
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecIndexScanInstrumentInitDSM((IndexScanState *) planstate,
										   d->pcxt);
			break;
		case T_IndexOnlyScanState:
			if (planstate->plan->parallel_aware)
				ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
											   d->pcxt);
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecIndexOnlyScanInstrumentInitDSM((IndexOnlyScanState *) planstate,
											   d->pcxt);
			break;
		case T_BitmapIndexScanState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
			break;
		case T_ForeignScanState:
			if (planstate->plan->parallel_aware)
				ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
											 d->pcxt);
			break;
		case T_TidRangeScanState:
			if (planstate->plan->parallel_aware)
				ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
											  d->pcxt);
			break;
		case T_AppendState:
			if (planstate->plan->parallel_aware)
				ExecAppendInitializeDSM((AppendState *) planstate,
										d->pcxt);
			break;
		case T_CustomScanState:
			if (planstate->plan->parallel_aware)
				ExecCustomScanInitializeDSM((CustomScanState *) planstate,
											d->pcxt);
			break;
		case T_BitmapHeapScanState:
			if (planstate->plan->parallel_aware)
				ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
											d->pcxt);
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecBitmapHeapInstrumentInitDSM((BitmapHeapScanState *) planstate,
											d->pcxt);
			break;
		case T_HashJoinState:
			if (planstate->plan->parallel_aware)
				ExecHashJoinInitializeDSM((HashJoinState *) planstate,
										  d->pcxt);
			break;
		case T_HashState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
			break;
		case T_SortState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
			break;
		case T_IncrementalSortState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
			break;
		case T_AggState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
			break;
		case T_MemoizeState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
			break;
		default:
			/* other node types keep no per-node shared state */
			break;
	}

	/* Recurse to this node's children. */
	return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
}
583 :
584 : /*
585 : * It sets up the response queues for backend workers to return tuples
586 : * to the main backend and start the workers.
587 : */
588 : static shm_mq_handle **
589 695 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
590 : {
591 : shm_mq_handle **responseq;
592 : char *tqueuespace;
593 : int i;
594 :
595 : /* Skip this if no workers. */
596 695 : if (pcxt->nworkers == 0)
597 0 : return NULL;
598 :
599 : /* Allocate memory for shared memory queue handles. */
600 : responseq = (shm_mq_handle **)
601 695 : palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
602 :
603 : /*
604 : * If not reinitializing, allocate space from the DSM for the queues;
605 : * otherwise, find the already allocated space.
606 : */
607 695 : if (!reinitialize)
608 : tqueuespace =
609 523 : shm_toc_allocate(pcxt->toc,
610 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
611 523 : pcxt->nworkers));
612 : else
613 172 : tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
614 :
615 : /* Create the queues, and become the receiver for each. */
616 2566 : for (i = 0; i < pcxt->nworkers; ++i)
617 : {
618 : shm_mq *mq;
619 :
620 1871 : mq = shm_mq_create(tqueuespace +
621 1871 : ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
622 : (Size) PARALLEL_TUPLE_QUEUE_SIZE);
623 :
624 1871 : shm_mq_set_receiver(mq, MyProc);
625 1871 : responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
626 : }
627 :
628 : /* Add array of queues to shm_toc, so others can find it. */
629 695 : if (!reinitialize)
630 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
631 :
632 : /* Return array of handles. */
633 695 : return responseq;
634 : }
635 :
636 : /*
637 : * Sets up the required infrastructure for backend workers to perform
638 : * execution and return results to the main backend.
639 : */
640 : ParallelExecutorInfo *
641 523 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
642 : Bitmapset *sendParams, int nworkers,
643 : int64 tuples_needed)
644 : {
645 : ParallelExecutorInfo *pei;
646 : ParallelContext *pcxt;
647 : ExecParallelEstimateContext e;
648 : ExecParallelInitializeDSMContext d;
649 : FixedParallelExecutorState *fpes;
650 : char *pstmt_data;
651 : char *pstmt_space;
652 : char *paramlistinfo_space;
653 : BufferUsage *bufusage_space;
654 : WalUsage *walusage_space;
655 523 : SharedExecutorInstrumentation *instrumentation = NULL;
656 523 : SharedJitInstrumentation *jit_instrumentation = NULL;
657 : int pstmt_len;
658 : int paramlistinfo_len;
659 523 : int instrumentation_len = 0;
660 523 : int jit_instrumentation_len = 0;
661 523 : int instrument_offset = 0;
662 523 : Size dsa_minsize = dsa_minimum_size();
663 : char *query_string;
664 : int query_len;
665 :
666 : /*
667 : * Force any initplan outputs that we're going to pass to workers to be
668 : * evaluated, if they weren't already.
669 : *
670 : * For simplicity, we use the EState's per-output-tuple ExprContext here.
671 : * That risks intra-query memory leakage, since we might pass through here
672 : * many times before that ExprContext gets reset; but ExecSetParamPlan
673 : * doesn't normally leak any memory in the context (see its comments), so
674 : * it doesn't seem worth complicating this function's API to pass it a
675 : * shorter-lived ExprContext. This might need to change someday.
676 : */
677 523 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
678 :
679 : /* Allocate object for return value. */
680 523 : pei = palloc0_object(ParallelExecutorInfo);
681 523 : pei->finished = false;
682 523 : pei->planstate = planstate;
683 :
684 : /* Fix up and serialize plan to be sent to workers. */
685 523 : pstmt_data = ExecSerializePlan(planstate->plan, estate);
686 :
687 : /* Create a parallel context. */
688 523 : pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
689 523 : pei->pcxt = pcxt;
690 :
691 : /*
692 : * Before telling the parallel context to create a dynamic shared memory
693 : * segment, we need to figure out how big it should be. Estimate space
694 : * for the various things we need to store.
695 : */
696 :
697 : /* Estimate space for fixed-size state. */
698 523 : shm_toc_estimate_chunk(&pcxt->estimator,
699 : sizeof(FixedParallelExecutorState));
700 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
701 :
702 : /* Estimate space for query text. */
703 523 : query_len = strlen(estate->es_sourceText);
704 523 : shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
705 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
706 :
707 : /* Estimate space for serialized PlannedStmt. */
708 523 : pstmt_len = strlen(pstmt_data) + 1;
709 523 : shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
710 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
711 :
712 : /* Estimate space for serialized ParamListInfo. */
713 523 : paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
714 523 : shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
715 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
716 :
717 : /*
718 : * Estimate space for BufferUsage.
719 : *
720 : * If EXPLAIN is not in use and there are no extensions loaded that care,
721 : * we could skip this. But we have no way of knowing whether anyone's
722 : * looking at pgBufferUsage, so do it unconditionally.
723 : */
724 523 : shm_toc_estimate_chunk(&pcxt->estimator,
725 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
726 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
727 :
728 : /*
729 : * Same thing for WalUsage.
730 : */
731 523 : shm_toc_estimate_chunk(&pcxt->estimator,
732 : mul_size(sizeof(WalUsage), pcxt->nworkers));
733 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
734 :
735 : /* Estimate space for tuple queues. */
736 523 : shm_toc_estimate_chunk(&pcxt->estimator,
737 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
738 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
739 :
740 : /*
741 : * Give parallel-aware nodes a chance to add to the estimates, and get a
742 : * count of how many PlanState nodes there are.
743 : */
744 523 : e.pcxt = pcxt;
745 523 : e.nnodes = 0;
746 523 : ExecParallelEstimate(planstate, &e);
747 :
748 : /* Estimate space for instrumentation, if required. */
749 523 : if (estate->es_instrument)
750 : {
751 120 : instrumentation_len =
752 : offsetof(SharedExecutorInstrumentation, plan_node_id) +
753 120 : sizeof(int) * e.nnodes;
754 120 : instrumentation_len = MAXALIGN(instrumentation_len);
755 120 : instrument_offset = instrumentation_len;
756 120 : instrumentation_len +=
757 120 : mul_size(sizeof(NodeInstrumentation),
758 120 : mul_size(e.nnodes, nworkers));
759 120 : shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
760 120 : shm_toc_estimate_keys(&pcxt->estimator, 1);
761 :
762 : /* Estimate space for JIT instrumentation, if required. */
763 120 : if (estate->es_jit_flags != PGJIT_NONE)
764 : {
765 0 : jit_instrumentation_len =
766 0 : offsetof(SharedJitInstrumentation, jit_instr) +
767 : sizeof(JitInstrumentation) * nworkers;
768 0 : shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
769 0 : shm_toc_estimate_keys(&pcxt->estimator, 1);
770 : }
771 : }
772 :
773 : /* Estimate space for DSA area. */
774 523 : shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
775 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
776 :
777 : /*
778 : * InitializeParallelDSM() passes the active snapshot to the parallel
779 : * worker, which uses it to set es_snapshot. Make sure we don't set
780 : * es_snapshot differently in the child.
781 : */
782 : Assert(GetActiveSnapshot() == estate->es_snapshot);
783 :
784 : /* Everyone's had a chance to ask for space, so now create the DSM. */
785 523 : InitializeParallelDSM(pcxt);
786 :
787 : /*
788 : * OK, now we have a dynamic shared memory segment, and it should be big
789 : * enough to store all of the data we estimated we would want to put into
790 : * it, plus whatever general stuff (not specifically executor-related) the
791 : * ParallelContext itself needs to store there. None of the space we
792 : * asked for has been allocated or initialized yet, though, so do that.
793 : */
794 :
795 : /* Store fixed-size state. */
796 523 : fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
797 523 : fpes->tuples_needed = tuples_needed;
798 523 : fpes->param_exec = InvalidDsaPointer;
799 523 : fpes->eflags = estate->es_top_eflags;
800 523 : fpes->jit_flags = estate->es_jit_flags;
801 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
802 :
803 : /* Store query string */
804 523 : query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
805 523 : memcpy(query_string, estate->es_sourceText, query_len + 1);
806 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
807 :
808 : /* Store serialized PlannedStmt. */
809 523 : pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
810 523 : memcpy(pstmt_space, pstmt_data, pstmt_len);
811 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
812 :
813 : /* Store serialized ParamListInfo. */
814 523 : paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
815 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
816 523 : SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space);
817 :
818 : /* Allocate space for each worker's BufferUsage; no need to initialize. */
819 523 : bufusage_space = shm_toc_allocate(pcxt->toc,
820 523 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
821 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
822 523 : pei->buffer_usage = bufusage_space;
823 :
824 : /* Same for WalUsage. */
825 523 : walusage_space = shm_toc_allocate(pcxt->toc,
826 523 : mul_size(sizeof(WalUsage), pcxt->nworkers));
827 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
828 523 : pei->wal_usage = walusage_space;
829 :
830 : /* Set up the tuple queues that the workers will write into. */
831 523 : pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
832 :
833 : /* We don't need the TupleQueueReaders yet, though. */
834 523 : pei->reader = NULL;
835 :
836 : /*
837 : * If instrumentation options were supplied, allocate space for the data.
838 : * It only gets partially initialized here; the rest happens during
839 : * ExecParallelInitializeDSM.
840 : */
841 523 : if (estate->es_instrument)
842 : {
843 : NodeInstrumentation *instrument;
844 : int i;
845 :
846 120 : instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
847 120 : instrumentation->instrument_options = estate->es_instrument;
848 120 : instrumentation->instrument_offset = instrument_offset;
849 120 : instrumentation->num_workers = nworkers;
850 120 : instrumentation->num_plan_nodes = e.nnodes;
851 120 : instrument = GetInstrumentationArray(instrumentation);
852 1240 : for (i = 0; i < nworkers * e.nnodes; ++i)
853 1120 : InstrInitNode(&instrument[i], estate->es_instrument, false);
854 120 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
855 : instrumentation);
856 120 : pei->instrumentation = instrumentation;
857 :
858 120 : if (estate->es_jit_flags != PGJIT_NONE)
859 : {
860 0 : jit_instrumentation = shm_toc_allocate(pcxt->toc,
861 : jit_instrumentation_len);
862 0 : jit_instrumentation->num_workers = nworkers;
863 0 : memset(jit_instrumentation->jit_instr, 0,
864 : sizeof(JitInstrumentation) * nworkers);
865 0 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
866 : jit_instrumentation);
867 0 : pei->jit_instrumentation = jit_instrumentation;
868 : }
869 : }
870 :
871 : /*
872 : * Create a DSA area that can be used by the leader and all workers.
873 : * (However, if we failed to create a DSM and are using private memory
874 : * instead, then skip this.)
875 : */
876 523 : if (pcxt->seg != NULL)
877 : {
878 : char *area_space;
879 :
880 523 : area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
881 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
882 523 : pei->area = dsa_create_in_place(area_space, dsa_minsize,
883 : LWTRANCHE_PARALLEL_QUERY_DSA,
884 : pcxt->seg);
885 :
886 : /*
887 : * Serialize parameters, if any, using DSA storage. We don't dare use
888 : * the main parallel query DSM for this because we might relaunch
889 : * workers after the values have changed (and thus the amount of
890 : * storage required has changed).
891 : */
892 523 : if (!bms_is_empty(sendParams))
893 : {
894 16 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
895 : pei->area);
896 16 : fpes->param_exec = pei->param_exec;
897 : }
898 : }
899 :
900 : /*
901 : * Give parallel-aware nodes a chance to initialize their shared data.
902 : * This also initializes the elements of instrumentation->ps_instrument,
903 : * if it exists.
904 : */
905 523 : d.pcxt = pcxt;
906 523 : d.instrumentation = instrumentation;
907 523 : d.nnodes = 0;
908 :
909 : /* Install our DSA area while initializing the plan. */
910 523 : estate->es_query_dsa = pei->area;
911 523 : ExecParallelInitializeDSM(planstate, &d);
912 523 : estate->es_query_dsa = NULL;
913 :
914 : /*
915 : * Make sure that the world hasn't shifted under our feet. This could
916 : * probably just be an Assert(), but let's be conservative for now.
917 : */
918 523 : if (e.nnodes != d.nnodes)
919 0 : elog(ERROR, "inconsistent count of PlanState nodes");
920 :
921 : /* OK, we're ready to rock and roll. */
922 523 : return pei;
923 : }
924 :
925 : /*
926 : * Set up tuple queue readers to read the results of a parallel subplan.
927 : *
928 : * This is separate from ExecInitParallelPlan() because we can launch the
929 : * worker processes and let them start doing something before we do this.
930 : */
931 : void
932 683 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
933 : {
934 683 : int nworkers = pei->pcxt->nworkers_launched;
935 : int i;
936 :
937 : Assert(pei->reader == NULL);
938 :
939 683 : if (nworkers > 0)
940 : {
941 683 : pei->reader = (TupleQueueReader **)
942 683 : palloc(nworkers * sizeof(TupleQueueReader *));
943 :
944 2497 : for (i = 0; i < nworkers; i++)
945 : {
946 1814 : shm_mq_set_handle(pei->tqueue[i],
947 1814 : pei->pcxt->worker[i].bgwhandle);
948 1814 : pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
949 : }
950 : }
951 683 : }
952 :
953 : /*
954 : * Re-initialize the parallel executor shared memory state before launching
955 : * a fresh batch of workers.
956 : */
void
ExecParallelReinitialize(PlanState *planstate,
						 ParallelExecutorInfo *pei,
						 Bitmapset *sendParams)
{
	EState	   *estate = planstate->state;
	FixedParallelExecutorState *fpes;

	/* Old workers must already be shut down */
	Assert(pei->finished);

	/*
	 * Force any initplan outputs that we're going to pass to workers to be
	 * evaluated, if they weren't already (see comments in
	 * ExecInitParallelPlan).
	 */
	ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));

	/* Reset the DSM and rebuild the tuple queues for the new worker batch. */
	ReinitializeParallelDSM(pei->pcxt);
	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
	pei->reader = NULL;			/* readers recreated by ExecParallelCreateReaders */
	pei->finished = false;

	fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);

	/* Free any serialized parameters from the last round. */
	if (DsaPointerIsValid(fpes->param_exec))
	{
		dsa_free(pei->area, fpes->param_exec);
		fpes->param_exec = InvalidDsaPointer;
	}

	/* Serialize current parameter values if required. */
	if (!bms_is_empty(sendParams))
	{
		pei->param_exec = SerializeParamExecParams(estate, sendParams,
												   pei->area);
		fpes->param_exec = pei->param_exec;
	}

	/* Traverse plan tree and let each child node reset associated state. */
	/* Temporarily install the DSA area so node reinitializers can use it. */
	estate->es_query_dsa = pei->area;
	ExecParallelReInitializeDSM(planstate, pei->pcxt);
	estate->es_query_dsa = NULL;
}
1002 :
1003 : /*
1004 : * Traverse plan tree to reinitialize per-node dynamic shared memory state
1005 : */
1006 : static bool
1007 444 : ExecParallelReInitializeDSM(PlanState *planstate,
1008 : ParallelContext *pcxt)
1009 : {
1010 444 : if (planstate == NULL)
1011 0 : return false;
1012 :
1013 : /*
1014 : * Call reinitializers for DSM-using plan nodes.
1015 : */
1016 444 : switch (nodeTag(planstate))
1017 : {
1018 184 : case T_SeqScanState:
1019 184 : if (planstate->plan->parallel_aware)
1020 152 : ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
1021 : pcxt);
1022 184 : break;
1023 8 : case T_IndexScanState:
1024 8 : if (planstate->plan->parallel_aware)
1025 8 : ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
1026 : pcxt);
1027 8 : break;
1028 8 : case T_IndexOnlyScanState:
1029 8 : if (planstate->plan->parallel_aware)
1030 8 : ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
1031 : pcxt);
1032 8 : break;
1033 0 : case T_ForeignScanState:
1034 0 : if (planstate->plan->parallel_aware)
1035 0 : ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
1036 : pcxt);
1037 0 : break;
1038 0 : case T_TidRangeScanState:
1039 0 : if (planstate->plan->parallel_aware)
1040 0 : ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
1041 : pcxt);
1042 0 : break;
1043 0 : case T_AppendState:
1044 0 : if (planstate->plan->parallel_aware)
1045 0 : ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
1046 0 : break;
1047 0 : case T_CustomScanState:
1048 0 : if (planstate->plan->parallel_aware)
1049 0 : ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
1050 : pcxt);
1051 0 : break;
1052 36 : case T_BitmapHeapScanState:
1053 36 : if (planstate->plan->parallel_aware)
1054 36 : ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
1055 : pcxt);
1056 36 : break;
1057 64 : case T_HashJoinState:
1058 64 : if (planstate->plan->parallel_aware)
1059 32 : ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
1060 : pcxt);
1061 64 : break;
1062 120 : case T_BitmapIndexScanState:
1063 : case T_HashState:
1064 : case T_SortState:
1065 : case T_IncrementalSortState:
1066 : case T_MemoizeState:
1067 : /* these nodes have DSM state, but no reinitialization is required */
1068 120 : break;
1069 :
1070 24 : default:
1071 24 : break;
1072 : }
1073 :
1074 444 : return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1075 : }
1076 :
1077 : /*
1078 : * Copy instrumentation information about this node and its descendants from
1079 : * dynamic shared memory.
1080 : */
static bool
ExecParallelRetrieveInstrumentation(PlanState *planstate,
									SharedExecutorInstrumentation *instrumentation)
{
	NodeInstrumentation *instrument;
	int			i;
	int			n;
	int			ibytes;
	int			plan_node_id = planstate->plan->plan_node_id;
	MemoryContext oldcontext;

	/* Find the instrumentation for this node (linear scan over slots). */
	for (i = 0; i < instrumentation->num_plan_nodes; ++i)
		if (instrumentation->plan_node_id[i] == plan_node_id)
			break;
	if (i >= instrumentation->num_plan_nodes)
		elog(ERROR, "plan node %d not found", plan_node_id);

	/* Accumulate the statistics from all workers. */
	/* Slot i starts a contiguous run of num_workers per-worker entries. */
	instrument = GetInstrumentationArray(instrumentation);
	instrument += i * instrumentation->num_workers;
	for (n = 0; n < instrumentation->num_workers; ++n)
		InstrAggNode(planstate->instrument, &instrument[n]);

	/*
	 * Also store the per-worker detail.
	 *
	 * Worker instrumentation should be allocated in the same context as the
	 * regular instrumentation information, which is the per-query context.
	 * Switch into per-query memory context.
	 */
	oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
	ibytes = mul_size(instrumentation->num_workers, sizeof(NodeInstrumentation));
	planstate->worker_instrument =
		palloc(ibytes + offsetof(WorkerNodeInstrumentation, instrument));
	MemoryContextSwitchTo(oldcontext);

	/* Copy the raw per-worker entries out of the (soon-to-go) DSM. */
	planstate->worker_instrument->num_workers = instrumentation->num_workers;
	memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);

	/* Perform any node-type-specific work that needs to be done. */
	switch (nodeTag(planstate))
	{
		case T_IndexScanState:
			ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
			break;
		case T_IndexOnlyScanState:
			ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
			break;
		case T_BitmapIndexScanState:
			ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
			break;
		case T_SortState:
			ExecSortRetrieveInstrumentation((SortState *) planstate);
			break;
		case T_IncrementalSortState:
			ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
			break;
		case T_HashState:
			ExecHashRetrieveInstrumentation((HashState *) planstate);
			break;
		case T_AggState:
			ExecAggRetrieveInstrumentation((AggState *) planstate);
			break;
		case T_MemoizeState:
			ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
			break;
		case T_BitmapHeapScanState:
			ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
			break;
		default:
			/* no node-type-specific instrumentation to retrieve */
			break;
	}

	/* Recurse so the whole subtree's instrumentation is collected. */
	return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
								 instrumentation);
}
1158 :
1159 : /*
1160 : * Add up the workers' JIT instrumentation from dynamic shared memory.
1161 : */
1162 : static void
1163 0 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1164 : SharedJitInstrumentation *shared_jit)
1165 : {
1166 : JitInstrumentation *combined;
1167 : int ibytes;
1168 :
1169 : int n;
1170 :
1171 : /*
1172 : * Accumulate worker JIT instrumentation into the combined JIT
1173 : * instrumentation, allocating it if required.
1174 : */
1175 0 : if (!planstate->state->es_jit_worker_instr)
1176 0 : planstate->state->es_jit_worker_instr =
1177 0 : MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
1178 0 : combined = planstate->state->es_jit_worker_instr;
1179 :
1180 : /* Accumulate all the workers' instrumentations. */
1181 0 : for (n = 0; n < shared_jit->num_workers; ++n)
1182 0 : InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1183 :
1184 : /*
1185 : * Store the per-worker detail.
1186 : *
1187 : * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1188 : * instrumentation in per-query context.
1189 : */
1190 0 : ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1191 0 : + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1192 0 : planstate->worker_jit_instrument =
1193 0 : MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1194 :
1195 0 : memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1196 0 : }
1197 :
1198 : /*
1199 : * Finish parallel execution. We wait for parallel workers to finish, and
1200 : * accumulate their buffer/WAL usage.
1201 : */
void
ExecParallelFinish(ParallelExecutorInfo *pei)
{
	int			nworkers = pei->pcxt->nworkers_launched;
	int			i;

	/* Make this be a no-op if called twice in a row. */
	if (pei->finished)
		return;

	/*
	 * Detach from tuple queues ASAP, so that any still-active workers will
	 * notice that no further results are wanted.
	 */
	if (pei->tqueue != NULL)
	{
		for (i = 0; i < nworkers; i++)
			shm_mq_detach(pei->tqueue[i]);
		pfree(pei->tqueue);
		pei->tqueue = NULL;
	}

	/*
	 * While we're waiting for the workers to finish, let's get rid of the
	 * tuple queue readers.  (Any other local cleanup could be done here too.)
	 *
	 * pei->reader can be NULL if ExecParallelCreateReaders() was never
	 * called for this batch of workers.
	 */
	if (pei->reader != NULL)
	{
		for (i = 0; i < nworkers; i++)
			DestroyTupleQueueReader(pei->reader[i]);
		pfree(pei->reader);
		pei->reader = NULL;
	}

	/* Now wait for the workers to finish. */
	WaitForParallelWorkersToFinish(pei->pcxt);

	/*
	 * Next, accumulate buffer/WAL usage.  (This must wait for the workers to
	 * finish, or we might get incomplete data.)
	 */
	for (i = 0; i < nworkers; i++)
		InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);

	/* Mark done so a second call falls out at the top. */
	pei->finished = true;
}
1248 :
1249 : /*
1250 : * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1251 : * resources still exist after ExecParallelFinish. We separate these
1252 : * routines because someone might want to examine the contents of the DSM
1253 : * after ExecParallelFinish and before calling this routine.
1254 : */
1255 : void
1256 515 : ExecParallelCleanup(ParallelExecutorInfo *pei)
1257 : {
1258 : /* Accumulate instrumentation, if any. */
1259 515 : if (pei->instrumentation)
1260 120 : ExecParallelRetrieveInstrumentation(pei->planstate,
1261 : pei->instrumentation);
1262 :
1263 : /* Accumulate JIT instrumentation, if any. */
1264 515 : if (pei->jit_instrumentation)
1265 0 : ExecParallelRetrieveJitInstrumentation(pei->planstate,
1266 0 : pei->jit_instrumentation);
1267 :
1268 : /* Free any serialized parameters. */
1269 515 : if (DsaPointerIsValid(pei->param_exec))
1270 : {
1271 16 : dsa_free(pei->area, pei->param_exec);
1272 16 : pei->param_exec = InvalidDsaPointer;
1273 : }
1274 515 : if (pei->area != NULL)
1275 : {
1276 515 : dsa_detach(pei->area);
1277 515 : pei->area = NULL;
1278 : }
1279 515 : if (pei->pcxt != NULL)
1280 : {
1281 515 : DestroyParallelContext(pei->pcxt);
1282 515 : pei->pcxt = NULL;
1283 : }
1284 515 : pfree(pei);
1285 515 : }
1286 :
1287 : /*
1288 : * Create a DestReceiver to write tuples we produce to the shm_mq designated
1289 : * for that purpose.
1290 : */
1291 : static DestReceiver *
1292 1814 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1293 : {
1294 : char *mqspace;
1295 : shm_mq *mq;
1296 :
1297 1814 : mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1298 1814 : mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1299 1814 : mq = (shm_mq *) mqspace;
1300 1814 : shm_mq_set_sender(mq, MyProc);
1301 1814 : return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1302 : }
1303 :
1304 : /*
1305 : * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1306 : */
1307 : static QueryDesc *
1308 1814 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1309 : int instrument_options)
1310 : {
1311 : char *pstmtspace;
1312 : char *paramspace;
1313 : PlannedStmt *pstmt;
1314 : ParamListInfo paramLI;
1315 : char *queryString;
1316 :
1317 : /* Get the query string from shared memory */
1318 1814 : queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1319 :
1320 : /* Reconstruct leader-supplied PlannedStmt. */
1321 1814 : pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1322 1814 : pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1323 :
1324 : /* Reconstruct ParamListInfo. */
1325 1814 : paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1326 1814 : paramLI = RestoreParamList(¶mspace);
1327 :
1328 : /* Create a QueryDesc for the query. */
1329 1814 : return CreateQueryDesc(pstmt,
1330 : queryString,
1331 : GetActiveSnapshot(), InvalidSnapshot,
1332 : receiver, paramLI, NULL, instrument_options);
1333 : }
1334 :
1335 : /*
1336 : * Copy instrumentation information from this node and its descendants into
1337 : * dynamic shared memory, so that the parallel leader can retrieve it.
1338 : */
static bool
ExecParallelReportInstrumentation(PlanState *planstate,
								  SharedExecutorInstrumentation *instrumentation)
{
	int			i;
	int			plan_node_id = planstate->plan->plan_node_id;
	NodeInstrumentation *instrument;

	/* Close out any still-running instrumentation interval for this node. */
	InstrEndLoop(planstate->instrument);

	/*
	 * If we shuffled the plan_node_id values in ps_instrument into sorted
	 * order, we could use binary search here.  This might matter someday if
	 * we're pushing down sufficiently large plan trees.  For now, do it the
	 * slow, dumb way.
	 */
	for (i = 0; i < instrumentation->num_plan_nodes; ++i)
		if (instrumentation->plan_node_id[i] == plan_node_id)
			break;
	if (i >= instrumentation->num_plan_nodes)
		elog(ERROR, "plan node %d not found", plan_node_id);

	/*
	 * Add our statistics to the per-node, per-worker totals.  It's possible
	 * that this could happen more than once if we relaunched workers.
	 */
	/* Slot i holds num_workers entries; ours is at ParallelWorkerNumber. */
	instrument = GetInstrumentationArray(instrumentation);
	instrument += i * instrumentation->num_workers;
	Assert(IsParallelWorker());
	Assert(ParallelWorkerNumber < instrumentation->num_workers);
	InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);

	/* Recurse so every node in this subtree reports its numbers. */
	return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
								 instrumentation);
}
1374 :
1375 : /*
1376 : * Initialize the PlanState and its descendants with the information
1377 : * retrieved from shared memory. This has to be done once the PlanState
1378 : * is allocated and initialized by executor; that is, after ExecutorStart().
1379 : */
static bool
ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
{
	if (planstate == NULL)
		return false;

	/*
	 * Let each node type attach to whatever shared state the leader set up
	 * for it.  Parallel-aware nodes attach to their scan/join coordination
	 * state; several node types additionally attach instrumentation space
	 * even when not parallel-aware, so EXPLAIN ANALYZE can see worker stats.
	 */
	switch (nodeTag(planstate))
	{
		case T_SeqScanState:
			if (planstate->plan->parallel_aware)
				ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
			break;
		case T_IndexScanState:
			if (planstate->plan->parallel_aware)
				ExecIndexScanInitializeWorker((IndexScanState *) planstate,
											  pwcxt);
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecIndexScanInstrumentInitWorker((IndexScanState *) planstate,
											  pwcxt);
			break;
		case T_IndexOnlyScanState:
			if (planstate->plan->parallel_aware)
				ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
												  pwcxt);
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecIndexOnlyScanInstrumentInitWorker((IndexOnlyScanState *) planstate,
												  pwcxt);
			break;
		case T_BitmapIndexScanState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
												pwcxt);
			break;
		case T_ForeignScanState:
			if (planstate->plan->parallel_aware)
				ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
												pwcxt);
			break;
		case T_TidRangeScanState:
			if (planstate->plan->parallel_aware)
				ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
												 pwcxt);
			break;
		case T_AppendState:
			if (planstate->plan->parallel_aware)
				ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
			break;
		case T_CustomScanState:
			if (planstate->plan->parallel_aware)
				ExecCustomScanInitializeWorker((CustomScanState *) planstate,
											   pwcxt);
			break;
		case T_BitmapHeapScanState:
			if (planstate->plan->parallel_aware)
				ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
											   pwcxt);
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecBitmapHeapInstrumentInitWorker((BitmapHeapScanState *) planstate,
											   pwcxt);
			break;
		case T_HashJoinState:
			if (planstate->plan->parallel_aware)
				ExecHashJoinInitializeWorker((HashJoinState *) planstate,
											 pwcxt);
			break;
		case T_HashState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecHashInitializeWorker((HashState *) planstate, pwcxt);
			break;
		case T_SortState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecSortInitializeWorker((SortState *) planstate, pwcxt);
			break;
		case T_IncrementalSortState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
												pwcxt);
			break;
		case T_AggState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecAggInitializeWorker((AggState *) planstate, pwcxt);
			break;
		case T_MemoizeState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
			break;
		default:
			/* no worker-side shared state for this node type */
			break;
	}

	/* Recurse to children so the whole subtree gets initialized. */
	return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
								 pwcxt);
}
1473 :
1474 : /*
1475 : * Main entrypoint for parallel query worker processes.
1476 : *
1477 : * We reach this function from ParallelWorkerMain, so the setup necessary to
1478 : * create a sensible parallel environment has already been done;
1479 : * ParallelWorkerMain worries about stuff like the transaction state, combo
1480 : * CID mappings, and GUC values, so we don't need to deal with any of that
1481 : * here.
1482 : *
1483 : * Our job is to deal with concerns specific to the executor. The parallel
1484 : * group leader will have stored a serialized PlannedStmt, and it's our job
1485 : * to execute that plan and write the resulting tuples to the appropriate
1486 : * tuple queue. Various bits of supporting information that we need in order
1487 : * to do this are also stored in the dsm_segment and can be accessed through
1488 : * the shm_toc.
1489 : */
void
ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
{
	FixedParallelExecutorState *fpes;
	BufferUsage *buffer_usage;
	WalUsage   *wal_usage;
	DestReceiver *receiver;
	QueryDesc  *queryDesc;
	SharedExecutorInstrumentation *instrumentation;
	SharedJitInstrumentation *jit_instrumentation;
	int			instrument_options = 0;
	void	   *area_space;
	dsa_area   *area;
	ParallelWorkerContext pwcxt;

	/* Get fixed-size state. */
	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);

	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
	receiver = ExecParallelGetReceiver(seg, toc);
	/* Instrumentation TOC entries are optional (noError lookups). */
	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
	if (instrumentation != NULL)
		instrument_options = instrumentation->instrument_options;
	jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
										 true);
	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);

	/* Setting debug_query_string for individual workers */
	debug_query_string = queryDesc->sourceText;

	/* Report workers' query for monitoring purposes */
	pgstat_report_activity(STATE_RUNNING, debug_query_string);

	/* Attach to the dynamic shared memory area. */
	area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
	area = dsa_attach_in_place(area_space, seg);

	/* Start up the executor */
	/* Use the leader's JIT decision, not a locally-derived one. */
	queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
	ExecutorStart(queryDesc, fpes->eflags);

	/* Special executor initialization steps for parallel workers */
	queryDesc->planstate->state->es_query_dsa = area;
	if (DsaPointerIsValid(fpes->param_exec))
	{
		char	   *paramexec_space;

		/* Restore PARAM_EXEC values the leader serialized into the DSA. */
		paramexec_space = dsa_get_address(area, fpes->param_exec);
		RestoreParamExecParams(paramexec_space, queryDesc->estate);
	}
	pwcxt.toc = toc;
	pwcxt.seg = seg;
	ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);

	/* Pass down any tuple bound */
	ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);

	/*
	 * Prepare to track buffer/WAL usage during query execution.
	 *
	 * We do this after starting up the executor to match what happens in the
	 * leader, which also doesn't count buffer accesses and WAL activity that
	 * occur during executor startup.
	 */
	InstrStartParallelQuery();

	/*
	 * Run the plan.  If we specified a tuple bound, be careful not to demand
	 * more tuples than that.
	 */
	ExecutorRun(queryDesc,
				ForwardScanDirection,
				fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);

	/* Shut down the executor */
	ExecutorFinish(queryDesc);

	/* Report buffer/WAL usage during parallel execution. */
	/* Each worker writes only its own slot, indexed by ParallelWorkerNumber. */
	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
	wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
						  &wal_usage[ParallelWorkerNumber]);

	/* Report instrumentation data if any instrumentation options are set. */
	if (instrumentation != NULL)
		ExecParallelReportInstrumentation(queryDesc->planstate,
										  instrumentation);

	/* Report JIT instrumentation data if any */
	if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
	{
		Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
		jit_instrumentation->jit_instr[ParallelWorkerNumber] =
			queryDesc->estate->es_jit->instr;
	}

	/* Must do this after capturing instrumentation. */
	ExecutorEnd(queryDesc);

	/* Cleanup. */
	dsa_detach(area);
	FreeQueryDesc(queryDesc);
	receiver->rDestroy(receiver);
}
|