Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execParallel.c
4 : * Support routines for parallel execution.
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * This file contains routines that are intended to support setting up,
10 : * using, and tearing down a ParallelContext from within the PostgreSQL
11 : * executor. The ParallelContext machinery will handle starting the
12 : * workers and ensuring that their state generally matches that of the
13 : * leader; see src/backend/access/transam/README.parallel for details.
14 : * However, we must save and restore relevant executor state, such as
15 : * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 : * the actual plan to be passed down to the worker.
17 : *
18 : * IDENTIFICATION
19 : * src/backend/executor/execParallel.c
20 : *
21 : *-------------------------------------------------------------------------
22 : */
23 :
24 : #include "postgres.h"
25 :
26 : #include "executor/execParallel.h"
27 : #include "executor/executor.h"
28 : #include "executor/nodeAgg.h"
29 : #include "executor/nodeAppend.h"
30 : #include "executor/nodeBitmapHeapscan.h"
31 : #include "executor/nodeBitmapIndexscan.h"
32 : #include "executor/nodeCustom.h"
33 : #include "executor/nodeForeignscan.h"
34 : #include "executor/nodeHash.h"
35 : #include "executor/nodeHashjoin.h"
36 : #include "executor/nodeIncrementalSort.h"
37 : #include "executor/nodeIndexonlyscan.h"
38 : #include "executor/nodeIndexscan.h"
39 : #include "executor/nodeMemoize.h"
40 : #include "executor/nodeSeqscan.h"
41 : #include "executor/nodeSort.h"
42 : #include "executor/nodeSubplan.h"
43 : #include "executor/nodeTidrangescan.h"
44 : #include "executor/tqueue.h"
45 : #include "jit/jit.h"
46 : #include "nodes/nodeFuncs.h"
47 : #include "pgstat.h"
48 : #include "storage/proc.h"
49 : #include "tcop/tcopprot.h"
50 : #include "utils/datum.h"
51 : #include "utils/dsa.h"
52 : #include "utils/lsyscache.h"
53 : #include "utils/snapmgr.h"
54 :
55 : /*
56 : * Magic numbers for parallel executor communication. We use constants
57 : * greater than any 32-bit integer here so that values < 2^32 can be used
58 : * by individual parallel nodes to store their own state.
59 : */
60 : #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
61 : #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
62 : #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
63 : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
64 : #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
65 : #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
66 : #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
67 : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
68 : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
69 : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
70 :
71 : #define PARALLEL_TUPLE_QUEUE_SIZE 65536
72 :
73 : /*
74 : * Fixed-size random stuff that we need to pass to parallel workers.
75 : */
76 : typedef struct FixedParallelExecutorState
77 : {
78 : int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
79 : dsa_pointer param_exec;
80 : int eflags;
81 : int jit_flags;
82 : } FixedParallelExecutorState;
83 :
84 : /*
85 : * DSM structure for accumulating per-PlanState instrumentation.
86 : *
87 : * instrument_options: Same meaning here as in instrument.c.
88 : *
89 : * instrument_offset: Offset, relative to the start of this structure,
90 : * of the first Instrumentation object. This will depend on the length of
91 : * the plan_node_id array.
92 : *
93 : * num_workers: Number of workers.
94 : *
95 : * num_plan_nodes: Number of plan nodes.
96 : *
97 : * plan_node_id: Array of plan nodes for which we are gathering instrumentation
98 : * from parallel workers. The length of this array is given by num_plan_nodes.
99 : */
100 : struct SharedExecutorInstrumentation
101 : {
102 : int instrument_options;
103 : int instrument_offset;
104 : int num_workers;
105 : int num_plan_nodes;
106 : int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
107 : /* array of num_plan_nodes * num_workers Instrumentation objects follows */
108 : };
109 : #define GetInstrumentationArray(sei) \
110 : (StaticAssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
111 : (Instrumentation *) (((char *) sei) + sei->instrument_offset))
112 :
113 : /* Context object for ExecParallelEstimate. */
114 : typedef struct ExecParallelEstimateContext
115 : {
116 : ParallelContext *pcxt;
117 : int nnodes;
118 : } ExecParallelEstimateContext;
119 :
120 : /* Context object for ExecParallelInitializeDSM. */
121 : typedef struct ExecParallelInitializeDSMContext
122 : {
123 : ParallelContext *pcxt;
124 : SharedExecutorInstrumentation *instrumentation;
125 : int nnodes;
126 : } ExecParallelInitializeDSMContext;
127 :
128 : /* Helper functions that run in the parallel leader. */
129 : static char *ExecSerializePlan(Plan *plan, EState *estate);
130 : static bool ExecParallelEstimate(PlanState *planstate,
131 : ExecParallelEstimateContext *e);
132 : static bool ExecParallelInitializeDSM(PlanState *planstate,
133 : ExecParallelInitializeDSMContext *d);
134 : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
135 : bool reinitialize);
136 : static bool ExecParallelReInitializeDSM(PlanState *planstate,
137 : ParallelContext *pcxt);
138 : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
139 : SharedExecutorInstrumentation *instrumentation);
140 :
141 : /* Helper function that runs in the parallel worker. */
142 : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
143 :
144 : /*
145 : * Create a serialized representation of the plan to be sent to each worker.
146 : */
147 : static char *
148 522 : ExecSerializePlan(Plan *plan, EState *estate)
149 : {
150 : PlannedStmt *pstmt;
151 : ListCell *lc;
152 :
153 : /* We can't scribble on the original plan, so make a copy. */
154 522 : plan = copyObject(plan);
155 :
156 : /*
157 : * The worker will start its own copy of the executor, and that copy will
158 : * insert a junk filter if the toplevel node has any resjunk entries. We
159 : * don't want that to happen, because while resjunk columns shouldn't be
160 : * sent back to the user, here the tuples are coming back to another
161 : * backend which may very well need them. So mutate the target list
162 : * accordingly. This is sort of a hack; there might be better ways to do
163 : * this...
164 : */
165 1437 : foreach(lc, plan->targetlist)
166 : {
167 915 : TargetEntry *tle = lfirst_node(TargetEntry, lc);
168 :
169 915 : tle->resjunk = false;
170 : }
171 :
172 : /*
173 : * Create a dummy PlannedStmt. Most of the fields don't need to be valid
174 : * for our purposes, but the worker will need at least a minimal
175 : * PlannedStmt to start the executor.
176 : */
177 522 : pstmt = makeNode(PlannedStmt);
178 522 : pstmt->commandType = CMD_SELECT;
179 522 : pstmt->queryId = pgstat_get_my_query_id();
180 522 : pstmt->planId = pgstat_get_my_plan_id();
181 522 : pstmt->hasReturning = false;
182 522 : pstmt->hasModifyingCTE = false;
183 522 : pstmt->canSetTag = true;
184 522 : pstmt->transientPlan = false;
185 522 : pstmt->dependsOnRole = false;
186 522 : pstmt->parallelModeNeeded = false;
187 522 : pstmt->planTree = plan;
188 522 : pstmt->partPruneInfos = estate->es_part_prune_infos;
189 522 : pstmt->rtable = estate->es_range_table;
190 522 : pstmt->unprunableRelids = estate->es_unpruned_relids;
191 522 : pstmt->permInfos = estate->es_rteperminfos;
192 522 : pstmt->resultRelations = NIL;
193 522 : pstmt->appendRelations = NIL;
194 522 : pstmt->planOrigin = PLAN_STMT_INTERNAL;
195 :
196 : /*
197 : * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
198 : * for unsafe ones (so that the list indexes of the safe ones are
199 : * preserved). This positively ensures that the worker won't try to run,
200 : * or even do ExecInitNode on, an unsafe subplan. That's important to
201 : * protect, eg, non-parallel-aware FDWs from getting into trouble.
202 : */
203 522 : pstmt->subplans = NIL;
204 558 : foreach(lc, estate->es_plannedstmt->subplans)
205 : {
206 36 : Plan *subplan = (Plan *) lfirst(lc);
207 :
208 36 : if (subplan && !subplan->parallel_safe)
209 8 : subplan = NULL;
210 36 : pstmt->subplans = lappend(pstmt->subplans, subplan);
211 : }
212 :
213 522 : pstmt->rewindPlanIDs = NULL;
214 522 : pstmt->rowMarks = NIL;
215 522 : pstmt->relationOids = NIL;
216 522 : pstmt->invalItems = NIL; /* workers can't replan anyway... */
217 522 : pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
218 522 : pstmt->utilityStmt = NULL;
219 522 : pstmt->stmt_location = -1;
220 522 : pstmt->stmt_len = -1;
221 :
222 : /* Return serialized copy of our dummy PlannedStmt. */
223 522 : return nodeToString(pstmt);
224 : }
225 :
226 : /*
227 : * Parallel-aware plan nodes (and occasionally others) may need some state
228 : * which is shared across all parallel workers. Before we size the DSM, give
229 : * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
230 : * &pcxt->estimator.
231 : *
232 : * While we're at it, count the number of PlanState nodes in the tree, so
233 : * we know how many Instrumentation structures we need.
234 : */
235 : static bool
236 3045 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
237 : {
238 3045 : if (planstate == NULL)
239 0 : return false;
240 :
241 : /* Count this node. */
242 3045 : e->nnodes++;
243 :
244 3045 : switch (nodeTag(planstate))
245 : {
246 1271 : case T_SeqScanState:
247 1271 : if (planstate->plan->parallel_aware)
248 933 : ExecSeqScanEstimate((SeqScanState *) planstate,
249 : e->pcxt);
250 1271 : break;
251 276 : case T_IndexScanState:
252 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
253 276 : ExecIndexScanEstimate((IndexScanState *) planstate,
254 : e->pcxt);
255 276 : break;
256 42 : case T_IndexOnlyScanState:
257 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
258 42 : ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
259 : e->pcxt);
260 42 : break;
261 13 : case T_BitmapIndexScanState:
262 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
263 13 : ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
264 : e->pcxt);
265 13 : break;
266 0 : case T_ForeignScanState:
267 0 : if (planstate->plan->parallel_aware)
268 0 : ExecForeignScanEstimate((ForeignScanState *) planstate,
269 : e->pcxt);
270 0 : break;
271 16 : case T_TidRangeScanState:
272 16 : if (planstate->plan->parallel_aware)
273 16 : ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
274 : e->pcxt);
275 16 : break;
276 148 : case T_AppendState:
277 148 : if (planstate->plan->parallel_aware)
278 108 : ExecAppendEstimate((AppendState *) planstate,
279 : e->pcxt);
280 148 : break;
281 0 : case T_CustomScanState:
282 0 : if (planstate->plan->parallel_aware)
283 0 : ExecCustomScanEstimate((CustomScanState *) planstate,
284 : e->pcxt);
285 0 : break;
286 13 : case T_BitmapHeapScanState:
287 13 : if (planstate->plan->parallel_aware)
288 12 : ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
289 : e->pcxt);
290 13 : break;
291 208 : case T_HashJoinState:
292 208 : if (planstate->plan->parallel_aware)
293 84 : ExecHashJoinEstimate((HashJoinState *) planstate,
294 : e->pcxt);
295 208 : break;
296 208 : case T_HashState:
297 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
298 208 : ExecHashEstimate((HashState *) planstate, e->pcxt);
299 208 : break;
300 201 : case T_SortState:
301 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
302 201 : ExecSortEstimate((SortState *) planstate, e->pcxt);
303 201 : break;
304 0 : case T_IncrementalSortState:
305 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
306 0 : ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
307 0 : break;
308 395 : case T_AggState:
309 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
310 395 : ExecAggEstimate((AggState *) planstate, e->pcxt);
311 395 : break;
312 4 : case T_MemoizeState:
313 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
314 4 : ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
315 4 : break;
316 250 : default:
317 250 : break;
318 : }
319 :
320 3045 : return planstate_tree_walker(planstate, ExecParallelEstimate, e);
321 : }
322 :
323 : /*
324 : * Estimate the amount of space required to serialize the indicated parameters.
325 : */
326 : static Size
327 16 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
328 : {
329 : int paramid;
330 16 : Size sz = sizeof(int);
331 :
332 16 : paramid = -1;
333 36 : while ((paramid = bms_next_member(params, paramid)) >= 0)
334 : {
335 : Oid typeOid;
336 : int16 typLen;
337 : bool typByVal;
338 : ParamExecData *prm;
339 :
340 20 : prm = &(estate->es_param_exec_vals[paramid]);
341 20 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
342 : paramid);
343 :
344 20 : sz = add_size(sz, sizeof(int)); /* space for paramid */
345 :
346 : /* space for datum/isnull */
347 20 : if (OidIsValid(typeOid))
348 20 : get_typlenbyval(typeOid, &typLen, &typByVal);
349 : else
350 : {
351 : /* If no type OID, assume by-value, like copyParamList does. */
352 0 : typLen = sizeof(Datum);
353 0 : typByVal = true;
354 : }
355 20 : sz = add_size(sz,
356 20 : datumEstimateSpace(prm->value, prm->isnull,
357 : typByVal, typLen));
358 : }
359 16 : return sz;
360 : }
361 :
362 : /*
363 : * Serialize specified PARAM_EXEC parameters.
364 : *
365 : * We write the number of parameters first, as a 4-byte integer, and then
366 : * write details for each parameter in turn. The details for each parameter
367 : * consist of a 4-byte paramid (location of param in execution time internal
368 : * parameter array) and then the datum as serialized by datumSerialize().
369 : */
370 : static dsa_pointer
371 16 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
372 : {
373 : Size size;
374 : int nparams;
375 : int paramid;
376 : ParamExecData *prm;
377 : dsa_pointer handle;
378 : char *start_address;
379 :
380 : /* Allocate enough space for the current parameter values. */
381 16 : size = EstimateParamExecSpace(estate, params);
382 16 : handle = dsa_allocate(area, size);
383 16 : start_address = dsa_get_address(area, handle);
384 :
385 : /* First write the number of parameters as a 4-byte integer. */
386 16 : nparams = bms_num_members(params);
387 16 : memcpy(start_address, &nparams, sizeof(int));
388 16 : start_address += sizeof(int);
389 :
390 : /* Write details for each parameter in turn. */
391 16 : paramid = -1;
392 36 : while ((paramid = bms_next_member(params, paramid)) >= 0)
393 : {
394 : Oid typeOid;
395 : int16 typLen;
396 : bool typByVal;
397 :
398 20 : prm = &(estate->es_param_exec_vals[paramid]);
399 20 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
400 : paramid);
401 :
402 : /* Write paramid. */
403 20 : memcpy(start_address, ¶mid, sizeof(int));
404 20 : start_address += sizeof(int);
405 :
406 : /* Write datum/isnull */
407 20 : if (OidIsValid(typeOid))
408 20 : get_typlenbyval(typeOid, &typLen, &typByVal);
409 : else
410 : {
411 : /* If no type OID, assume by-value, like copyParamList does. */
412 0 : typLen = sizeof(Datum);
413 0 : typByVal = true;
414 : }
415 20 : datumSerialize(prm->value, prm->isnull, typByVal, typLen,
416 : &start_address);
417 : }
418 :
419 16 : return handle;
420 : }
421 :
422 : /*
423 : * Restore specified PARAM_EXEC parameters.
424 : */
425 : static void
426 48 : RestoreParamExecParams(char *start_address, EState *estate)
427 : {
428 : int nparams;
429 : int i;
430 : int paramid;
431 :
432 48 : memcpy(&nparams, start_address, sizeof(int));
433 48 : start_address += sizeof(int);
434 :
435 104 : for (i = 0; i < nparams; i++)
436 : {
437 : ParamExecData *prm;
438 :
439 : /* Read paramid */
440 56 : memcpy(¶mid, start_address, sizeof(int));
441 56 : start_address += sizeof(int);
442 56 : prm = &(estate->es_param_exec_vals[paramid]);
443 :
444 : /* Read datum/isnull. */
445 56 : prm->value = datumRestore(&start_address, &prm->isnull);
446 56 : prm->execPlan = NULL;
447 : }
448 48 : }
449 :
450 : /*
451 : * Initialize the dynamic shared memory segment that will be used to control
452 : * parallel execution.
453 : */
454 : static bool
455 3045 : ExecParallelInitializeDSM(PlanState *planstate,
456 : ExecParallelInitializeDSMContext *d)
457 : {
458 3045 : if (planstate == NULL)
459 0 : return false;
460 :
461 : /* If instrumentation is enabled, initialize slot for this node. */
462 3045 : if (d->instrumentation != NULL)
463 684 : d->instrumentation->plan_node_id[d->nnodes] =
464 684 : planstate->plan->plan_node_id;
465 :
466 : /* Count this node. */
467 3045 : d->nnodes++;
468 :
469 : /*
470 : * Call initializers for DSM-using plan nodes.
471 : *
472 : * Most plan nodes won't do anything here, but plan nodes that allocated
473 : * DSM may need to initialize shared state in the DSM before parallel
474 : * workers are launched. They can allocate the space they previously
475 : * estimated using shm_toc_allocate, and add the keys they previously
476 : * estimated using shm_toc_insert, in each case targeting pcxt->toc.
477 : */
478 3045 : switch (nodeTag(planstate))
479 : {
480 1271 : case T_SeqScanState:
481 1271 : if (planstate->plan->parallel_aware)
482 933 : ExecSeqScanInitializeDSM((SeqScanState *) planstate,
483 : d->pcxt);
484 1271 : break;
485 276 : case T_IndexScanState:
486 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
487 276 : ExecIndexScanInitializeDSM((IndexScanState *) planstate, d->pcxt);
488 276 : break;
489 42 : case T_IndexOnlyScanState:
490 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
491 42 : ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
492 : d->pcxt);
493 42 : break;
494 13 : case T_BitmapIndexScanState:
495 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
496 13 : ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
497 13 : break;
498 0 : case T_ForeignScanState:
499 0 : if (planstate->plan->parallel_aware)
500 0 : ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
501 : d->pcxt);
502 0 : break;
503 16 : case T_TidRangeScanState:
504 16 : if (planstate->plan->parallel_aware)
505 16 : ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
506 : d->pcxt);
507 16 : break;
508 148 : case T_AppendState:
509 148 : if (planstate->plan->parallel_aware)
510 108 : ExecAppendInitializeDSM((AppendState *) planstate,
511 : d->pcxt);
512 148 : break;
513 0 : case T_CustomScanState:
514 0 : if (planstate->plan->parallel_aware)
515 0 : ExecCustomScanInitializeDSM((CustomScanState *) planstate,
516 : d->pcxt);
517 0 : break;
518 13 : case T_BitmapHeapScanState:
519 13 : if (planstate->plan->parallel_aware)
520 12 : ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
521 : d->pcxt);
522 13 : break;
523 208 : case T_HashJoinState:
524 208 : if (planstate->plan->parallel_aware)
525 84 : ExecHashJoinInitializeDSM((HashJoinState *) planstate,
526 : d->pcxt);
527 208 : break;
528 208 : case T_HashState:
529 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
530 208 : ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
531 208 : break;
532 201 : case T_SortState:
533 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
534 201 : ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
535 201 : break;
536 0 : case T_IncrementalSortState:
537 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
538 0 : ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
539 0 : break;
540 395 : case T_AggState:
541 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
542 395 : ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
543 395 : break;
544 4 : case T_MemoizeState:
545 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
546 4 : ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
547 4 : break;
548 250 : default:
549 250 : break;
550 : }
551 :
552 3045 : return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
553 : }
554 :
555 : /*
556 : * It sets up the response queues for backend workers to return tuples
557 : * to the main backend and start the workers.
558 : */
559 : static shm_mq_handle **
560 694 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
561 : {
562 : shm_mq_handle **responseq;
563 : char *tqueuespace;
564 : int i;
565 :
566 : /* Skip this if no workers. */
567 694 : if (pcxt->nworkers == 0)
568 0 : return NULL;
569 :
570 : /* Allocate memory for shared memory queue handles. */
571 : responseq = (shm_mq_handle **)
572 694 : palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
573 :
574 : /*
575 : * If not reinitializing, allocate space from the DSM for the queues;
576 : * otherwise, find the already allocated space.
577 : */
578 694 : if (!reinitialize)
579 : tqueuespace =
580 522 : shm_toc_allocate(pcxt->toc,
581 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
582 522 : pcxt->nworkers));
583 : else
584 172 : tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
585 :
586 : /* Create the queues, and become the receiver for each. */
587 2562 : for (i = 0; i < pcxt->nworkers; ++i)
588 : {
589 : shm_mq *mq;
590 :
591 1868 : mq = shm_mq_create(tqueuespace +
592 1868 : ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
593 : (Size) PARALLEL_TUPLE_QUEUE_SIZE);
594 :
595 1868 : shm_mq_set_receiver(mq, MyProc);
596 1868 : responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
597 : }
598 :
599 : /* Add array of queues to shm_toc, so others can find it. */
600 694 : if (!reinitialize)
601 522 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
602 :
603 : /* Return array of handles. */
604 694 : return responseq;
605 : }
606 :
607 : /*
608 : * Sets up the required infrastructure for backend workers to perform
609 : * execution and return results to the main backend.
610 : */
611 : ParallelExecutorInfo *
612 522 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
613 : Bitmapset *sendParams, int nworkers,
614 : int64 tuples_needed)
615 : {
616 : ParallelExecutorInfo *pei;
617 : ParallelContext *pcxt;
618 : ExecParallelEstimateContext e;
619 : ExecParallelInitializeDSMContext d;
620 : FixedParallelExecutorState *fpes;
621 : char *pstmt_data;
622 : char *pstmt_space;
623 : char *paramlistinfo_space;
624 : BufferUsage *bufusage_space;
625 : WalUsage *walusage_space;
626 522 : SharedExecutorInstrumentation *instrumentation = NULL;
627 522 : SharedJitInstrumentation *jit_instrumentation = NULL;
628 : int pstmt_len;
629 : int paramlistinfo_len;
630 522 : int instrumentation_len = 0;
631 522 : int jit_instrumentation_len = 0;
632 522 : int instrument_offset = 0;
633 522 : Size dsa_minsize = dsa_minimum_size();
634 : char *query_string;
635 : int query_len;
636 :
637 : /*
638 : * Force any initplan outputs that we're going to pass to workers to be
639 : * evaluated, if they weren't already.
640 : *
641 : * For simplicity, we use the EState's per-output-tuple ExprContext here.
642 : * That risks intra-query memory leakage, since we might pass through here
643 : * many times before that ExprContext gets reset; but ExecSetParamPlan
644 : * doesn't normally leak any memory in the context (see its comments), so
645 : * it doesn't seem worth complicating this function's API to pass it a
646 : * shorter-lived ExprContext. This might need to change someday.
647 : */
648 522 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
649 :
650 : /* Allocate object for return value. */
651 522 : pei = palloc0_object(ParallelExecutorInfo);
652 522 : pei->finished = false;
653 522 : pei->planstate = planstate;
654 :
655 : /* Fix up and serialize plan to be sent to workers. */
656 522 : pstmt_data = ExecSerializePlan(planstate->plan, estate);
657 :
658 : /* Create a parallel context. */
659 522 : pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
660 522 : pei->pcxt = pcxt;
661 :
662 : /*
663 : * Before telling the parallel context to create a dynamic shared memory
664 : * segment, we need to figure out how big it should be. Estimate space
665 : * for the various things we need to store.
666 : */
667 :
668 : /* Estimate space for fixed-size state. */
669 522 : shm_toc_estimate_chunk(&pcxt->estimator,
670 : sizeof(FixedParallelExecutorState));
671 522 : shm_toc_estimate_keys(&pcxt->estimator, 1);
672 :
673 : /* Estimate space for query text. */
674 522 : query_len = strlen(estate->es_sourceText);
675 522 : shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
676 522 : shm_toc_estimate_keys(&pcxt->estimator, 1);
677 :
678 : /* Estimate space for serialized PlannedStmt. */
679 522 : pstmt_len = strlen(pstmt_data) + 1;
680 522 : shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
681 522 : shm_toc_estimate_keys(&pcxt->estimator, 1);
682 :
683 : /* Estimate space for serialized ParamListInfo. */
684 522 : paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
685 522 : shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
686 522 : shm_toc_estimate_keys(&pcxt->estimator, 1);
687 :
688 : /*
689 : * Estimate space for BufferUsage.
690 : *
691 : * If EXPLAIN is not in use and there are no extensions loaded that care,
692 : * we could skip this. But we have no way of knowing whether anyone's
693 : * looking at pgBufferUsage, so do it unconditionally.
694 : */
695 522 : shm_toc_estimate_chunk(&pcxt->estimator,
696 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
697 522 : shm_toc_estimate_keys(&pcxt->estimator, 1);
698 :
699 : /*
700 : * Same thing for WalUsage.
701 : */
702 522 : shm_toc_estimate_chunk(&pcxt->estimator,
703 : mul_size(sizeof(WalUsage), pcxt->nworkers));
704 522 : shm_toc_estimate_keys(&pcxt->estimator, 1);
705 :
706 : /* Estimate space for tuple queues. */
707 522 : shm_toc_estimate_chunk(&pcxt->estimator,
708 : mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
709 522 : shm_toc_estimate_keys(&pcxt->estimator, 1);
710 :
711 : /*
712 : * Give parallel-aware nodes a chance to add to the estimates, and get a
713 : * count of how many PlanState nodes there are.
714 : */
715 522 : e.pcxt = pcxt;
716 522 : e.nnodes = 0;
717 522 : ExecParallelEstimate(planstate, &e);
718 :
719 : /* Estimate space for instrumentation, if required. */
720 522 : if (estate->es_instrument)
721 : {
722 120 : instrumentation_len =
723 : offsetof(SharedExecutorInstrumentation, plan_node_id) +
724 120 : sizeof(int) * e.nnodes;
725 120 : instrumentation_len = MAXALIGN(instrumentation_len);
726 120 : instrument_offset = instrumentation_len;
727 120 : instrumentation_len +=
728 120 : mul_size(sizeof(Instrumentation),
729 120 : mul_size(e.nnodes, nworkers));
730 120 : shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
731 120 : shm_toc_estimate_keys(&pcxt->estimator, 1);
732 :
733 : /* Estimate space for JIT instrumentation, if required. */
734 120 : if (estate->es_jit_flags != PGJIT_NONE)
735 : {
736 0 : jit_instrumentation_len =
737 0 : offsetof(SharedJitInstrumentation, jit_instr) +
738 : sizeof(JitInstrumentation) * nworkers;
739 0 : shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
740 0 : shm_toc_estimate_keys(&pcxt->estimator, 1);
741 : }
742 : }
743 :
744 : /* Estimate space for DSA area. */
745 522 : shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
746 522 : shm_toc_estimate_keys(&pcxt->estimator, 1);
747 :
748 : /*
749 : * InitializeParallelDSM() passes the active snapshot to the parallel
750 : * worker, which uses it to set es_snapshot. Make sure we don't set
751 : * es_snapshot differently in the child.
752 : */
753 : Assert(GetActiveSnapshot() == estate->es_snapshot);
754 :
755 : /* Everyone's had a chance to ask for space, so now create the DSM. */
756 522 : InitializeParallelDSM(pcxt);
757 :
758 : /*
759 : * OK, now we have a dynamic shared memory segment, and it should be big
760 : * enough to store all of the data we estimated we would want to put into
761 : * it, plus whatever general stuff (not specifically executor-related) the
762 : * ParallelContext itself needs to store there. None of the space we
763 : * asked for has been allocated or initialized yet, though, so do that.
764 : */
765 :
766 : /* Store fixed-size state. */
767 522 : fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
768 522 : fpes->tuples_needed = tuples_needed;
769 522 : fpes->param_exec = InvalidDsaPointer;
770 522 : fpes->eflags = estate->es_top_eflags;
771 522 : fpes->jit_flags = estate->es_jit_flags;
772 522 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
773 :
774 : /* Store query string */
775 522 : query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
776 522 : memcpy(query_string, estate->es_sourceText, query_len + 1);
777 522 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
778 :
779 : /* Store serialized PlannedStmt. */
780 522 : pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
781 522 : memcpy(pstmt_space, pstmt_data, pstmt_len);
782 522 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
783 :
784 : /* Store serialized ParamListInfo. */
785 522 : paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
786 522 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
787 522 : SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space);
788 :
789 : /* Allocate space for each worker's BufferUsage; no need to initialize. */
790 522 : bufusage_space = shm_toc_allocate(pcxt->toc,
791 522 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
792 522 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
793 522 : pei->buffer_usage = bufusage_space;
794 :
795 : /* Same for WalUsage. */
796 522 : walusage_space = shm_toc_allocate(pcxt->toc,
797 522 : mul_size(sizeof(WalUsage), pcxt->nworkers));
798 522 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
799 522 : pei->wal_usage = walusage_space;
800 :
801 : /* Set up the tuple queues that the workers will write into. */
802 522 : pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
803 :
804 : /* We don't need the TupleQueueReaders yet, though. */
805 522 : pei->reader = NULL;
806 :
807 : /*
808 : * If instrumentation options were supplied, allocate space for the data.
809 : * It only gets partially initialized here; the rest happens during
810 : * ExecParallelInitializeDSM.
811 : */
812 522 : if (estate->es_instrument)
813 : {
814 : Instrumentation *instrument;
815 : int i;
816 :
817 120 : instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
818 120 : instrumentation->instrument_options = estate->es_instrument;
819 120 : instrumentation->instrument_offset = instrument_offset;
820 120 : instrumentation->num_workers = nworkers;
821 120 : instrumentation->num_plan_nodes = e.nnodes;
822 120 : instrument = GetInstrumentationArray(instrumentation);
823 1240 : for (i = 0; i < nworkers * e.nnodes; ++i)
824 1120 : InstrInit(&instrument[i], estate->es_instrument);
825 120 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
826 : instrumentation);
827 120 : pei->instrumentation = instrumentation;
828 :
829 120 : if (estate->es_jit_flags != PGJIT_NONE)
830 : {
831 0 : jit_instrumentation = shm_toc_allocate(pcxt->toc,
832 : jit_instrumentation_len);
833 0 : jit_instrumentation->num_workers = nworkers;
834 0 : memset(jit_instrumentation->jit_instr, 0,
835 : sizeof(JitInstrumentation) * nworkers);
836 0 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
837 : jit_instrumentation);
838 0 : pei->jit_instrumentation = jit_instrumentation;
839 : }
840 : }
841 :
842 : /*
843 : * Create a DSA area that can be used by the leader and all workers.
844 : * (However, if we failed to create a DSM and are using private memory
845 : * instead, then skip this.)
846 : */
847 522 : if (pcxt->seg != NULL)
848 : {
849 : char *area_space;
850 :
851 522 : area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
852 522 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
853 522 : pei->area = dsa_create_in_place(area_space, dsa_minsize,
854 : LWTRANCHE_PARALLEL_QUERY_DSA,
855 : pcxt->seg);
856 :
857 : /*
858 : * Serialize parameters, if any, using DSA storage. We don't dare use
859 : * the main parallel query DSM for this because we might relaunch
860 : * workers after the values have changed (and thus the amount of
861 : * storage required has changed).
862 : */
863 522 : if (!bms_is_empty(sendParams))
864 : {
865 16 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
866 : pei->area);
867 16 : fpes->param_exec = pei->param_exec;
868 : }
869 : }
870 :
871 : /*
872 : * Give parallel-aware nodes a chance to initialize their shared data.
873 : * This also initializes the elements of instrumentation->ps_instrument,
874 : * if it exists.
875 : */
876 522 : d.pcxt = pcxt;
877 522 : d.instrumentation = instrumentation;
878 522 : d.nnodes = 0;
879 :
880 : /* Install our DSA area while initializing the plan. */
881 522 : estate->es_query_dsa = pei->area;
882 522 : ExecParallelInitializeDSM(planstate, &d);
883 522 : estate->es_query_dsa = NULL;
884 :
885 : /*
886 : * Make sure that the world hasn't shifted under our feet. This could
887 : * probably just be an Assert(), but let's be conservative for now.
888 : */
889 522 : if (e.nnodes != d.nnodes)
890 0 : elog(ERROR, "inconsistent count of PlanState nodes");
891 :
892 : /* OK, we're ready to rock and roll. */
893 522 : return pei;
894 : }
895 :
896 : /*
897 : * Set up tuple queue readers to read the results of a parallel subplan.
898 : *
899 : * This is separate from ExecInitParallelPlan() because we can launch the
900 : * worker processes and let them start doing something before we do this.
901 : */
902 : void
903 682 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
904 : {
905 682 : int nworkers = pei->pcxt->nworkers_launched;
906 : int i;
907 :
  : /* Callers must not have created readers already; see ExecInitParallelPlan. */
908 : Assert(pei->reader == NULL);
909 :
910 682 : if (nworkers > 0)
911 : {
912 682 : pei->reader = (TupleQueueReader **)
913 682 : palloc(nworkers * sizeof(TupleQueueReader *));
914 :
  : /*
  : * Associate each tuple queue with its launched worker's bgworker
  : * handle, then wrap the queue in a TupleQueueReader the leader
  : * can pull tuples from.
  : */
915 2492 : for (i = 0; i < nworkers; i++)
916 : {
917 1810 : shm_mq_set_handle(pei->tqueue[i],
918 1810 : pei->pcxt->worker[i].bgwhandle);
919 1810 : pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
920 : }
921 : }
922 682 : }
923 :
924 : /*
925 : * Re-initialize the parallel executor shared memory state before launching
926 : * a fresh batch of workers.
927 : */
928 : void
929 172 : ExecParallelReinitialize(PlanState *planstate,
930 : ParallelExecutorInfo *pei,
931 : Bitmapset *sendParams)
932 : {
933 172 : EState *estate = planstate->state;
934 : FixedParallelExecutorState *fpes;
935 :
936 : /* Old workers must already be shut down */
937 : Assert(pei->finished);
938 :
939 : /*
940 : * Force any initplan outputs that we're going to pass to workers to be
941 : * evaluated, if they weren't already (see comments in
942 : * ExecInitParallelPlan).
943 : */
944 172 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
945 :
  : /*
  : * Reset the parallel context, then rebuild the tuple queues in the
  : * existing DSM ("true" = reinitialize in place) and clear per-batch
  : * leader-side state.
  : */
946 172 : ReinitializeParallelDSM(pei->pcxt);
947 172 : pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
948 172 : pei->reader = NULL;
949 172 : pei->finished = false;
950 :
951 172 : fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
952 :
953 : /* Free any serialized parameters from the last round. */
954 172 : if (DsaPointerIsValid(fpes->param_exec))
955 : {
956 0 : dsa_free(pei->area, fpes->param_exec);
957 0 : fpes->param_exec = InvalidDsaPointer;
958 : }
959 :
960 : /* Serialize current parameter values if required. */
961 172 : if (!bms_is_empty(sendParams))
962 : {
963 0 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
964 : pei->area);
965 0 : fpes->param_exec = pei->param_exec;
966 : }
967 :
968 : /* Traverse plan tree and let each child node reset associated state. */
  : /* Install the DSA area only for the duration of the reinit walk. */
969 172 : estate->es_query_dsa = pei->area;
970 172 : ExecParallelReInitializeDSM(planstate, pei->pcxt);
971 172 : estate->es_query_dsa = NULL;
972 172 : }
973 :
974 : /*
975 : * Traverse plan tree to reinitialize per-node dynamic shared memory state
976 : */
977 : static bool
978 444 : ExecParallelReInitializeDSM(PlanState *planstate,
979 : ParallelContext *pcxt)
980 : {
981 444 : if (planstate == NULL)
982 0 : return false;
983 :
984 : /*
985 : * Call reinitializers for DSM-using plan nodes.
986 : */
987 444 : switch (nodeTag(planstate))
988 : {
989 184 : case T_SeqScanState:
990 184 : if (planstate->plan->parallel_aware)
991 152 : ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
992 : pcxt);
993 184 : break;
994 8 : case T_IndexScanState:
995 8 : if (planstate->plan->parallel_aware)
996 8 : ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
997 : pcxt);
998 8 : break;
999 8 : case T_IndexOnlyScanState:
1000 8 : if (planstate->plan->parallel_aware)
1001 8 : ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
1002 : pcxt);
1003 8 : break;
1004 0 : case T_ForeignScanState:
1005 0 : if (planstate->plan->parallel_aware)
1006 0 : ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
1007 : pcxt);
1008 0 : break;
1009 0 : case T_TidRangeScanState:
1010 0 : if (planstate->plan->parallel_aware)
1011 0 : ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
1012 : pcxt);
1013 0 : break;
1014 0 : case T_AppendState:
1015 0 : if (planstate->plan->parallel_aware)
1016 0 : ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
1017 0 : break;
1018 0 : case T_CustomScanState:
1019 0 : if (planstate->plan->parallel_aware)
1020 0 : ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
1021 : pcxt);
1022 0 : break;
1023 36 : case T_BitmapHeapScanState:
1024 36 : if (planstate->plan->parallel_aware)
1025 36 : ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
1026 : pcxt);
1027 36 : break;
1028 64 : case T_HashJoinState:
1029 64 : if (planstate->plan->parallel_aware)
1030 32 : ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
1031 : pcxt);
1032 64 : break;
1033 120 : case T_BitmapIndexScanState:
1034 : case T_HashState:
1035 : case T_SortState:
1036 : case T_IncrementalSortState:
1037 : case T_MemoizeState:
1038 : /* these nodes have DSM state, but no reinitialization is required */
1039 120 : break;
1040 :
1041 24 : default:
1042 24 : break;
1043 : }
1044 :
  : /* Recurse to this node's children via the generic plan-tree walker. */
1045 444 : return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1046 : }
1047 :
1048 : /*
1049 : * Copy instrumentation information about this node and its descendants from
1050 : * dynamic shared memory.
1051 : */
1052 : static bool
1053 684 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
1054 : SharedExecutorInstrumentation *instrumentation)
1055 : {
1056 : Instrumentation *instrument;
1057 : int i;
1058 : int n;
1059 : int ibytes;
1060 684 : int plan_node_id = planstate->plan->plan_node_id;
1061 : MemoryContext oldcontext;
1062 :
1063 : /* Find the instrumentation for this node. */
  : /* Linear search of plan_node_id[]; cf. ExecParallelReportInstrumentation. */
1064 3092 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1065 3092 : if (instrumentation->plan_node_id[i] == plan_node_id)
1066 684 : break;
1067 684 : if (i >= instrumentation->num_plan_nodes)
1068 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1069 :
1070 : /* Accumulate the statistics from all workers. */
  : /* The shared array is laid out as [plan node][worker]; step to our row. */
1071 684 : instrument = GetInstrumentationArray(instrumentation);
1072 684 : instrument += i * instrumentation->num_workers;
1073 1804 : for (n = 0; n < instrumentation->num_workers; ++n)
1074 1120 : InstrAggNode(planstate->instrument, &instrument[n]);
1075 :
1076 : /*
1077 : * Also store the per-worker detail.
1078 : *
1079 : * Worker instrumentation should be allocated in the same context as the
1080 : * regular instrumentation information, which is the per-query context.
1081 : * Switch into per-query memory context.
1082 : */
1083 684 : oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1084 684 : ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1085 684 : planstate->worker_instrument =
1086 684 : palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1087 684 : MemoryContextSwitchTo(oldcontext);
1088 :
1089 684 : planstate->worker_instrument->num_workers = instrumentation->num_workers;
1090 684 : memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1091 :
1092 : /* Perform any node-type-specific work that needs to be done. */
1093 684 : switch (nodeTag(planstate))
1094 : {
1095 180 : case T_IndexScanState:
1096 180 : ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
1097 180 : break;
1098 0 : case T_IndexOnlyScanState:
1099 0 : ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
1100 0 : break;
1101 0 : case T_BitmapIndexScanState:
1102 0 : ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
1103 0 : break;
1104 8 : case T_SortState:
1105 8 : ExecSortRetrieveInstrumentation((SortState *) planstate);
1106 8 : break;
1107 0 : case T_IncrementalSortState:
1108 0 : ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1109 0 : break;
1110 56 : case T_HashState:
1111 56 : ExecHashRetrieveInstrumentation((HashState *) planstate);
1112 56 : break;
1113 68 : case T_AggState:
1114 68 : ExecAggRetrieveInstrumentation((AggState *) planstate);
1115 68 : break;
1116 0 : case T_MemoizeState:
1117 0 : ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
1118 0 : break;
1119 0 : case T_BitmapHeapScanState:
1120 0 : ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
1121 0 : break;
1122 372 : default:
1123 372 : break;
1124 : }
1125 :
  : /* Recurse to descendants. */
1126 684 : return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1127 : instrumentation);
1128 : }
1129 :
1130 : /*
1131 : * Add up the workers' JIT instrumentation from dynamic shared memory.
1132 : */
1133 : static void
1134 0 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1135 : SharedJitInstrumentation *shared_jit)
1136 : {
1137 : JitInstrumentation *combined;
1138 : int ibytes;
1139 :
1140 : int n;
1141 :
1142 : /*
1143 : * Accumulate worker JIT instrumentation into the combined JIT
1144 : * instrumentation, allocating it if required.
1145 : */
1146 0 : if (!planstate->state->es_jit_worker_instr)
1147 0 : planstate->state->es_jit_worker_instr =
1148 0 : MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation))" ;
1149 0 : combined = planstate->state->es_jit_worker_instr;
1150 :
1151 : /* Accumulate all the workers' instrumentations. */
1152 0 : for (n = 0; n < shared_jit->num_workers; ++n)
1153 0 : InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1154 :
1155 : /*
1156 : * Store the per-worker detail.
1157 : *
1158 : * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1159 : * instrumentation in per-query context.
1160 : */
1161 0 : ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1162 0 : + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1163 0 : planstate->worker_jit_instrument =
1164 0 : MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1165 :
  : /* Flat copy of header plus the flexible jit_instr[] array. */
1166 0 : memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1167 0 : }
1168 :
1169 : /*
1170 : * Finish parallel execution. We wait for parallel workers to finish, and
1171 : * accumulate their buffer/WAL usage.
1172 : */
1173 : void
1174 1240 : ExecParallelFinish(ParallelExecutorInfo *pei)
1175 : {
1176 1240 : int nworkers = pei->pcxt->nworkers_launched;
1177 : int i;
1178 :
1179 : /* Make this be a no-op if called twice in a row. */
1180 1240 : if (pei->finished)
1181 554 : return;
1182 :
1183 : /*
1184 : * Detach from tuple queues ASAP, so that any still-active workers will
1185 : * notice that no further results are wanted.
1186 : */
1187 686 : if (pei->tqueue != NULL)
1188 : {
1189 2488 : for (i = 0; i < nworkers; i++)
1190 1802 : shm_mq_detach(pei->tqueue[i]);
1191 686 : pfree(pei->tqueue);
1192 686 : pei->tqueue = NULL;
1193 : }
1194 :
1195 : /*
1196 : * While we're waiting for the workers to finish, let's get rid of the
1197 : * tuple queue readers. (Any other local cleanup could be done here too.)
1198 : */
1199 686 : if (pei->reader != NULL)
1200 : {
1201 2476 : for (i = 0; i < nworkers; i++)
1202 1802 : DestroyTupleQueueReader(pei->reader[i]);
1203 674 : pfree(pei->reader)"";
1204 674 : pei->reader = NULL;
1205 : }
1206 :
1207 : /* Now wait for the workers to finish. */
1208 686 : WaitForParallelWorkersToFinish(pei->pcxt);
1209 :
1210 : /*
1211 : * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1212 : * finish, or we might get incomplete data.)
1213 : */
1214 2488 : for (i = 0; i < nworkers; i++)
1215 1802 : InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1216 :
  : /* Mark done so a second call is a no-op (checked at function entry). */
1217 686 : pei->finished = true;
1218 : }
1219 :
1220 : /*
1221 : * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1222 : * resources still exist after ExecParallelFinish. We separate these
1223 : * routines because someone might want to examine the contents of the DSM
1224 : * after ExecParallelFinish and before calling this routine.
1225 : */
1226 : void
1227 514 : ExecParallelCleanup(ParallelExecutorInfo *pei)
1228 : {
1229 : /* Accumulate instrumentation, if any. */
1230 514 : if (pei->instrumentation)
1231 120 : ExecParallelRetrieveInstrumentation(pei->planstate,
1232 : pei->instrumentation);
1233 :
1234 : /* Accumulate JIT instrumentation, if any. */
1235 514 : if (pei->jit_instrumentation)
1236 0 : ExecParallelRetrieveJitInstrumentation(pei->planstate,
1237 0 : pei->jit_instrumentation);
1238 :
1239 : /* Free any serialized parameters. */
1240 514 : if (DsaPointerIsValid(pei->param_exec))
1241 : {
1242 16 : dsa_free(pei->area, pei->param_exec);
1243 16 : pei->param_exec = InvalidDsaPointer;
1244 : }
  : /* Detach the DSA area before destroying the parallel context (DSM). */
1245 514 : if (pei->area != NULL)
1246 : {
1247 514 : dsa_detach(pei->area);
1248 514 : pei->area = NULL;
1249 : }
1250 514 : if (pei->pcxt != NULL)
1251 : {
1252 514 : DestroyParallelContext(pei->pcxt);
1253 514 : pei->pcxt = NULL;
1254 : }
  : /* pei itself was palloc'd by ExecInitParallelPlan; release it last. */
1255 514 : pfree(pei);
1256 514 : }
1257 :
1258 : /*
1259 : * Create a DestReceiver to write tuples we produce to the shm_mq designated
1260 : * for that purpose.
1261 : */
1262 : static DestReceiver *
1263 1810 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1264 : {
1265 : char *mqspace;
1266 : shm_mq *mq;
1267 :
  : /*
  : * The leader laid out one fixed-size queue per worker back to back;
  : * index into the array by our own ParallelWorkerNumber and attach as
  : * the sending side.
  : */
1268 1810 : mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1269 1810 : mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1270 1810 : mq = (shm_mq *) mqspace;
1271 1810 : shm_mq_set_sender(mq, MyProc);
1272 1810 : return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1273 : }
1274 :
1275 : /*
1276 : * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1277 : */
1278 : static QueryDesc *
1279 1810 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1280 : int instrument_options)
1281 : {
1282 : char *pstmtspace;
1283 : char *paramspace;
1284 : PlannedStmt *pstmt;
1285 : ParamListInfo paramLI;
1286 : char *queryString;
1287 :
1288 : /* Get the query string from shared memory */
1289 1810 : queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1290 :
1291 : /* Reconstruct leader-supplied PlannedStmt. */
1292 1810 : pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1293 1810 : pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1294 :
1295 : /* Reconstruct ParamListInfo. */
  : /*
  : * RestoreParamList takes the address of the buffer pointer (it advances
  : * the pointer as it deserializes), hence &paramspace.  The original line
  : * here had the "&" destroyed by an HTML-entity mangling ("¶mspace").
  : */
1296 1810 : paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1297 1810 : paramLI = RestoreParamList(&paramspace);
1298 :
1299 : /* Create a QueryDesc for the query. */
1300 1810 : return CreateQueryDesc(pstmt,
1301 : queryString,
1302 : GetActiveSnapshot(), InvalidSnapshot,
1303 : receiver, paramLI, NULL, instrument_options);
1304 : }
1305 :
1306 : /*
1307 : * Copy instrumentation information from this node and its descendants into
1308 : * dynamic shared memory, so that the parallel leader can retrieve it.
1309 : */
1310 : static bool
1311 1579 : ExecParallelReportInstrumentation(PlanState *planstate,
1312 : SharedExecutorInstrumentation *instrumentation)
1313 : {
1314 : int i;
1315 1579 : int plan_node_id = planstate->plan->plan_node_id;
1316 : Instrumentation *instrument;
1317 :
  : /* Close out any loop still in progress before reading the counters. */
1318 1579 : InstrEndLoop(planstate->instrument);
1319 :
1320 : /*
1321 : * If we shuffled the plan_node_id values in ps_instrument into sorted
1322 : * order, we could use binary search here. This might matter someday if
1323 : * we're pushing down sufficiently large plan trees. For now, do it the
1324 : * slow, dumb way.
1325 : */
1326 5193 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1327 5193 : if (instrumentation->plan_node_id[i] == plan_node_id)
1328 1579 : break;
1329 1579 : if (i >= instrumentation->num_plan_nodes)
1330 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1331 :
1332 : /*
1333 : * Add our statistics to the per-node, per-worker totals. It's possible
1334 : * that this could happen more than once if we relaunched workers.
1335 : */
  : /* Shared layout is [plan node][worker]; write our own worker's slot. */
1336 1579 : instrument = GetInstrumentationArray(instrumentation);
1337 1579 : instrument += i * instrumentation->num_workers;
1338 : Assert(IsParallelWorker());
1339 : Assert(ParallelWorkerNumber < instrumentation->num_workers);
1340 1579 : InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1341 :
  : /* Recurse to descendants. */
1342 1579 : return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1343 : instrumentation);
1344 : }
1345 :
1346 : /*
1347 : * Initialize the PlanState and its descendants with the information
1348 : * retrieved from shared memory. This has to be done once the PlanState
1349 : * is allocated and initialized by executor; that is, after ExecutorStart().
1350 : */
1351 : static bool
1352 7638 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1353 : {
1354 7638 : if (planstate == NULL)
1355 0 : return false;
1356 :
  : /*
  : * Per-node-type worker attachment.  Note the asymmetry: some node types
  : * attach only when parallel-aware, while others attach unconditionally
  : * so that EXPLAIN ANALYZE instrumentation can be reported.
  : */
1357 7638 : switch (nodeTag(planstate))
1358 : {
1359 3230 : case T_SeqScanState:
1360 3230 : if (planstate->plan->parallel_aware)
1361 2452 : ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1362 3230 : break;
1363 424 : case T_IndexScanState:
1364 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1365 424 : ExecIndexScanInitializeWorker((IndexScanState *) planstate, pwcxt);
1366 424 : break;
1367 168 : case T_IndexOnlyScanState:
1368 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1369 168 : ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1370 : pwcxt);
1371 168 : break;
1372 180 : case T_BitmapIndexScanState:
1373 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1374 180 : ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
1375 : pwcxt);
1376 180 : break;
1377 0 : case T_ForeignScanState:
1378 0 : if (planstate->plan->parallel_aware)
1379 0 : ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1380 : pwcxt);
1381 0 : break;
1382 64 : case T_TidRangeScanState:
1383 64 : if (planstate->plan->parallel_aware)
1384 64 : ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
1385 : pwcxt);
1386 64 : break;
1387 296 : case T_AppendState:
1388 296 : if (planstate->plan->parallel_aware)
1389 240 : ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1390 296 : break;
1391 0 : case T_CustomScanState:
1392 0 : if (planstate->plan->parallel_aware)
1393 0 : ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1394 : pwcxt);
1395 0 : break;
1396 180 : case T_BitmapHeapScanState:
1397 180 : if (planstate->plan->parallel_aware)
1398 179 : ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1399 : pwcxt);
1400 180 : break;
1401 525 : case T_HashJoinState:
1402 525 : if (planstate->plan->parallel_aware)
1403 213 : ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1404 : pwcxt);
1405 525 : break;
1406 525 : case T_HashState:
1407 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1408 525 : ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1409 525 : break;
1410 498 : case T_SortState:
1411 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1412 498 : ExecSortInitializeWorker((SortState *) planstate, pwcxt);
1413 498 : break;
1414 0 : case T_IncrementalSortState:
1415 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1416 0 : ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1417 : pwcxt);
1418 0 : break;
1419 1108 : case T_AggState:
1420 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1421 1108 : ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1422 1108 : break;
1423 8 : case T_MemoizeState:
1424 : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1425 8 : ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
1426 8 : break;
1427 432 : default:
1428 432 : break;
1429 : }
1430 :
  : /* Recurse to this node's children. */
1431 7638 : return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1432 : pwcxt);
1433 : }
1434 :
1435 : /*
1436 : * Main entrypoint for parallel query worker processes.
1437 : *
1438 : * We reach this function from ParallelWorkerMain, so the setup necessary to
1439 : * create a sensible parallel environment has already been done;
1440 : * ParallelWorkerMain worries about stuff like the transaction state, combo
1441 : * CID mappings, and GUC values, so we don't need to deal with any of that
1442 : * here.
1443 : *
1444 : * Our job is to deal with concerns specific to the executor. The parallel
1445 : * group leader will have stored a serialized PlannedStmt, and it's our job
1446 : * to execute that plan and write the resulting tuples to the appropriate
1447 : * tuple queue. Various bits of supporting information that we need in order
1448 : * to do this are also stored in the dsm_segment and can be accessed through
1449 : * the shm_toc.
1450 : */
1451 : void
1452 1810 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1453 : {
1454 : FixedParallelExecutorState *fpes;
1455 : BufferUsage *buffer_usage;
1456 : WalUsage *wal_usage;
1457 : DestReceiver *receiver;
1458 : QueryDesc *queryDesc;
1459 : SharedExecutorInstrumentation *instrumentation;
1460 : SharedJitInstrumentation *jit_instrumentation;
1461 1810 : int instrument_options = 0;
1462 : void *area_space;
1463 : dsa_area *area;
1464 : ParallelWorkerContext pwcxt;
1465 :
1466 : /* Get fixed-size state. */
1467 1810 : fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1468 :
1469 : /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
  : /* Instrumentation TOC entries are optional (noError = true lookups). */
1470 1810 : receiver = ExecParallelGetReceiver(seg, toc);
1471 1810 : instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1472 1810 : if (instrumentation != NULL)
1473 483 : instrument_options = instrumentation->instrument_options;
1474 1810 : jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1475 : true);
1476 1810 : queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1477 :
1478 : /* Setting debug_query_string for individual workers */
1479 1810 : debug_query_string = queryDesc->sourceText;
1480 :
1481 : /* Report workers' query for monitoring purposes */
1482 1810 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1483 :
1484 : /* Attach to the dynamic shared memory area. */
1485 1810 : area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1486 1810 : area = dsa_attach_in_place(area_space, seg);
1487 :
1488 : /* Start up the executor */
1489 1810 : queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1490 1810 : ExecutorStart(queryDesc, fpes->eflags);
1491 :
1492 : /* Special executor initialization steps for parallel workers */
1493 1810 : queryDesc->planstate->state->es_query_dsa = area;
1494 1810 : if (DsaPointerIsValid(fpes->param_exec))
1495 : {
1496 : char *paramexec_space;
1497 :
1498 48 : paramexec_space = dsa_get_address(area, fpes->param_exec);
1499 48 : RestoreParamExecParams(paramexec_space, queryDesc->estate);
1500 : }
1501 1810 : pwcxt.toc = toc;
1502 1810 : pwcxt.seg = seg;
1503 1810 : ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1504 :
1505 : /* Pass down any tuple bound */
1506 1810 : ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1507 :
1508 : /*
1509 : * Prepare to track buffer/WAL usage during query execution.
1510 : *
1511 : * We do this after starting up the executor to match what happens in the
1512 : * leader, which also doesn't count buffer accesses and WAL activity that
1513 : * occur during executor startup.
1514 : */
1515 1810 : InstrStartParallelQuery();
1516 :
1517 : /*
1518 : * Run the plan. If we specified a tuple bound, be careful not to demand
1519 : * more tuples than that.
1520 : */
  : /* tuples_needed < 0 means "no bound"; a count of 0 runs to completion. */
1521 1810 : ExecutorRun(queryDesc,
1522 : ForwardScanDirection,
1523 1810 : fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
1524 :
1525 : /* Shut down the executor */
1526 1802 : ExecutorFinish(queryDesc);
1527 :
1528 : /* Report buffer/WAL usage during parallel execution. */
1529 1802 : buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1530 1802 : wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1531 1802 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1532 1802 : &wal_usage[ParallelWorkerNumber]);
1533 :
1534 : /* Report instrumentation data if any instrumentation options are set. */
1535 1802 : if (instrumentation != NULL)
1536 483 : ExecParallelReportInstrumentation(queryDesc->planstate,
1537 : instrumentation);
1538 :
1539 : /* Report JIT instrumentation data if any */
1540 1802 : if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1541 : {
1542 : Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1543 0 : jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1544 0 : queryDesc->estate->es_jit->instr;
1545 : }
1546 :
1547 : /* Must do this after capturing instrumentation. */
1548 1802 : ExecutorEnd(queryDesc);
1549 :
1550 : /* Cleanup. */
1551 1802 : dsa_detach(area);
1552 1802 : FreeQueryDesc(queryDesc);
1553 1802 : receiver->rDestroy(receiver);
1554 : }
|