Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execParallel.c
4 : * Support routines for parallel execution.
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * This file contains routines that are intended to support setting up,
10 : * using, and tearing down a ParallelContext from within the PostgreSQL
11 : * executor. The ParallelContext machinery will handle starting the
12 : * workers and ensuring that their state generally matches that of the
13 : * leader; see src/backend/access/transam/README.parallel for details.
14 : * However, we must save and restore relevant executor state, such as
15 : * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 : * the actual plan to be passed down to the worker.
17 : *
18 : * IDENTIFICATION
19 : * src/backend/executor/execParallel.c
20 : *
21 : *-------------------------------------------------------------------------
22 : */
23 :
24 : #include "postgres.h"
25 :
26 : #include "executor/execParallel.h"
27 : #include "executor/executor.h"
28 : #include "executor/nodeAgg.h"
29 : #include "executor/nodeAppend.h"
30 : #include "executor/nodeBitmapHeapscan.h"
31 : #include "executor/nodeBitmapIndexscan.h"
32 : #include "executor/nodeCustom.h"
33 : #include "executor/nodeForeignscan.h"
34 : #include "executor/nodeHash.h"
35 : #include "executor/nodeHashjoin.h"
36 : #include "executor/nodeIncrementalSort.h"
37 : #include "executor/nodeIndexonlyscan.h"
38 : #include "executor/nodeIndexscan.h"
39 : #include "executor/nodeMemoize.h"
40 : #include "executor/nodeSeqscan.h"
41 : #include "executor/nodeSort.h"
42 : #include "executor/nodeSubplan.h"
43 : #include "executor/nodeTidrangescan.h"
44 : #include "executor/tqueue.h"
45 : #include "jit/jit.h"
46 : #include "nodes/nodeFuncs.h"
47 : #include "pgstat.h"
48 : #include "storage/proc.h"
49 : #include "tcop/tcopprot.h"
50 : #include "utils/datum.h"
51 : #include "utils/dsa.h"
52 : #include "utils/lsyscache.h"
53 : #include "utils/snapmgr.h"
54 :
55 : /*
56 : * Magic numbers for parallel executor communication. We use constants
57 : * greater than any 32-bit integer here so that values < 2^32 can be used
58 : * by individual parallel nodes to store their own state.
59 : */
60 : #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
61 : #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
62 : #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
63 : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
64 : #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
65 : #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
66 : #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
67 : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
68 : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
69 : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
70 :
71 : #define PARALLEL_TUPLE_QUEUE_SIZE 65536
72 :
73 : /*
74 : * Fixed-size random stuff that we need to pass to parallel workers.
75 : */
76 : typedef struct FixedParallelExecutorState
77 : {
78 : int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
79 : dsa_pointer param_exec;
80 : int eflags;
81 : int jit_flags;
82 : } FixedParallelExecutorState;
83 :
84 : /*
85 : * DSM structure for accumulating per-PlanState instrumentation.
86 : *
87 : * instrument_options: Same meaning here as in instrument.c.
88 : *
89 : * instrument_offset: Offset, relative to the start of this structure,
90 : * of the first NodeInstrumentation object. This will depend on the length of
91 : * the plan_node_id array.
92 : *
93 : * num_workers: Number of workers.
94 : *
95 : * num_plan_nodes: Number of plan nodes.
96 : *
97 : * plan_node_id: Array of plan nodes for which we are gathering instrumentation
98 : * from parallel workers. The length of this array is given by num_plan_nodes.
99 : */
100 : struct SharedExecutorInstrumentation
101 : {
102 : int instrument_options;
103 : int instrument_offset;
104 : int num_workers;
105 : int num_plan_nodes;
106 : int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
107 :
108 : /*
109 : * Array of num_plan_nodes * num_workers NodeInstrumentation objects
110 : * follows.
111 : */
112 : };
113 : #define GetInstrumentationArray(sei) \
114 : (StaticAssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
115 : (NodeInstrumentation *) (((char *) sei) + sei->instrument_offset))
116 :
117 : /* Context object for ExecParallelEstimate. */
118 : typedef struct ExecParallelEstimateContext
119 : {
120 : ParallelContext *pcxt;
121 : int nnodes;
122 : } ExecParallelEstimateContext;
123 :
124 : /* Context object for ExecParallelInitializeDSM. */
125 : typedef struct ExecParallelInitializeDSMContext
126 : {
127 : ParallelContext *pcxt;
128 : SharedExecutorInstrumentation *instrumentation;
129 : int nnodes;
130 : } ExecParallelInitializeDSMContext;
131 :
132 : /* Helper functions that run in the parallel leader. */
133 : static char *ExecSerializePlan(Plan *plan, EState *estate);
134 : static bool ExecParallelEstimate(PlanState *planstate,
135 : ExecParallelEstimateContext *e);
136 : static bool ExecParallelInitializeDSM(PlanState *planstate,
137 : ExecParallelInitializeDSMContext *d);
138 : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
139 : bool reinitialize);
140 : static bool ExecParallelReInitializeDSM(PlanState *planstate,
141 : ParallelContext *pcxt);
142 : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
143 : SharedExecutorInstrumentation *instrumentation);
144 :
145 : /* Helper function that runs in the parallel worker. */
146 : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
147 :
148 : /*
149 : * Create a serialized representation of the plan to be sent to each worker.
150 : */
151 : static char *
152 523 : ExecSerializePlan(Plan *plan, EState *estate)
153 : {
154 : PlannedStmt *pstmt;
155 : ListCell *lc;
156 :
157 : /* We can't scribble on the original plan, so make a copy. */
158 523 : plan = copyObject(plan);
159 :
160 : /*
161 : * The worker will start its own copy of the executor, and that copy will
162 : * insert a junk filter if the toplevel node has any resjunk entries. We
163 : * don't want that to happen, because while resjunk columns shouldn't be
164 : * sent back to the user, here the tuples are coming back to another
165 : * backend which may very well need them. So mutate the target list
166 : * accordingly. This is sort of a hack; there might be better ways to do
167 : * this...
168 : */
169 1439 : foreach(lc, plan->targetlist)
170 : {
171 916 : TargetEntry *tle = lfirst_node(TargetEntry, lc);
172 :
173 916 : tle->resjunk = false;
174 : }
175 :
176 : /*
177 : * Create a dummy PlannedStmt. Most of the fields don't need to be valid
178 : * for our purposes, but the worker will need at least a minimal
179 : * PlannedStmt to start the executor.
180 : */
181 523 : pstmt = makeNode(PlannedStmt);
182 523 : pstmt->commandType = CMD_SELECT;
183 523 : pstmt->queryId = pgstat_get_my_query_id();
184 523 : pstmt->planId = pgstat_get_my_plan_id();
185 523 : pstmt->hasReturning = false;
186 523 : pstmt->hasModifyingCTE = false;
187 523 : pstmt->canSetTag = true;
188 523 : pstmt->transientPlan = false;
189 523 : pstmt->dependsOnRole = false;
190 523 : pstmt->parallelModeNeeded = false;
191 523 : pstmt->planTree = plan;
192 523 : pstmt->partPruneInfos = estate->es_part_prune_infos;
193 523 : pstmt->rtable = estate->es_range_table;
194 523 : pstmt->unprunableRelids = estate->es_unpruned_relids;
195 523 : pstmt->permInfos = estate->es_rteperminfos;
196 523 : pstmt->appendRelations = NIL;
197 523 : pstmt->planOrigin = PLAN_STMT_INTERNAL;
198 :
199 : /*
200 : * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
201 : * for unsafe ones (so that the list indexes of the safe ones are
202 : * preserved). This positively ensures that the worker won't try to run,
203 : * or even do ExecInitNode on, an unsafe subplan. That's important to
204 : * protect, eg, non-parallel-aware FDWs from getting into trouble.
205 : */
206 523 : pstmt->subplans = NIL;
207 559 : foreach(lc, estate->es_plannedstmt->subplans)
208 : {
209 36 : Plan *subplan = (Plan *) lfirst(lc);
210 :
211 36 : if (subplan && !subplan->parallel_safe)
212 8 : subplan = NULL;
213 36 : pstmt->subplans = lappend(pstmt->subplans, subplan);
214 : }
215 :
216 523 : pstmt->rewindPlanIDs = NULL;
217 523 : pstmt->rowMarks = NIL;
218 :
219 : /*
220 : * Pass the row mark and result relation relids to parallel workers. They
221 : * may need to check them to inform heuristics.
222 : */
223 523 : pstmt->rowMarkRelids = estate->es_plannedstmt->rowMarkRelids;
224 523 : pstmt->resultRelationRelids = estate->es_plannedstmt->resultRelationRelids;
225 523 : pstmt->relationOids = NIL;
226 523 : pstmt->invalItems = NIL; /* workers can't replan anyway... */
227 523 : pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
228 523 : pstmt->utilityStmt = NULL;
229 523 : pstmt->stmt_location = -1;
230 523 : pstmt->stmt_len = -1;
231 :
232 : /* Return serialized copy of our dummy PlannedStmt. */
233 523 : return nodeToString(pstmt);
234 : }
235 :
236 : /*
237 : * Parallel-aware plan nodes (and occasionally others) may need some state
238 : * which is shared across all parallel workers. Before we size the DSM, give
239 : * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
240 : * &pcxt->estimator.
241 : *
242 : * While we're at it, count the number of PlanState nodes in the tree, so
243 : * we know how many Instrumentation structures we need.
244 : */
245 : static bool
246 3303 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
247 : {
248 3303 : if (planstate == NULL)
249 0 : return false;
250 :
251 : /* Count this node. */
252 3303 : e->nnodes++;
253 :
254 3303 : switch (nodeTag(planstate))
255 : {
256 1528 : case T_SeqScanState:
257 1528 : if (planstate->plan->parallel_aware)
258 1190 : ExecSeqScanEstimate((SeqScanState *) planstate,
259 : e->pcxt);
260 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
261 1528 : ExecSeqScanInstrumentEstimate((SeqScanState *) planstate,
262 : e->pcxt);
263 1528 : break;
264 276 : case T_IndexScanState:
265 276 : if (planstate->plan->parallel_aware)
266 12 : ExecIndexScanEstimate((IndexScanState *) planstate,
267 : e->pcxt);
268 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
269 276 : ExecIndexScanInstrumentEstimate((IndexScanState *) planstate,
270 : e->pcxt);
271 276 : break;
272 42 : case T_IndexOnlyScanState:
273 42 : if (planstate->plan->parallel_aware)
274 30 : ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
275 : e->pcxt);
276 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
277 42 : ExecIndexOnlyScanInstrumentEstimate((IndexOnlyScanState *) planstate,
278 : e->pcxt);
279 42 : break;
280 13 : case T_BitmapIndexScanState:
281 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
282 13 : ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
283 : e->pcxt);
284 13 : break;
285 0 : case T_ForeignScanState:
286 0 : if (planstate->plan->parallel_aware)
287 0 : ExecForeignScanEstimate((ForeignScanState *) planstate,
288 : e->pcxt);
289 0 : break;
290 16 : case T_TidRangeScanState:
291 16 : if (planstate->plan->parallel_aware)
292 16 : ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
293 : e->pcxt);
294 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
295 16 : ExecTidRangeScanInstrumentEstimate((TidRangeScanState *) planstate,
296 : e->pcxt);
297 16 : break;
298 148 : case T_AppendState:
299 148 : if (planstate->plan->parallel_aware)
300 108 : ExecAppendEstimate((AppendState *) planstate,
301 : e->pcxt);
302 148 : break;
303 0 : case T_CustomScanState:
304 0 : if (planstate->plan->parallel_aware)
305 0 : ExecCustomScanEstimate((CustomScanState *) planstate,
306 : e->pcxt);
307 0 : break;
308 13 : case T_BitmapHeapScanState:
309 13 : if (planstate->plan->parallel_aware)
310 12 : ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
311 : e->pcxt);
312 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
313 13 : ExecBitmapHeapInstrumentEstimate((BitmapHeapScanState *) planstate,
314 : e->pcxt);
315 13 : break;
316 208 : case T_HashJoinState:
317 208 : if (planstate->plan->parallel_aware)
318 84 : ExecHashJoinEstimate((HashJoinState *) planstate,
319 : e->pcxt);
320 208 : break;
321 208 : case T_HashState:
322 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
323 208 : ExecHashEstimate((HashState *) planstate, e->pcxt);
324 208 : break;
325 201 : case T_SortState:
326 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
327 201 : ExecSortEstimate((SortState *) planstate, e->pcxt);
328 201 : break;
329 0 : case T_IncrementalSortState:
330 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
331 0 : ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
332 0 : break;
333 396 : case T_AggState:
334 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
335 396 : ExecAggEstimate((AggState *) planstate, e->pcxt);
336 396 : break;
337 4 : case T_MemoizeState:
338 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
339 4 : ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
340 4 : break;
341 250 : default:
342 250 : break;
343 : }
344 :
345 3303 : return planstate_tree_walker(planstate, ExecParallelEstimate, e);
346 : }
347 :
348 : /*
349 : * Estimate the amount of space required to serialize the indicated parameters.
350 : */
351 : static Size
352 16 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
353 : {
354 : int paramid;
355 16 : Size sz = sizeof(int);
356 :
357 16 : paramid = -1;
358 36 : while ((paramid = bms_next_member(params, paramid)) >= 0)
359 : {
360 : Oid typeOid;
361 : int16 typLen;
362 : bool typByVal;
363 : ParamExecData *prm;
364 :
365 20 : prm = &(estate->es_param_exec_vals[paramid]);
366 20 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
367 : paramid);
368 :
369 20 : sz = add_size(sz, sizeof(int)); /* space for paramid */
370 :
371 : /* space for datum/isnull */
372 20 : if (OidIsValid(typeOid))
373 20 : get_typlenbyval(typeOid, &typLen, &typByVal);
374 : else
375 : {
376 : /* If no type OID, assume by-value, like copyParamList does. */
377 0 : typLen = sizeof(Datum);
378 0 : typByVal = true;
379 : }
380 20 : sz = add_size(sz,
381 20 : datumEstimateSpace(prm->value, prm->isnull,
382 : typByVal, typLen));
383 : }
384 16 : return sz;
385 : }
386 :
387 : /*
388 : * Serialize specified PARAM_EXEC parameters.
389 : *
390 : * We write the number of parameters first, as a 4-byte integer, and then
391 : * write details for each parameter in turn. The details for each parameter
392 : * consist of a 4-byte paramid (location of param in execution time internal
393 : * parameter array) and then the datum as serialized by datumSerialize().
394 : */
395 : static dsa_pointer
396 16 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
397 : {
398 : Size size;
399 : int nparams;
400 : int paramid;
401 : ParamExecData *prm;
402 : dsa_pointer handle;
403 : char *start_address;
404 :
405 : /* Allocate enough space for the current parameter values. */
406 16 : size = EstimateParamExecSpace(estate, params);
407 16 : handle = dsa_allocate(area, size);
408 16 : start_address = dsa_get_address(area, handle);
409 :
410 : /* First write the number of parameters as a 4-byte integer. */
411 16 : nparams = bms_num_members(params);
412 16 : memcpy(start_address, &nparams, sizeof(int));
413 16 : start_address += sizeof(int);
414 :
415 : /* Write details for each parameter in turn. */
416 16 : paramid = -1;
417 36 : while ((paramid = bms_next_member(params, paramid)) >= 0)
418 : {
419 : Oid typeOid;
420 : int16 typLen;
421 : bool typByVal;
422 :
423 20 : prm = &(estate->es_param_exec_vals[paramid]);
424 20 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
425 : paramid);
426 :
427 : /* Write paramid. */
428 20 : memcpy(start_address, &paramid, sizeof(int));
429 20 : start_address += sizeof(int);
430 :
431 : /* Write datum/isnull */
432 20 : if (OidIsValid(typeOid))
433 20 : get_typlenbyval(typeOid, &typLen, &typByVal);
434 : else
435 : {
436 : /* If no type OID, assume by-value, like copyParamList does. */
437 0 : typLen = sizeof(Datum);
438 0 : typByVal = true;
439 : }
440 20 : datumSerialize(prm->value, prm->isnull, typByVal, typLen,
441 : &start_address);
442 : }
443 :
444 16 : return handle;
445 : }
446 :
447 : /*
448 : * Restore specified PARAM_EXEC parameters.
449 : */
450 : static void
451 48 : RestoreParamExecParams(char *start_address, EState *estate)
452 : {
453 : int nparams;
454 : int i;
455 : int paramid;
456 :
457 48 : memcpy(&nparams, start_address, sizeof(int));
458 48 : start_address += sizeof(int);
459 :
460 104 : for (i = 0; i < nparams; i++)
461 : {
462 : ParamExecData *prm;
463 :
464 : /* Read paramid */
465 56 : memcpy(&paramid, start_address, sizeof(int));
466 56 : start_address += sizeof(int);
467 56 : prm = &(estate->es_param_exec_vals[paramid]);
468 :
469 : /* Read datum/isnull. */
470 56 : prm->value = datumRestore(&start_address, &prm->isnull);
471 56 : prm->execPlan = NULL;
472 : }
473 48 : }
474 :
475 : /*
476 : * Initialize the dynamic shared memory segment that will be used to control
477 : * parallel execution.
478 : */
479 : static bool
480 3303 : ExecParallelInitializeDSM(PlanState *planstate,
481 : ExecParallelInitializeDSMContext *d)
482 : {
483 3303 : if (planstate == NULL)
484 0 : return false;
485 :
486 : /* If instrumentation is enabled, initialize slot for this node. */
487 3303 : if (d->instrumentation != NULL)
488 684 : d->instrumentation->plan_node_id[d->nnodes] =
489 684 : planstate->plan->plan_node_id;
490 :
491 : /* Count this node. */
492 3303 : d->nnodes++;
493 :
494 : /*
495 : * Call initializers for DSM-using plan nodes.
496 : *
497 : * Most plan nodes won't do anything here, but plan nodes that allocated
498 : * DSM may need to initialize shared state in the DSM before parallel
499 : * workers are launched. They can allocate the space they previously
500 : * estimated using shm_toc_allocate, and add the keys they previously
501 : * estimated using shm_toc_insert, in each case targeting pcxt->toc.
502 : */
503 3303 : switch (nodeTag(planstate))
504 : {
505 1528 : case T_SeqScanState:
506 1528 : if (planstate->plan->parallel_aware)
507 1190 : ExecSeqScanInitializeDSM((SeqScanState *) planstate,
508 : d->pcxt);
509 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
510 1528 : ExecSeqScanInstrumentInitDSM((SeqScanState *) planstate,
511 : d->pcxt);
512 1528 : break;
513 276 : case T_IndexScanState:
514 276 : if (planstate->plan->parallel_aware)
515 12 : ExecIndexScanInitializeDSM((IndexScanState *) planstate,
516 : d->pcxt);
517 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
518 276 : ExecIndexScanInstrumentInitDSM((IndexScanState *) planstate,
519 : d->pcxt);
520 276 : break;
521 42 : case T_IndexOnlyScanState:
522 42 : if (planstate->plan->parallel_aware)
523 30 : ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
524 : d->pcxt);
525 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
526 42 : ExecIndexOnlyScanInstrumentInitDSM((IndexOnlyScanState *) planstate,
527 : d->pcxt);
528 42 : break;
529 13 : case T_BitmapIndexScanState:
530 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
531 13 : ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
532 13 : break;
533 0 : case T_ForeignScanState:
534 0 : if (planstate->plan->parallel_aware)
535 0 : ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
536 : d->pcxt);
537 0 : break;
538 16 : case T_TidRangeScanState:
539 16 : if (planstate->plan->parallel_aware)
540 16 : ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
541 : d->pcxt);
542 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
543 16 : ExecTidRangeScanInstrumentInitDSM((TidRangeScanState *) planstate,
544 : d->pcxt);
545 16 : break;
546 148 : case T_AppendState:
547 148 : if (planstate->plan->parallel_aware)
548 108 : ExecAppendInitializeDSM((AppendState *) planstate,
549 : d->pcxt);
550 148 : break;
551 0 : case T_CustomScanState:
552 0 : if (planstate->plan->parallel_aware)
553 0 : ExecCustomScanInitializeDSM((CustomScanState *) planstate,
554 : d->pcxt);
555 0 : break;
556 13 : case T_BitmapHeapScanState:
557 13 : if (planstate->plan->parallel_aware)
558 12 : ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
559 : d->pcxt);
560 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
561 13 : ExecBitmapHeapInstrumentInitDSM((BitmapHeapScanState *) planstate,
562 : d->pcxt);
563 13 : break;
564 208 : case T_HashJoinState:
565 208 : if (planstate->plan->parallel_aware)
566 84 : ExecHashJoinInitializeDSM((HashJoinState *) planstate,
567 : d->pcxt);
568 208 : break;
569 208 : case T_HashState:
570 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
571 208 : ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
572 208 : break;
573 201 : case T_SortState:
574 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
575 201 : ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
576 201 : break;
577 0 : case T_IncrementalSortState:
578 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
579 0 : ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
580 0 : break;
581 396 : case T_AggState:
582 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
583 396 : ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
584 396 : break;
585 4 : case T_MemoizeState:
586 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
587 4 : ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
588 4 : break;
589 250 : default:
590 250 : break;
591 : }
592 :
593 3303 : return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
594 : }
595 :
596 : /*
597 : * Set up the response queues that parallel workers will use to return
598 : * tuples to the main backend.
599 : */
600 : static shm_mq_handle **
601 695 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
602 : {
603 : shm_mq_handle **responseq;
604 : char *tqueuespace;
605 : int i;
606 :
607 : /* Skip this if no workers. */
608 695 : if (pcxt->nworkers == 0)
609 0 : return NULL;
610 :
611 : /* Allocate memory for shared memory queue handles. */
612 : responseq = (shm_mq_handle **)
613 695 : palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
614 :
615 : /*
616 : * If not reinitializing, allocate space from the DSM for the queues;
617 : * otherwise, find the already allocated space.
618 : */
619 695 : if (!reinitialize)
620 : tqueuespace =
621 523 : shm_toc_allocate(pcxt->toc,
622 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
623 523 : pcxt->nworkers));
624 : else
625 172 : tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
626 :
627 : /* Create the queues, and become the receiver for each. */
628 2564 : for (i = 0; i < pcxt->nworkers; ++i)
629 : {
630 : shm_mq *mq;
631 :
632 1869 : mq = shm_mq_create(tqueuespace +
633 1869 : ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
634 : (Size) PARALLEL_TUPLE_QUEUE_SIZE);
635 :
636 1869 : shm_mq_set_receiver(mq, MyProc);
637 1869 : responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
638 : }
639 :
640 : /* Add array of queues to shm_toc, so others can find it. */
641 695 : if (!reinitialize)
642 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
643 :
644 : /* Return array of handles. */
645 695 : return responseq;
646 : }
647 :
648 : /*
649 : * Sets up the required infrastructure for backend workers to perform
650 : * execution and return results to the main backend.
651 : */
652 : ParallelExecutorInfo *
653 523 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
654 : Bitmapset *sendParams, int nworkers,
655 : int64 tuples_needed)
656 : {
657 : ParallelExecutorInfo *pei;
658 : ParallelContext *pcxt;
659 : ExecParallelEstimateContext e;
660 : ExecParallelInitializeDSMContext d;
661 : FixedParallelExecutorState *fpes;
662 : char *pstmt_data;
663 : char *pstmt_space;
664 : char *paramlistinfo_space;
665 : BufferUsage *bufusage_space;
666 : WalUsage *walusage_space;
667 523 : SharedExecutorInstrumentation *instrumentation = NULL;
668 523 : SharedJitInstrumentation *jit_instrumentation = NULL;
669 : int pstmt_len;
670 : int paramlistinfo_len;
671 523 : int instrumentation_len = 0;
672 523 : int jit_instrumentation_len = 0;
673 523 : int instrument_offset = 0;
674 523 : Size dsa_minsize = dsa_minimum_size();
675 : char *query_string;
676 : int query_len;
677 :
678 : /*
679 : * Force any initplan outputs that we're going to pass to workers to be
680 : * evaluated, if they weren't already.
681 : *
682 : * For simplicity, we use the EState's per-output-tuple ExprContext here.
683 : * That risks intra-query memory leakage, since we might pass through here
684 : * many times before that ExprContext gets reset; but ExecSetParamPlan
685 : * doesn't normally leak any memory in the context (see its comments), so
686 : * it doesn't seem worth complicating this function's API to pass it a
687 : * shorter-lived ExprContext. This might need to change someday.
688 : */
689 523 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
690 :
691 : /* Allocate object for return value. */
692 523 : pei = palloc0_object(ParallelExecutorInfo);
693 523 : pei->finished = false;
694 523 : pei->planstate = planstate;
695 :
696 : /* Fix up and serialize plan to be sent to workers. */
697 523 : pstmt_data = ExecSerializePlan(planstate->plan, estate);
698 :
699 : /* Create a parallel context. */
700 523 : pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
701 523 : pei->pcxt = pcxt;
702 :
703 : /*
704 : * Before telling the parallel context to create a dynamic shared memory
705 : * segment, we need to figure out how big it should be. Estimate space
706 : * for the various things we need to store.
707 : */
708 :
709 : /* Estimate space for fixed-size state. */
710 523 : shm_toc_estimate_chunk(&pcxt->estimator,
711 : sizeof(FixedParallelExecutorState));
712 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
713 :
714 : /* Estimate space for query text. */
715 523 : query_len = strlen(estate->es_sourceText);
716 523 : shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
717 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
718 :
719 : /* Estimate space for serialized PlannedStmt. */
720 523 : pstmt_len = strlen(pstmt_data) + 1;
721 523 : shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
722 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
723 :
724 : /* Estimate space for serialized ParamListInfo. */
725 523 : paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
726 523 : shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
727 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
728 :
729 : /*
730 : * Estimate space for BufferUsage.
731 : *
732 : * If EXPLAIN is not in use and there are no extensions loaded that care,
733 : * we could skip this. But we have no way of knowing whether anyone's
734 : * looking at pgBufferUsage, so do it unconditionally.
735 : */
736 523 : shm_toc_estimate_chunk(&pcxt->estimator,
737 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
738 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
739 :
740 : /*
741 : * Same thing for WalUsage.
742 : */
743 523 : shm_toc_estimate_chunk(&pcxt->estimator,
744 : mul_size(sizeof(WalUsage), pcxt->nworkers));
745 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
746 :
747 : /* Estimate space for tuple queues. */
748 523 : shm_toc_estimate_chunk(&pcxt->estimator,
749 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
750 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
751 :
752 : /*
753 : * Give parallel-aware nodes a chance to add to the estimates, and get a
754 : * count of how many PlanState nodes there are.
755 : */
756 523 : e.pcxt = pcxt;
757 523 : e.nnodes = 0;
758 523 : ExecParallelEstimate(planstate, &e);
759 :
760 : /* Estimate space for instrumentation, if required. */
761 523 : if (estate->es_instrument)
762 : {
763 120 : instrumentation_len =
764 : offsetof(SharedExecutorInstrumentation, plan_node_id) +
765 120 : sizeof(int) * e.nnodes;
766 120 : instrumentation_len = MAXALIGN(instrumentation_len);
767 120 : instrument_offset = instrumentation_len;
768 120 : instrumentation_len +=
769 120 : mul_size(sizeof(NodeInstrumentation),
770 120 : mul_size(e.nnodes, nworkers));
771 120 : shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
772 120 : shm_toc_estimate_keys(&pcxt->estimator, 1);
773 :
774 : /* Estimate space for JIT instrumentation, if required. */
775 120 : if (estate->es_jit_flags != PGJIT_NONE)
776 : {
777 0 : jit_instrumentation_len =
778 0 : offsetof(SharedJitInstrumentation, jit_instr) +
779 : sizeof(JitInstrumentation) * nworkers;
780 0 : shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
781 0 : shm_toc_estimate_keys(&pcxt->estimator, 1);
782 : }
783 : }
784 :
785 : /* Estimate space for DSA area. */
786 523 : shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
787 523 : shm_toc_estimate_keys(&pcxt->estimator, 1);
788 :
789 : /*
790 : * InitializeParallelDSM() passes the active snapshot to the parallel
791 : * worker, which uses it to set es_snapshot. Make sure we don't set
792 : * es_snapshot differently in the child.
793 : */
794 : Assert(GetActiveSnapshot() == estate->es_snapshot);
795 :
796 : /* Everyone's had a chance to ask for space, so now create the DSM. */
797 523 : InitializeParallelDSM(pcxt);
798 :
799 : /*
800 : * OK, now we have a dynamic shared memory segment, and it should be big
801 : * enough to store all of the data we estimated we would want to put into
802 : * it, plus whatever general stuff (not specifically executor-related) the
803 : * ParallelContext itself needs to store there. None of the space we
804 : * asked for has been allocated or initialized yet, though, so do that.
805 : */
806 :
807 : /* Store fixed-size state. */
808 523 : fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
809 523 : fpes->tuples_needed = tuples_needed;
810 523 : fpes->param_exec = InvalidDsaPointer;
811 523 : fpes->eflags = estate->es_top_eflags;
812 523 : fpes->jit_flags = estate->es_jit_flags;
813 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
814 :
815 : /* Store query string */
816 523 : query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
817 523 : memcpy(query_string, estate->es_sourceText, query_len + 1);
818 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
819 :
820 : /* Store serialized PlannedStmt. */
821 523 : pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
822 523 : memcpy(pstmt_space, pstmt_data, pstmt_len);
823 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
824 :
825 : /* Store serialized ParamListInfo. */
826 523 : paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
827 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
828 523 : SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
829 :
830 : /* Allocate space for each worker's BufferUsage; no need to initialize. */
831 523 : bufusage_space = shm_toc_allocate(pcxt->toc,
832 523 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
833 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
834 523 : pei->buffer_usage = bufusage_space;
835 :
836 : /* Same for WalUsage. */
837 523 : walusage_space = shm_toc_allocate(pcxt->toc,
838 523 : mul_size(sizeof(WalUsage), pcxt->nworkers));
839 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
840 523 : pei->wal_usage = walusage_space;
841 :
842 : /* Set up the tuple queues that the workers will write into. */
843 523 : pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
844 :
845 : /* We don't need the TupleQueueReaders yet, though. */
846 523 : pei->reader = NULL;
847 :
848 : /*
849 : * If instrumentation options were supplied, allocate space for the data.
850 : * It only gets partially initialized here; the rest happens during
851 : * ExecParallelInitializeDSM.
852 : */
853 523 : if (estate->es_instrument)
854 : {
855 : NodeInstrumentation *instrument;
856 : int i;
857 :
858 120 : instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
859 120 : instrumentation->instrument_options = estate->es_instrument;
860 120 : instrumentation->instrument_offset = instrument_offset;
861 120 : instrumentation->num_workers = nworkers;
862 120 : instrumentation->num_plan_nodes = e.nnodes;
863 120 : instrument = GetInstrumentationArray(instrumentation);
864 1240 : for (i = 0; i < nworkers * e.nnodes; ++i)
865 1120 : InstrInitNode(&instrument[i], estate->es_instrument, false);
866 120 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
867 : instrumentation);
868 120 : pei->instrumentation = instrumentation;
869 :
870 120 : if (estate->es_jit_flags != PGJIT_NONE)
871 : {
872 0 : jit_instrumentation = shm_toc_allocate(pcxt->toc,
873 : jit_instrumentation_len);
874 0 : jit_instrumentation->num_workers = nworkers;
875 0 : memset(jit_instrumentation->jit_instr, 0,
876 : sizeof(JitInstrumentation) * nworkers);
877 0 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
878 : jit_instrumentation);
879 0 : pei->jit_instrumentation = jit_instrumentation;
880 : }
881 : }
882 :
883 : /*
884 : * Create a DSA area that can be used by the leader and all workers.
885 : * (However, if we failed to create a DSM and are using private memory
886 : * instead, then skip this.)
887 : */
888 523 : if (pcxt->seg != NULL)
889 : {
890 : char *area_space;
891 :
892 523 : area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
893 523 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
894 523 : pei->area = dsa_create_in_place(area_space, dsa_minsize,
895 : LWTRANCHE_PARALLEL_QUERY_DSA,
896 : pcxt->seg);
897 :
898 : /*
899 : * Serialize parameters, if any, using DSA storage. We don't dare use
900 : * the main parallel query DSM for this because we might relaunch
901 : * workers after the values have changed (and thus the amount of
902 : * storage required has changed).
903 : */
904 523 : if (!bms_is_empty(sendParams))
905 : {
906 16 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
907 : pei->area);
908 16 : fpes->param_exec = pei->param_exec;
909 : }
910 : }
911 :
912 : /*
913 : * Give parallel-aware nodes a chance to initialize their shared data.
914 : * This also initializes the per-node slots in the shared
915 : * instrumentation area, if instrumentation is in use.
916 : */
917 523 : d.pcxt = pcxt;
918 523 : d.instrumentation = instrumentation;
919 523 : d.nnodes = 0;
920 :
921 : /* Install our DSA area while initializing the plan. */
922 523 : estate->es_query_dsa = pei->area;
923 523 : ExecParallelInitializeDSM(planstate, &d);
924 523 : estate->es_query_dsa = NULL;
925 :
926 : /*
927 : * Make sure that the world hasn't shifted under our feet. This could
928 : * probably just be an Assert(), but let's be conservative for now.
929 : */
930 523 : if (e.nnodes != d.nnodes)
931 0 : elog(ERROR, "inconsistent count of PlanState nodes");
932 :
933 : /* OK, we're ready to rock and roll. */
934 523 : return pei;
935 : }
936 :
937 : /*
938 : * Set up tuple queue readers to read the results of a parallel subplan.
939 : *
940 : * This is separate from ExecInitParallelPlan() because we can launch the
941 : * worker processes and let them start doing something before we do this.
942 : */
943 : void
944 683 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
945 : {
946 683 : int nworkers = pei->pcxt->nworkers_launched;
947 : int i;
948 :
949 : Assert(pei->reader == NULL);
950 :
951 683 : if (nworkers > 0)
952 : {
953 683 : pei->reader = (TupleQueueReader **)
954 683 : palloc(nworkers * sizeof(TupleQueueReader *));
955 :
956 2496 : for (i = 0; i < nworkers; i++)
957 : {
958 1813 : shm_mq_set_handle(pei->tqueue[i],
959 1813 : pei->pcxt->worker[i].bgwhandle);
960 1813 : pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
961 : }
962 : }
963 683 : }
964 :
965 : /*
966 : * Re-initialize the parallel executor shared memory state before launching
967 : * a fresh batch of workers.
968 : */
969 : void
970 172 : ExecParallelReinitialize(PlanState *planstate,
971 : ParallelExecutorInfo *pei,
972 : Bitmapset *sendParams)
973 : {
974 172 : EState *estate = planstate->state;
975 : FixedParallelExecutorState *fpes;
976 :
977 : /* Old workers must already be shut down */
978 : Assert(pei->finished);
979 :
980 : /*
981 : * Force any initplan outputs that we're going to pass to workers to be
982 : * evaluated, if they weren't already (see comments in
983 : * ExecInitParallelPlan).
984 : */
985 172 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
986 :
987 172 : ReinitializeParallelDSM(pei->pcxt);
988 172 : pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
989 172 : pei->reader = NULL;
990 172 : pei->finished = false;
991 :
992 172 : fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
993 :
994 : /* Free any serialized parameters from the last round. */
995 172 : if (DsaPointerIsValid(fpes->param_exec))
996 : {
997 0 : dsa_free(pei->area, fpes->param_exec);
998 0 : fpes->param_exec = InvalidDsaPointer;
999 : }
1000 :
1001 : /* Serialize current parameter values if required. */
1002 172 : if (!bms_is_empty(sendParams))
1003 : {
1004 0 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
1005 : pei->area);
1006 0 : fpes->param_exec = pei->param_exec;
1007 : }
1008 :
1009 : /* Traverse plan tree and let each child node reset associated state. */
1010 172 : estate->es_query_dsa = pei->area;
1011 172 : ExecParallelReInitializeDSM(planstate, pei->pcxt);
1012 172 : estate->es_query_dsa = NULL;
1013 172 : }
1014 :
1015 : /*
1016 : * Traverse plan tree to reinitialize per-node dynamic shared memory state
1017 : */
1018 : static bool
1019 444 : ExecParallelReInitializeDSM(PlanState *planstate,
1020 : ParallelContext *pcxt)
1021 : {
1022 444 : if (planstate == NULL)
1023 0 : return false;
1024 :
1025 : /*
1026 : * Call reinitializers for DSM-using plan nodes.
1027 : */
1028 444 : switch (nodeTag(planstate))
1029 : {
1030 184 : case T_SeqScanState:
1031 184 : if (planstate->plan->parallel_aware)
1032 152 : ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
1033 : pcxt);
1034 184 : break;
1035 8 : case T_IndexScanState:
1036 8 : if (planstate->plan->parallel_aware)
1037 8 : ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
1038 : pcxt);
1039 8 : break;
1040 8 : case T_IndexOnlyScanState:
1041 8 : if (planstate->plan->parallel_aware)
1042 8 : ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
1043 : pcxt);
1044 8 : break;
1045 0 : case T_ForeignScanState:
1046 0 : if (planstate->plan->parallel_aware)
1047 0 : ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
1048 : pcxt);
1049 0 : break;
1050 0 : case T_TidRangeScanState:
1051 0 : if (planstate->plan->parallel_aware)
1052 0 : ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
1053 : pcxt);
1054 0 : break;
1055 0 : case T_AppendState:
1056 0 : if (planstate->plan->parallel_aware)
1057 0 : ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
1058 0 : break;
1059 0 : case T_CustomScanState:
1060 0 : if (planstate->plan->parallel_aware)
1061 0 : ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
1062 : pcxt);
1063 0 : break;
1064 36 : case T_BitmapHeapScanState:
1065 36 : if (planstate->plan->parallel_aware)
1066 36 : ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
1067 : pcxt);
1068 36 : break;
1069 64 : case T_HashJoinState:
1070 64 : if (planstate->plan->parallel_aware)
1071 32 : ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
1072 : pcxt);
1073 64 : break;
1074 120 : case T_BitmapIndexScanState:
1075 : case T_HashState:
1076 : case T_SortState:
1077 : case T_IncrementalSortState:
1078 : case T_MemoizeState:
1079 : /* these nodes have DSM state, but no reinitialization is required */
1080 120 : break;
1081 :
1082 24 : default:
1083 24 : break;
1084 : }
1085 :
1086 444 : return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1087 : }
1088 :
1089 : /*
1090 : * Copy instrumentation information about this node and its descendants from
1091 : * dynamic shared memory.
1092 : */
1093 : static bool
1094 684 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
1095 : SharedExecutorInstrumentation *instrumentation)
1096 : {
1097 : NodeInstrumentation *instrument;
1098 : int i;
1099 : int n;
1100 : int ibytes;
1101 684 : int plan_node_id = planstate->plan->plan_node_id;
1102 : MemoryContext oldcontext;
1103 :
1104 : /* Find the instrumentation for this node. */
1105 3092 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1106 3092 : if (instrumentation->plan_node_id[i] == plan_node_id)
1107 684 : break;
1108 684 : if (i >= instrumentation->num_plan_nodes)
1109 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1110 :
1111 : /* Accumulate the statistics from all workers. */
1112 684 : instrument = GetInstrumentationArray(instrumentation);
1113 684 : instrument += i * instrumentation->num_workers;
1114 1804 : for (n = 0; n < instrumentation->num_workers; ++n)
1115 1120 : InstrAggNode(planstate->instrument, &instrument[n]);
1116 :
1117 : /*
1118 : * Also store the per-worker detail.
1119 : *
1120 : * Worker instrumentation should be allocated in the same context as the
1121 : * regular instrumentation information, which is the per-query context.
1122 : * Switch into per-query memory context.
1123 : */
1124 684 : oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1125 684 : ibytes = mul_size(instrumentation->num_workers, sizeof(NodeInstrumentation));
1126 684 : planstate->worker_instrument =
1127 684 : palloc(ibytes + offsetof(WorkerNodeInstrumentation, instrument));
1128 684 : MemoryContextSwitchTo(oldcontext);
1129 :
1130 684 : planstate->worker_instrument->num_workers = instrumentation->num_workers;
1131 684 : memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1132 :
1133 : /* Perform any node-type-specific work that needs to be done. */
1134 684 : switch (nodeTag(planstate))
1135 : {
1136 180 : case T_IndexScanState:
1137 180 : ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
1138 180 : break;
1139 0 : case T_IndexOnlyScanState:
1140 0 : ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
1141 0 : break;
1142 0 : case T_BitmapIndexScanState:
1143 0 : ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
1144 0 : break;
1145 8 : case T_SortState:
1146 8 : ExecSortRetrieveInstrumentation((SortState *) planstate);
1147 8 : break;
1148 0 : case T_IncrementalSortState:
1149 0 : ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1150 0 : break;
1151 56 : case T_HashState:
1152 56 : ExecHashRetrieveInstrumentation((HashState *) planstate);
1153 56 : break;
1154 68 : case T_AggState:
1155 68 : ExecAggRetrieveInstrumentation((AggState *) planstate);
1156 68 : break;
1157 0 : case T_MemoizeState:
1158 0 : ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
1159 0 : break;
1160 0 : case T_BitmapHeapScanState:
1161 0 : ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
1162 0 : break;
1163 232 : case T_SeqScanState:
1164 232 : ExecSeqScanRetrieveInstrumentation((SeqScanState *) planstate);
1165 232 : break;
1166 0 : case T_TidRangeScanState:
1167 0 : ExecTidRangeScanRetrieveInstrumentation((TidRangeScanState *) planstate);
1168 0 : break;
1169 140 : default:
1170 140 : break;
1171 : }
1172 :
1173 684 : return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1174 : instrumentation);
1175 : }
1176 :
1177 : /*
1178 : * Add up the workers' JIT instrumentation from dynamic shared memory.
1179 : */
1180 : static void
1181 0 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1182 : SharedJitInstrumentation *shared_jit)
1183 : {
1184 : JitInstrumentation *combined;
1185 : int ibytes;
1186 :
1187 : int n;
1188 :
1189 : /*
1190 : * Accumulate worker JIT instrumentation into the combined JIT
1191 : * instrumentation, allocating it if required.
1192 : */
1193 0 : if (!planstate->state->es_jit_worker_instr)
1194 0 : planstate->state->es_jit_worker_instr =
1195 0 : MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
1196 0 : combined = planstate->state->es_jit_worker_instr;
1197 :
1198 : /* Accumulate all the workers' instrumentations. */
1199 0 : for (n = 0; n < shared_jit->num_workers; ++n)
1200 0 : InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1201 :
1202 : /*
1203 : * Store the per-worker detail.
1204 : *
1205 : * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1206 : * instrumentation in per-query context.
1207 : */
1208 0 : ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1209 0 : + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1210 0 : planstate->worker_jit_instrument =
1211 0 : MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1212 :
1213 0 : memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1214 0 : }
1215 :
1216 : /*
1217 : * Finish parallel execution. We wait for parallel workers to finish, and
1218 : * accumulate their buffer/WAL usage.
1219 : */
1220 : void
1221 1242 : ExecParallelFinish(ParallelExecutorInfo *pei)
1222 : {
1223 1242 : int nworkers = pei->pcxt->nworkers_launched;
1224 : int i;
1225 :
1226 : /* Make this be a no-op if called twice in a row. */
1227 1242 : if (pei->finished)
1228 555 : return;
1229 :
1230 : /*
1231 : * Detach from tuple queues ASAP, so that any still-active workers will
1232 : * notice that no further results are wanted.
1233 : */
1234 687 : if (pei->tqueue != NULL)
1235 : {
1236 2492 : for (i = 0; i < nworkers; i++)
1237 1805 : shm_mq_detach(pei->tqueue[i]);
1238 687 : pfree(pei->tqueue);
1239 687 : pei->tqueue = NULL;
1240 : }
1241 :
1242 : /*
1243 : * While we're waiting for the workers to finish, let's get rid of the
1244 : * tuple queue readers. (Any other local cleanup could be done here too.)
1245 : */
1246 687 : if (pei->reader != NULL)
1247 : {
1248 2480 : for (i = 0; i < nworkers; i++)
1249 1805 : DestroyTupleQueueReader(pei->reader[i]);
1250 675 : pfree(pei->reader);
1251 675 : pei->reader = NULL;
1252 : }
1253 :
1254 : /* Now wait for the workers to finish. */
1255 687 : WaitForParallelWorkersToFinish(pei->pcxt);
1256 :
1257 : /*
1258 : * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1259 : * finish, or we might get incomplete data.)
1260 : */
1261 2492 : for (i = 0; i < nworkers; i++)
1262 1805 : InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1263 :
1264 687 : pei->finished = true;
1265 : }
1266 :
1267 : /*
1268 : * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1269 : * resources still exist after ExecParallelFinish. We separate these
1270 : * routines because someone might want to examine the contents of the DSM
1271 : * after ExecParallelFinish and before calling this routine.
1272 : */
1273 : void
1274 515 : ExecParallelCleanup(ParallelExecutorInfo *pei)
1275 : {
1276 : /* Accumulate instrumentation, if any. */
1277 515 : if (pei->instrumentation)
1278 120 : ExecParallelRetrieveInstrumentation(pei->planstate,
1279 : pei->instrumentation);
1280 :
1281 : /* Accumulate JIT instrumentation, if any. */
1282 515 : if (pei->jit_instrumentation)
1283 0 : ExecParallelRetrieveJitInstrumentation(pei->planstate,
1284 0 : pei->jit_instrumentation);
1285 :
1286 : /* Free any serialized parameters. */
1287 515 : if (DsaPointerIsValid(pei->param_exec))
1288 : {
1289 16 : dsa_free(pei->area, pei->param_exec);
1290 16 : pei->param_exec = InvalidDsaPointer;
1291 : }
1292 515 : if (pei->area != NULL)
1293 : {
1294 515 : dsa_detach(pei->area);
1295 515 : pei->area = NULL;
1296 : }
1297 515 : if (pei->pcxt != NULL)
1298 : {
1299 515 : DestroyParallelContext(pei->pcxt);
1300 515 : pei->pcxt = NULL;
1301 : }
1302 515 : pfree(pei);
1303 515 : }
1304 :
1305 : /*
1306 : * Create a DestReceiver to write tuples we produce to the shm_mq designated
1307 : * for that purpose.
1308 : */
1309 : static DestReceiver *
1310 1813 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1311 : {
1312 : char *mqspace;
1313 : shm_mq *mq;
1314 :
1315 1813 : mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1316 1813 : mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1317 1813 : mq = (shm_mq *) mqspace;
1318 1813 : shm_mq_set_sender(mq, MyProc);
1319 1813 : return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1320 : }
1321 :
1322 : /*
1323 : * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1324 : */
1325 : static QueryDesc *
1326 1813 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1327 : int instrument_options)
1328 : {
1329 : char *pstmtspace;
1330 : char *paramspace;
1331 : PlannedStmt *pstmt;
1332 : ParamListInfo paramLI;
1333 : char *queryString;
1334 :
1335 : /* Get the query string from shared memory */
1336 1813 : queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1337 :
1338 : /* Reconstruct leader-supplied PlannedStmt. */
1339 1813 : pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1340 1813 : pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1341 :
1342 : /* Reconstruct ParamListInfo. */
1343 1813 : paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1344 1813 : paramLI = RestoreParamList(&paramspace);
1345 :
1346 : /* Create a QueryDesc for the query. */
1347 1813 : return CreateQueryDesc(pstmt,
1348 : queryString,
1349 : GetActiveSnapshot(), InvalidSnapshot,
1350 : receiver, paramLI, NULL, instrument_options);
1351 : }
1352 :
1353 : /*
1354 : * Copy instrumentation information from this node and its descendants into
1355 : * dynamic shared memory, so that the parallel leader can retrieve it.
1356 : */
1357 : static bool
1358 1579 : ExecParallelReportInstrumentation(PlanState *planstate,
1359 : SharedExecutorInstrumentation *instrumentation)
1360 : {
1361 : int i;
1362 1579 : int plan_node_id = planstate->plan->plan_node_id;
1363 : NodeInstrumentation *instrument;
1364 :
1365 1579 : InstrEndLoop(planstate->instrument);
1366 :
1367 : /*
1368 : * If we shuffled the plan_node_id values in ps_instrument into sorted
1369 : * order, we could use binary search here. This might matter someday if
1370 : * we're pushing down sufficiently large plan trees. For now, do it the
1371 : * slow, dumb way.
1372 : */
1373 5193 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1374 5193 : if (instrumentation->plan_node_id[i] == plan_node_id)
1375 1579 : break;
1376 1579 : if (i >= instrumentation->num_plan_nodes)
1377 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1378 :
1379 : /*
1380 : * Add our statistics to the per-node, per-worker totals. It's possible
1381 : * that this could happen more than once if we relaunched workers.
1382 : */
1383 1579 : instrument = GetInstrumentationArray(instrumentation);
1384 1579 : instrument += i * instrumentation->num_workers;
1385 : Assert(IsParallelWorker());
1386 : Assert(ParallelWorkerNumber < instrumentation->num_workers);
1387 1579 : InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1388 :
1389 1579 : return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1390 : instrumentation);
1391 : }
1392 :
1393 : /*
1394 : * Initialize the PlanState and its descendants with the information
1395 : * retrieved from shared memory. This has to be done once the PlanState
1396 : * is allocated and initialized by the executor; that is, after ExecutorStart().
1397 : */
1398 : static bool
1399 8156 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1400 : {
1401 8156 : if (planstate == NULL)
1402 0 : return false;
1403 :
1404 8156 : switch (nodeTag(planstate))
1405 : {
1406 3744 : case T_SeqScanState:
1407 3744 : if (planstate->plan->parallel_aware)
1408 2966 : ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1409 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1410 3744 : ExecSeqScanInstrumentInitWorker((SeqScanState *) planstate, pwcxt);
1411 3744 : break;
1412 424 : case T_IndexScanState:
1413 424 : if (planstate->plan->parallel_aware)
1414 80 : ExecIndexScanInitializeWorker((IndexScanState *) planstate,
1415 : pwcxt);
1416 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1417 424 : ExecIndexScanInstrumentInitWorker((IndexScanState *) planstate,
1418 : pwcxt);
1419 424 : break;
1420 168 : case T_IndexOnlyScanState:
1421 168 : if (planstate->plan->parallel_aware)
1422 136 : ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1423 : pwcxt);
1424 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1425 168 : ExecIndexOnlyScanInstrumentInitWorker((IndexOnlyScanState *) planstate,
1426 : pwcxt);
1427 168 : break;
1428 181 : case T_BitmapIndexScanState:
1429 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1430 181 : ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
1431 : pwcxt);
1432 181 : break;
1433 0 : case T_ForeignScanState:
1434 0 : if (planstate->plan->parallel_aware)
1435 0 : ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1436 : pwcxt);
1437 0 : break;
1438 64 : case T_TidRangeScanState:
1439 64 : if (planstate->plan->parallel_aware)
1440 64 : ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
1441 : pwcxt);
1442 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1443 64 : ExecTidRangeScanInstrumentInitWorker((TidRangeScanState *) planstate,
1444 : pwcxt);
1445 64 : break;
1446 297 : case T_AppendState:
1447 297 : if (planstate->plan->parallel_aware)
1448 241 : ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1449 297 : break;
1450 0 : case T_CustomScanState:
1451 0 : if (planstate->plan->parallel_aware)
1452 0 : ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1453 : pwcxt);
1454 0 : break;
1455 181 : case T_BitmapHeapScanState:
1456 181 : if (planstate->plan->parallel_aware)
1457 180 : ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1458 : pwcxt);
1459 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1460 181 : ExecBitmapHeapInstrumentInitWorker((BitmapHeapScanState *) planstate,
1461 : pwcxt);
1462 181 : break;
1463 524 : case T_HashJoinState:
1464 524 : if (planstate->plan->parallel_aware)
1465 212 : ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1466 : pwcxt);
1467 524 : break;
1468 524 : case T_HashState:
1469 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1470 524 : ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1471 524 : break;
1472 499 : case T_SortState:
1473 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1474 499 : ExecSortInitializeWorker((SortState *) planstate, pwcxt);
1475 499 : break;
1476 0 : case T_IncrementalSortState:
1477 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1478 0 : ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1479 : pwcxt);
1480 0 : break;
1481 1112 : case T_AggState:
1482 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1483 1112 : ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1484 1112 : break;
1485 8 : case T_MemoizeState:
1486 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1487 8 : ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
1488 8 : break;
1489 430 : default:
1490 430 : break;
1491 : }
1492 :
1493 8156 : return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1494 : pwcxt);
1495 : }
1496 :
1497 : /*
1498 : * Main entrypoint for parallel query worker processes.
1499 : *
1500 : * We reach this function from ParallelWorkerMain, so the setup necessary to
1501 : * create a sensible parallel environment has already been done;
1502 : * ParallelWorkerMain worries about stuff like the transaction state, combo
1503 : * CID mappings, and GUC values, so we don't need to deal with any of that
1504 : * here.
1505 : *
1506 : * Our job is to deal with concerns specific to the executor. The parallel
1507 : * group leader will have stored a serialized PlannedStmt, and it's our job
1508 : * to execute that plan and write the resulting tuples to the appropriate
1509 : * tuple queue. Various bits of supporting information that we need in order
1510 : * to do this are also stored in the dsm_segment and can be accessed through
1511 : * the shm_toc.
1512 : */
1513 : void
1514 1813 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1515 : {
1516 : FixedParallelExecutorState *fpes;
1517 : BufferUsage *buffer_usage;
1518 : WalUsage *wal_usage;
1519 : DestReceiver *receiver;
1520 : QueryDesc *queryDesc;
1521 : SharedExecutorInstrumentation *instrumentation;
1522 : SharedJitInstrumentation *jit_instrumentation;
1523 1813 : int instrument_options = 0;
1524 : void *area_space;
1525 : dsa_area *area;
1526 : ParallelWorkerContext pwcxt;
1527 :
1528 : /* Get fixed-size state. */
1529 1813 : fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1530 :
1531 : /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
1532 1813 : receiver = ExecParallelGetReceiver(seg, toc);
1533 1813 : instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1534 1813 : if (instrumentation != NULL)
1535 483 : instrument_options = instrumentation->instrument_options;
1536 1813 : jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1537 : true);
1538 1813 : queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1539 :
1540 : /* Setting debug_query_string for individual workers */
1541 1813 : debug_query_string = queryDesc->sourceText;
1542 :
1543 : /* Report workers' query for monitoring purposes */
1544 1813 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1545 :
1546 : /* Attach to the dynamic shared memory area. */
1547 1813 : area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1548 1813 : area = dsa_attach_in_place(area_space, seg);
1549 :
1550 : /* Start up the executor */
1551 1813 : queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1552 1813 : ExecutorStart(queryDesc, fpes->eflags);
1553 :
1554 : /* Special executor initialization steps for parallel workers */
1555 1813 : queryDesc->planstate->state->es_query_dsa = area;
1556 1813 : if (DsaPointerIsValid(fpes->param_exec))
1557 : {
1558 : char *paramexec_space;
1559 :
1560 48 : paramexec_space = dsa_get_address(area, fpes->param_exec);
1561 48 : RestoreParamExecParams(paramexec_space, queryDesc->estate);
1562 : }
1563 1813 : pwcxt.toc = toc;
1564 1813 : pwcxt.seg = seg;
1565 1813 : ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1566 :
1567 : /* Pass down any tuple bound */
1568 1813 : ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1569 :
1570 : /*
1571 : * Prepare to track buffer/WAL usage during query execution.
1572 : *
1573 : * We do this after starting up the executor to match what happens in the
1574 : * leader, which also doesn't count buffer accesses and WAL activity that
1575 : * occur during executor startup.
1576 : */
1577 1813 : InstrStartParallelQuery();
1578 :
1579 : /*
1580 : * Run the plan. If we specified a tuple bound, be careful not to demand
1581 : * more tuples than that.
1582 : */
1583 1813 : ExecutorRun(queryDesc,
1584 : ForwardScanDirection,
1585 1813 : fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
1586 :
1587 : /* Shut down the executor */
1588 1805 : ExecutorFinish(queryDesc);
1589 :
1590 : /* Report buffer/WAL usage during parallel execution. */
1591 1805 : buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1592 1805 : wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1593 1805 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1594 1805 : &wal_usage[ParallelWorkerNumber]);
1595 :
1596 : /* Report instrumentation data if any instrumentation options are set. */
1597 1805 : if (instrumentation != NULL)
1598 483 : ExecParallelReportInstrumentation(queryDesc->planstate,
1599 : instrumentation);
1600 :
1601 : /* Report JIT instrumentation data if any */
1602 1805 : if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1603 : {
1604 : Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1605 0 : jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1606 0 : queryDesc->estate->es_jit->instr;
1607 : }
1608 :
1609 : /* Must do this after capturing instrumentation. */
1610 1805 : ExecutorEnd(queryDesc);
1611 :
1612 : /* Cleanup. */
1613 1805 : dsa_detach(area);
1614 1805 : FreeQueryDesc(queryDesc);
1615 1805 : receiver->rDestroy(receiver);
1616 1805 : }
|