LCOV - code coverage report
Current view: top level - src/backend/executor - execParallel.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 83.0 % 625 519
Test Date: 2026-03-05 17:14:59 Functions: 95.0 % 20 19
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * execParallel.c
       4              :  *    Support routines for parallel execution.
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * This file contains routines that are intended to support setting up,
      10              :  * using, and tearing down a ParallelContext from within the PostgreSQL
      11              :  * executor.  The ParallelContext machinery will handle starting the
      12              :  * workers and ensuring that their state generally matches that of the
      13              :  * leader; see src/backend/access/transam/README.parallel for details.
      14              :  * However, we must save and restore relevant executor state, such as
      15              :  * any ParamListInfo associated with the query, buffer/WAL usage info, and
      16              :  * the actual plan to be passed down to the worker.
      17              :  *
      18              :  * IDENTIFICATION
      19              :  *    src/backend/executor/execParallel.c
      20              :  *
      21              :  *-------------------------------------------------------------------------
      22              :  */
      23              : 
      24              : #include "postgres.h"
      25              : 
      26              : #include "executor/execParallel.h"
      27              : #include "executor/executor.h"
      28              : #include "executor/nodeAgg.h"
      29              : #include "executor/nodeAppend.h"
      30              : #include "executor/nodeBitmapHeapscan.h"
      31              : #include "executor/nodeBitmapIndexscan.h"
      32              : #include "executor/nodeCustom.h"
      33              : #include "executor/nodeForeignscan.h"
      34              : #include "executor/nodeHash.h"
      35              : #include "executor/nodeHashjoin.h"
      36              : #include "executor/nodeIncrementalSort.h"
      37              : #include "executor/nodeIndexonlyscan.h"
      38              : #include "executor/nodeIndexscan.h"
      39              : #include "executor/nodeMemoize.h"
      40              : #include "executor/nodeSeqscan.h"
      41              : #include "executor/nodeSort.h"
      42              : #include "executor/nodeSubplan.h"
      43              : #include "executor/nodeTidrangescan.h"
      44              : #include "executor/tqueue.h"
      45              : #include "jit/jit.h"
      46              : #include "nodes/nodeFuncs.h"
      47              : #include "pgstat.h"
      48              : #include "storage/proc.h"
      49              : #include "tcop/tcopprot.h"
      50              : #include "utils/datum.h"
      51              : #include "utils/dsa.h"
      52              : #include "utils/lsyscache.h"
      53              : #include "utils/snapmgr.h"
      54              : 
      55              : /*
      56              :  * Magic numbers for parallel executor communication.  We use constants
      57              :  * greater than any 32-bit integer here so that values < 2^32 can be used
      58              :  * by individual parallel nodes to store their own state.
      59              :  */
      60              : #define PARALLEL_KEY_EXECUTOR_FIXED     UINT64CONST(0xE000000000000001)
      61              : #define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000002)
      62              : #define PARALLEL_KEY_PARAMLISTINFO      UINT64CONST(0xE000000000000003)
      63              : #define PARALLEL_KEY_BUFFER_USAGE       UINT64CONST(0xE000000000000004)
      64              : #define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000005)
      65              : #define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000006)
      66              : #define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000007)
      67              : #define PARALLEL_KEY_QUERY_TEXT     UINT64CONST(0xE000000000000008)
      68              : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
      69              : #define PARALLEL_KEY_WAL_USAGE          UINT64CONST(0xE00000000000000A)
      70              : 
      71              : #define PARALLEL_TUPLE_QUEUE_SIZE       65536
      72              : 
      73              : /*
      74              :  * Fixed-size random stuff that we need to pass to parallel workers.
      75              :  */
      76              : typedef struct FixedParallelExecutorState
      77              : {
      78              :     int64       tuples_needed;  /* tuple bound, see ExecSetTupleBound */
      79              :     dsa_pointer param_exec;
      80              :     int         eflags;
      81              :     int         jit_flags;
      82              : } FixedParallelExecutorState;
      83              : 
      84              : /*
      85              :  * DSM structure for accumulating per-PlanState instrumentation.
      86              :  *
      87              :  * instrument_options: Same meaning here as in instrument.c.
      88              :  *
      89              :  * instrument_offset: Offset, relative to the start of this structure,
      90              :  * of the first Instrumentation object.  This will depend on the length of
      91              :  * the plan_node_id array.
      92              :  *
      93              :  * num_workers: Number of workers.
      94              :  *
      95              :  * num_plan_nodes: Number of plan nodes.
      96              :  *
      97              :  * plan_node_id: Array of plan nodes for which we are gathering instrumentation
      98              :  * from parallel workers.  The length of this array is given by num_plan_nodes.
      99              :  */
     100              : struct SharedExecutorInstrumentation
     101              : {
     102              :     int         instrument_options;
     103              :     int         instrument_offset;
     104              :     int         num_workers;
     105              :     int         num_plan_nodes;
     106              :     int         plan_node_id[FLEXIBLE_ARRAY_MEMBER];
     107              :     /* array of num_plan_nodes * num_workers Instrumentation objects follows */
     108              : };
     109              : #define GetInstrumentationArray(sei) \
     110              :     (StaticAssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
     111              :      (Instrumentation *) (((char *) sei) + sei->instrument_offset))
     112              : 
     113              : /* Context object for ExecParallelEstimate. */
     114              : typedef struct ExecParallelEstimateContext
     115              : {
     116              :     ParallelContext *pcxt;
     117              :     int         nnodes;
     118              : } ExecParallelEstimateContext;
     119              : 
     120              : /* Context object for ExecParallelInitializeDSM. */
     121              : typedef struct ExecParallelInitializeDSMContext
     122              : {
     123              :     ParallelContext *pcxt;
     124              :     SharedExecutorInstrumentation *instrumentation;
     125              :     int         nnodes;
     126              : } ExecParallelInitializeDSMContext;
     127              : 
     128              : /* Helper functions that run in the parallel leader. */
     129              : static char *ExecSerializePlan(Plan *plan, EState *estate);
     130              : static bool ExecParallelEstimate(PlanState *planstate,
     131              :                                  ExecParallelEstimateContext *e);
     132              : static bool ExecParallelInitializeDSM(PlanState *planstate,
     133              :                                       ExecParallelInitializeDSMContext *d);
     134              : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
     135              :                                                     bool reinitialize);
     136              : static bool ExecParallelReInitializeDSM(PlanState *planstate,
     137              :                                         ParallelContext *pcxt);
     138              : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
     139              :                                                 SharedExecutorInstrumentation *instrumentation);
     140              : 
     141              : /* Helper function that runs in the parallel worker. */
     142              : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
     143              : 
     144              : /*
     145              :  * Create a serialized representation of the plan to be sent to each worker.
     146              :  */
     147              : static char *
     148          383 : ExecSerializePlan(Plan *plan, EState *estate)
     149              : {
     150              :     PlannedStmt *pstmt;
     151              :     ListCell   *lc;
     152              : 
     153              :     /* We can't scribble on the original plan, so make a copy. */
     154          383 :     plan = copyObject(plan);
     155              : 
     156              :     /*
     157              :      * The worker will start its own copy of the executor, and that copy will
     158              :      * insert a junk filter if the toplevel node has any resjunk entries. We
     159              :      * don't want that to happen, because while resjunk columns shouldn't be
     160              :      * sent back to the user, here the tuples are coming back to another
     161              :      * backend which may very well need them.  So mutate the target list
     162              :      * accordingly.  This is sort of a hack; there might be better ways to do
     163              :      * this...
     164              :      */
     165         1046 :     foreach(lc, plan->targetlist)
     166              :     {
     167          663 :         TargetEntry *tle = lfirst_node(TargetEntry, lc);
     168              : 
     169          663 :         tle->resjunk = false;
     170              :     }
     171              : 
     172              :     /*
     173              :      * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
     174              :      * for our purposes, but the worker will need at least a minimal
     175              :      * PlannedStmt to start the executor.
     176              :      */
     177          383 :     pstmt = makeNode(PlannedStmt);
     178          383 :     pstmt->commandType = CMD_SELECT;
     179          383 :     pstmt->queryId = pgstat_get_my_query_id();
     180          383 :     pstmt->planId = pgstat_get_my_plan_id();
     181          383 :     pstmt->hasReturning = false;
     182          383 :     pstmt->hasModifyingCTE = false;
     183          383 :     pstmt->canSetTag = true;
     184          383 :     pstmt->transientPlan = false;
     185          383 :     pstmt->dependsOnRole = false;
     186          383 :     pstmt->parallelModeNeeded = false;
     187          383 :     pstmt->planTree = plan;
     188          383 :     pstmt->partPruneInfos = estate->es_part_prune_infos;
     189          383 :     pstmt->rtable = estate->es_range_table;
     190          383 :     pstmt->unprunableRelids = estate->es_unpruned_relids;
     191          383 :     pstmt->permInfos = estate->es_rteperminfos;
     192          383 :     pstmt->resultRelations = NIL;
     193          383 :     pstmt->appendRelations = NIL;
     194          383 :     pstmt->planOrigin = PLAN_STMT_INTERNAL;
     195              : 
     196              :     /*
     197              :      * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
     198              :      * for unsafe ones (so that the list indexes of the safe ones are
     199              :      * preserved).  This positively ensures that the worker won't try to run,
     200              :      * or even do ExecInitNode on, an unsafe subplan.  That's important to
     201              :      * protect, eg, non-parallel-aware FDWs from getting into trouble.
     202              :      */
     203          383 :     pstmt->subplans = NIL;
     204          410 :     foreach(lc, estate->es_plannedstmt->subplans)
     205              :     {
     206           27 :         Plan       *subplan = (Plan *) lfirst(lc);
     207              : 
     208           27 :         if (subplan && !subplan->parallel_safe)
     209            6 :             subplan = NULL;
     210           27 :         pstmt->subplans = lappend(pstmt->subplans, subplan);
     211              :     }
     212              : 
     213          383 :     pstmt->rewindPlanIDs = NULL;
     214          383 :     pstmt->rowMarks = NIL;
     215          383 :     pstmt->relationOids = NIL;
     216          383 :     pstmt->invalItems = NIL; /* workers can't replan anyway... */
     217          383 :     pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
     218          383 :     pstmt->utilityStmt = NULL;
     219          383 :     pstmt->stmt_location = -1;
     220          383 :     pstmt->stmt_len = -1;
     221              : 
     222              :     /* Return serialized copy of our dummy PlannedStmt. */
     223          383 :     return nodeToString(pstmt);
     224              : }
     225              : 
     226              : /*
     227              :  * Parallel-aware plan nodes (and occasionally others) may need some state
     228              :  * which is shared across all parallel workers.  Before we size the DSM, give
     229              :  * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
     230              :  * &pcxt->estimator.
     231              :  *
     232              :  * While we're at it, count the number of PlanState nodes in the tree, so
     233              :  * we know how many Instrumentation structures we need.
     234              :  */
     235              : static bool
     236         1549 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
     237              : {
     238         1549 :     if (planstate == NULL)
     239            0 :         return false;
     240              : 
     241              :     /* Count this node. */
     242         1549 :     e->nnodes++;
     243              : 
     244         1549 :     switch (nodeTag(planstate))
     245              :     {
     246          587 :         case T_SeqScanState:
     247          587 :             if (planstate->plan->parallel_aware)
     248          468 :                 ExecSeqScanEstimate((SeqScanState *) planstate,
     249              :                                     e->pcxt);
     250          587 :             break;
     251          147 :         case T_IndexScanState:
     252              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     253          147 :             ExecIndexScanEstimate((IndexScanState *) planstate,
     254              :                                   e->pcxt);
     255          147 :             break;
     256           29 :         case T_IndexOnlyScanState:
     257              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     258           29 :             ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
     259              :                                       e->pcxt);
     260           29 :             break;
     261           10 :         case T_BitmapIndexScanState:
     262              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     263           10 :             ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
     264              :                                         e->pcxt);
     265           10 :             break;
     266            0 :         case T_ForeignScanState:
     267            0 :             if (planstate->plan->parallel_aware)
     268            0 :                 ExecForeignScanEstimate((ForeignScanState *) planstate,
     269              :                                         e->pcxt);
     270            0 :             break;
     271           12 :         case T_TidRangeScanState:
     272           12 :             if (planstate->plan->parallel_aware)
     273           12 :                 ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
     274              :                                          e->pcxt);
     275           12 :             break;
     276           93 :         case T_AppendState:
     277           93 :             if (planstate->plan->parallel_aware)
     278           69 :                 ExecAppendEstimate((AppendState *) planstate,
     279              :                                    e->pcxt);
     280           93 :             break;
     281            0 :         case T_CustomScanState:
     282            0 :             if (planstate->plan->parallel_aware)
     283            0 :                 ExecCustomScanEstimate((CustomScanState *) planstate,
     284              :                                        e->pcxt);
     285            0 :             break;
     286           10 :         case T_BitmapHeapScanState:
     287           10 :             if (planstate->plan->parallel_aware)
     288            9 :                 ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
     289              :                                        e->pcxt);
     290           10 :             break;
     291           99 :         case T_HashJoinState:
     292           99 :             if (planstate->plan->parallel_aware)
     293           63 :                 ExecHashJoinEstimate((HashJoinState *) planstate,
     294              :                                      e->pcxt);
     295           99 :             break;
     296           99 :         case T_HashState:
     297              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     298           99 :             ExecHashEstimate((HashState *) planstate, e->pcxt);
     299           99 :             break;
     300           82 :         case T_SortState:
     301              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     302           82 :             ExecSortEstimate((SortState *) planstate, e->pcxt);
     303           82 :             break;
     304            0 :         case T_IncrementalSortState:
     305              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     306            0 :             ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
     307            0 :             break;
     308          295 :         case T_AggState:
     309              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     310          295 :             ExecAggEstimate((AggState *) planstate, e->pcxt);
     311          295 :             break;
     312            3 :         case T_MemoizeState:
     313              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     314            3 :             ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
     315            3 :             break;
     316           83 :         default:
     317           83 :             break;
     318              :     }
     319              : 
     320         1549 :     return planstate_tree_walker(planstate, ExecParallelEstimate, e);
     321              : }
     322              : 
     323              : /*
     324              :  * Estimate the amount of space required to serialize the indicated parameters.
     325              :  */
     326              : static Size
     327           12 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
     328              : {
     329              :     int         paramid;
     330           12 :     Size        sz = sizeof(int);
     331              : 
     332           12 :     paramid = -1;
     333           27 :     while ((paramid = bms_next_member(params, paramid)) >= 0)
     334              :     {
     335              :         Oid         typeOid;
     336              :         int16       typLen;
     337              :         bool        typByVal;
     338              :         ParamExecData *prm;
     339              : 
     340           15 :         prm = &(estate->es_param_exec_vals[paramid]);
     341           15 :         typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
     342              :                                paramid);
     343              : 
     344           15 :         sz = add_size(sz, sizeof(int)); /* space for paramid */
     345              : 
     346              :         /* space for datum/isnull */
     347           15 :         if (OidIsValid(typeOid))
     348           15 :             get_typlenbyval(typeOid, &typLen, &typByVal);
     349              :         else
     350              :         {
     351              :             /* If no type OID, assume by-value, like copyParamList does. */
     352            0 :             typLen = sizeof(Datum);
     353            0 :             typByVal = true;
     354              :         }
     355           15 :         sz = add_size(sz,
     356           15 :                       datumEstimateSpace(prm->value, prm->isnull,
     357              :                                          typByVal, typLen));
     358              :     }
     359           12 :     return sz;
     360              : }
     361              : 
     362              : /*
     363              :  * Serialize specified PARAM_EXEC parameters.
     364              :  *
     365              :  * We write the number of parameters first, as a 4-byte integer, and then
     366              :  * write details for each parameter in turn.  The details for each parameter
     367              :  * consist of a 4-byte paramid (location of param in execution time internal
     368              :  * parameter array) and then the datum as serialized by datumSerialize().
     369              :  */
     370              : static dsa_pointer
     371           12 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
     372              : {
     373              :     Size        size;
     374              :     int         nparams;
     375              :     int         paramid;
     376              :     ParamExecData *prm;
     377              :     dsa_pointer handle;
     378              :     char       *start_address;
     379              : 
     380              :     /* Allocate enough space for the current parameter values. */
     381           12 :     size = EstimateParamExecSpace(estate, params);
     382           12 :     handle = dsa_allocate(area, size);
     383           12 :     start_address = dsa_get_address(area, handle);
     384              : 
     385              :     /* First write the number of parameters as a 4-byte integer. */
     386           12 :     nparams = bms_num_members(params);
     387           12 :     memcpy(start_address, &nparams, sizeof(int));
     388           12 :     start_address += sizeof(int);
     389              : 
     390              :     /* Write details for each parameter in turn. */
     391           12 :     paramid = -1;
     392           27 :     while ((paramid = bms_next_member(params, paramid)) >= 0)
     393              :     {
     394              :         Oid         typeOid;
     395              :         int16       typLen;
     396              :         bool        typByVal;
     397              : 
     398           15 :         prm = &(estate->es_param_exec_vals[paramid]);
     399           15 :         typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
     400              :                                paramid);
     401              : 
     402              :         /* Write paramid. */
     403           15 :         memcpy(start_address, &paramid, sizeof(int));
     404           15 :         start_address += sizeof(int);
     405              : 
     406              :         /* Write datum/isnull */
     407           15 :         if (OidIsValid(typeOid))
     408           15 :             get_typlenbyval(typeOid, &typLen, &typByVal);
     409              :         else
     410              :         {
     411              :             /* If no type OID, assume by-value, like copyParamList does. */
     412            0 :             typLen = sizeof(Datum);
     413            0 :             typByVal = true;
     414              :         }
     415           15 :         datumSerialize(prm->value, prm->isnull, typByVal, typLen,
     416              :                        &start_address);
     417              :     }
     418              : 
     419           12 :     return handle;
     420              : }
     421              : 
     422              : /*
     423              :  * Restore specified PARAM_EXEC parameters.
     424              :  */
     425              : static void
     426           36 : RestoreParamExecParams(char *start_address, EState *estate)
     427              : {
     428              :     int         nparams;
     429              :     int         i;
     430              :     int         paramid;
     431              : 
     432           36 :     memcpy(&nparams, start_address, sizeof(int));
     433           36 :     start_address += sizeof(int);
     434              : 
     435           78 :     for (i = 0; i < nparams; i++)
     436              :     {
     437              :         ParamExecData *prm;
     438              : 
     439              :         /* Read paramid */
     440           42 :         memcpy(&paramid, start_address, sizeof(int));
     441           42 :         start_address += sizeof(int);
     442           42 :         prm = &(estate->es_param_exec_vals[paramid]);
     443              : 
     444              :         /* Read datum/isnull. */
     445           42 :         prm->value = datumRestore(&start_address, &prm->isnull);
     446           42 :         prm->execPlan = NULL;
     447              :     }
     448           36 : }
     449              : 
     450              : /*
     451              :  * Initialize the dynamic shared memory segment that will be used to control
     452              :  * parallel execution.
     453              :  */
     454              : static bool
     455         1549 : ExecParallelInitializeDSM(PlanState *planstate,
     456              :                           ExecParallelInitializeDSMContext *d)
     457              : {
     458         1549 :     if (planstate == NULL)
     459            0 :         return false;
     460              : 
     461              :     /* If instrumentation is enabled, initialize slot for this node. */
     462         1549 :     if (d->instrumentation != NULL)
     463          513 :         d->instrumentation->plan_node_id[d->nnodes] =
     464          513 :             planstate->plan->plan_node_id;
     465              : 
     466              :     /* Count this node. */
     467         1549 :     d->nnodes++;
     468              : 
     469              :     /*
     470              :      * Call initializers for DSM-using plan nodes.
     471              :      *
     472              :      * Most plan nodes won't do anything here, but plan nodes that allocated
     473              :      * DSM may need to initialize shared state in the DSM before parallel
     474              :      * workers are launched.  They can allocate the space they previously
     475              :      * estimated using shm_toc_allocate, and add the keys they previously
     476              :      * estimated using shm_toc_insert, in each case targeting pcxt->toc.
     477              :      */
     478         1549 :     switch (nodeTag(planstate))
     479              :     {
     480          587 :         case T_SeqScanState:
     481          587 :             if (planstate->plan->parallel_aware)
     482          468 :                 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
     483              :                                          d->pcxt);
     484          587 :             break;
     485          147 :         case T_IndexScanState:
     486              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     487          147 :             ExecIndexScanInitializeDSM((IndexScanState *) planstate, d->pcxt);
     488          147 :             break;
     489           29 :         case T_IndexOnlyScanState:
     490              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     491           29 :             ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
     492              :                                            d->pcxt);
     493           29 :             break;
     494           10 :         case T_BitmapIndexScanState:
     495              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     496           10 :             ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
     497           10 :             break;
     498            0 :         case T_ForeignScanState:
     499            0 :             if (planstate->plan->parallel_aware)
     500            0 :                 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
     501              :                                              d->pcxt);
     502            0 :             break;
     503           12 :         case T_TidRangeScanState:
     504           12 :             if (planstate->plan->parallel_aware)
     505           12 :                 ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
     506              :                                               d->pcxt);
     507           12 :             break;
     508           93 :         case T_AppendState:
     509           93 :             if (planstate->plan->parallel_aware)
     510           69 :                 ExecAppendInitializeDSM((AppendState *) planstate,
     511              :                                         d->pcxt);
     512           93 :             break;
     513            0 :         case T_CustomScanState:
     514            0 :             if (planstate->plan->parallel_aware)
     515            0 :                 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
     516              :                                             d->pcxt);
     517            0 :             break;
     518           10 :         case T_BitmapHeapScanState:
     519           10 :             if (planstate->plan->parallel_aware)
     520            9 :                 ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
     521              :                                             d->pcxt);
     522           10 :             break;
     523           99 :         case T_HashJoinState:
     524           99 :             if (planstate->plan->parallel_aware)
     525           63 :                 ExecHashJoinInitializeDSM((HashJoinState *) planstate,
     526              :                                           d->pcxt);
     527           99 :             break;
     528           99 :         case T_HashState:
     529              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     530           99 :             ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
     531           99 :             break;
     532           82 :         case T_SortState:
     533              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     534           82 :             ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
     535           82 :             break;
     536            0 :         case T_IncrementalSortState:
     537              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     538            0 :             ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
     539            0 :             break;
     540          295 :         case T_AggState:
     541              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     542          295 :             ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
     543          295 :             break;
     544            3 :         case T_MemoizeState:
     545              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     546            3 :             ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
     547            3 :             break;
     548           83 :         default:
     549           83 :             break;
     550              :     }
     551              : 
     552         1549 :     return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
     553              : }
     554              : 
     555              : /*
     556              :  * Set up the response queues that backend workers use to return tuples
     557              :  * to the main backend.  The workers themselves are not started here.
     558              :  */
     559              : static shm_mq_handle **
     560          512 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
     561              : {
     562              :     shm_mq_handle **responseq;
     563              :     char       *tqueuespace;
     564              :     int         i;
     565              : 
     566              :     /* Skip this if no workers. */
     567          512 :     if (pcxt->nworkers == 0)
     568            0 :         return NULL;
     569              : 
     570              :     /* Allocate memory for shared memory queue handles. */
     571              :     responseq = (shm_mq_handle **)
     572          512 :         palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
     573              : 
     574              :     /*
     575              :      * If not reinitializing, allocate space from the DSM for the queues;
     576              :      * otherwise, find the already allocated space.
     577              :      */
     578          512 :     if (!reinitialize)
     579              :         tqueuespace =
     580          383 :             shm_toc_allocate(pcxt->toc,
     581              :                              mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
     582          383 :                                       pcxt->nworkers));
     583              :     else
     584          129 :         tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
     585              : 
     586              :     /* Create the queues, and become the receiver for each. */
     587         1895 :     for (i = 0; i < pcxt->nworkers; ++i)
     588              :     {
     589              :         shm_mq     *mq;
     590              : 
     591         1383 :         mq = shm_mq_create(tqueuespace +
     592         1383 :                            ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
     593              :                            (Size) PARALLEL_TUPLE_QUEUE_SIZE);
     594              : 
     595         1383 :         shm_mq_set_receiver(mq, MyProc);
     596         1383 :         responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
     597              :     }
     598              : 
     599              :     /* Add array of queues to shm_toc, so others can find it. */
     600          512 :     if (!reinitialize)
     601          383 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
     602              : 
     603              :     /* Return array of handles. */
     604          512 :     return responseq;
     605              : }
     606              : 
     607              : /*
     608              :  * Sets up the required infrastructure for backend workers to perform
     609              :  * execution and return results to the main backend.
     610              :  */
     611              : ParallelExecutorInfo *
     612          383 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
     613              :                      Bitmapset *sendParams, int nworkers,
     614              :                      int64 tuples_needed)
     615              : {
     616              :     ParallelExecutorInfo *pei;
     617              :     ParallelContext *pcxt;
     618              :     ExecParallelEstimateContext e;
     619              :     ExecParallelInitializeDSMContext d;
     620              :     FixedParallelExecutorState *fpes;
     621              :     char       *pstmt_data;
     622              :     char       *pstmt_space;
     623              :     char       *paramlistinfo_space;
     624              :     BufferUsage *bufusage_space;
     625              :     WalUsage   *walusage_space;
     626          383 :     SharedExecutorInstrumentation *instrumentation = NULL;
     627          383 :     SharedJitInstrumentation *jit_instrumentation = NULL;
     628              :     int         pstmt_len;
     629              :     int         paramlistinfo_len;
     630          383 :     int         instrumentation_len = 0;
     631          383 :     int         jit_instrumentation_len = 0;
     632          383 :     int         instrument_offset = 0;
     633          383 :     Size        dsa_minsize = dsa_minimum_size();
     634              :     char       *query_string;
     635              :     int         query_len;
     636              : 
     637              :     /*
     638              :      * Force any initplan outputs that we're going to pass to workers to be
     639              :      * evaluated, if they weren't already.
     640              :      *
     641              :      * For simplicity, we use the EState's per-output-tuple ExprContext here.
     642              :      * That risks intra-query memory leakage, since we might pass through here
     643              :      * many times before that ExprContext gets reset; but ExecSetParamPlan
     644              :      * doesn't normally leak any memory in the context (see its comments), so
     645              :      * it doesn't seem worth complicating this function's API to pass it a
     646              :      * shorter-lived ExprContext.  This might need to change someday.
     647              :      */
     648          383 :     ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
     649              : 
     650              :     /* Allocate object for return value. */
     651          383 :     pei = palloc0_object(ParallelExecutorInfo);
     652          383 :     pei->finished = false;
     653          383 :     pei->planstate = planstate;
     654              : 
     655              :     /* Fix up and serialize plan to be sent to workers. */
     656          383 :     pstmt_data = ExecSerializePlan(planstate->plan, estate);
     657              : 
     658              :     /* Create a parallel context. */
     659          383 :     pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
     660          383 :     pei->pcxt = pcxt;
     661              : 
     662              :     /*
     663              :      * Before telling the parallel context to create a dynamic shared memory
     664              :      * segment, we need to figure out how big it should be.  Estimate space
     665              :      * for the various things we need to store.
     666              :      */
     667              : 
     668              :     /* Estimate space for fixed-size state. */
     669          383 :     shm_toc_estimate_chunk(&pcxt->estimator,
     670              :                            sizeof(FixedParallelExecutorState));
     671          383 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     672              : 
     673              :     /* Estimate space for query text. */
     674          383 :     query_len = strlen(estate->es_sourceText);
     675          383 :     shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
     676          383 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     677              : 
     678              :     /* Estimate space for serialized PlannedStmt. */
     679          383 :     pstmt_len = strlen(pstmt_data) + 1;
     680          383 :     shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
     681          383 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     682              : 
     683              :     /* Estimate space for serialized ParamListInfo. */
     684          383 :     paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
     685          383 :     shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
     686          383 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     687              : 
     688              :     /*
     689              :      * Estimate space for BufferUsage.
     690              :      *
     691              :      * If EXPLAIN is not in use and there are no extensions loaded that care,
     692              :      * we could skip this.  But we have no way of knowing whether anyone's
     693              :      * looking at pgBufferUsage, so do it unconditionally.
     694              :      */
     695          383 :     shm_toc_estimate_chunk(&pcxt->estimator,
     696              :                            mul_size(sizeof(BufferUsage), pcxt->nworkers));
     697          383 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     698              : 
     699              :     /*
     700              :      * Same thing for WalUsage.
     701              :      */
     702          383 :     shm_toc_estimate_chunk(&pcxt->estimator,
     703              :                            mul_size(sizeof(WalUsage), pcxt->nworkers));
     704          383 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     705              : 
     706              :     /* Estimate space for tuple queues. */
     707          383 :     shm_toc_estimate_chunk(&pcxt->estimator,
     708              :                            mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
     709          383 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     710              : 
     711              :     /*
     712              :      * Give parallel-aware nodes a chance to add to the estimates, and get a
     713              :      * count of how many PlanState nodes there are.
     714              :      */
     715          383 :     e.pcxt = pcxt;
     716          383 :     e.nnodes = 0;
     717          383 :     ExecParallelEstimate(planstate, &e);
     718              : 
     719              :     /* Estimate space for instrumentation, if required. */
     720          383 :     if (estate->es_instrument)
     721              :     {
     722           90 :         instrumentation_len =
     723              :             offsetof(SharedExecutorInstrumentation, plan_node_id) +
     724           90 :             sizeof(int) * e.nnodes;
     725           90 :         instrumentation_len = MAXALIGN(instrumentation_len);
     726           90 :         instrument_offset = instrumentation_len;
     727           90 :         instrumentation_len +=
     728           90 :             mul_size(sizeof(Instrumentation),
     729           90 :                      mul_size(e.nnodes, nworkers));
     730           90 :         shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
     731           90 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     732              : 
     733              :         /* Estimate space for JIT instrumentation, if required. */
     734           90 :         if (estate->es_jit_flags != PGJIT_NONE)
     735              :         {
     736            0 :             jit_instrumentation_len =
     737            0 :                 offsetof(SharedJitInstrumentation, jit_instr) +
     738              :                 sizeof(JitInstrumentation) * nworkers;
     739            0 :             shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
     740            0 :             shm_toc_estimate_keys(&pcxt->estimator, 1);
     741              :         }
     742              :     }
     743              : 
     744              :     /* Estimate space for DSA area. */
     745          383 :     shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
     746          383 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     747              : 
     748              :     /*
     749              :      * InitializeParallelDSM() passes the active snapshot to the parallel
     750              :      * worker, which uses it to set es_snapshot.  Make sure we don't set
     751              :      * es_snapshot differently in the child.
     752              :      */
     753              :     Assert(GetActiveSnapshot() == estate->es_snapshot);
     754              : 
     755              :     /* Everyone's had a chance to ask for space, so now create the DSM. */
     756          383 :     InitializeParallelDSM(pcxt);
     757              : 
     758              :     /*
     759              :      * OK, now we have a dynamic shared memory segment, and it should be big
     760              :      * enough to store all of the data we estimated we would want to put into
     761              :      * it, plus whatever general stuff (not specifically executor-related) the
     762              :      * ParallelContext itself needs to store there.  None of the space we
     763              :      * asked for has been allocated or initialized yet, though, so do that.
     764              :      */
     765              : 
     766              :     /* Store fixed-size state. */
     767          383 :     fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
     768          383 :     fpes->tuples_needed = tuples_needed;
     769          383 :     fpes->param_exec = InvalidDsaPointer;
     770          383 :     fpes->eflags = estate->es_top_eflags;
     771          383 :     fpes->jit_flags = estate->es_jit_flags;
     772          383 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
     773              : 
     774              :     /* Store query string */
     775          383 :     query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
     776          383 :     memcpy(query_string, estate->es_sourceText, query_len + 1);
     777          383 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
     778              : 
     779              :     /* Store serialized PlannedStmt. */
     780          383 :     pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
     781          383 :     memcpy(pstmt_space, pstmt_data, pstmt_len);
     782          383 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
     783              : 
     784              :     /* Store serialized ParamListInfo. */
     785          383 :     paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
     786          383 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
     787          383 :     SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
     788              : 
     789              :     /* Allocate space for each worker's BufferUsage; no need to initialize. */
     790          383 :     bufusage_space = shm_toc_allocate(pcxt->toc,
     791          383 :                                       mul_size(sizeof(BufferUsage), pcxt->nworkers));
     792          383 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
     793          383 :     pei->buffer_usage = bufusage_space;
     794              : 
     795              :     /* Same for WalUsage. */
     796          383 :     walusage_space = shm_toc_allocate(pcxt->toc,
     797          383 :                                       mul_size(sizeof(WalUsage), pcxt->nworkers));
     798          383 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
     799          383 :     pei->wal_usage = walusage_space;
     800              : 
     801              :     /* Set up the tuple queues that the workers will write into. */
     802          383 :     pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
     803              : 
     804              :     /* We don't need the TupleQueueReaders yet, though. */
     805          383 :     pei->reader = NULL;
     806              : 
     807              :     /*
     808              :      * If instrumentation options were supplied, allocate space for the data.
     809              :      * It only gets partially initialized here; the rest happens during
     810              :      * ExecParallelInitializeDSM.
     811              :      */
     812          383 :     if (estate->es_instrument)
     813              :     {
     814              :         Instrumentation *instrument;
     815              :         int         i;
     816              : 
     817           90 :         instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
     818           90 :         instrumentation->instrument_options = estate->es_instrument;
     819           90 :         instrumentation->instrument_offset = instrument_offset;
     820           90 :         instrumentation->num_workers = nworkers;
     821           90 :         instrumentation->num_plan_nodes = e.nnodes;
     822           90 :         instrument = GetInstrumentationArray(instrumentation);
     823          930 :         for (i = 0; i < nworkers * e.nnodes; ++i)
     824          840 :             InstrInit(&instrument[i], estate->es_instrument);
     825           90 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
     826              :                        instrumentation);
     827           90 :         pei->instrumentation = instrumentation;
     828              : 
     829           90 :         if (estate->es_jit_flags != PGJIT_NONE)
     830              :         {
     831            0 :             jit_instrumentation = shm_toc_allocate(pcxt->toc,
     832              :                                                    jit_instrumentation_len);
     833            0 :             jit_instrumentation->num_workers = nworkers;
     834            0 :             memset(jit_instrumentation->jit_instr, 0,
     835              :                    sizeof(JitInstrumentation) * nworkers);
     836            0 :             shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
     837              :                            jit_instrumentation);
     838            0 :             pei->jit_instrumentation = jit_instrumentation;
     839              :         }
     840              :     }
     841              : 
     842              :     /*
     843              :      * Create a DSA area that can be used by the leader and all workers.
     844              :      * (However, if we failed to create a DSM and are using private memory
     845              :      * instead, then skip this.)
     846              :      */
     847          383 :     if (pcxt->seg != NULL)
     848              :     {
     849              :         char       *area_space;
     850              : 
     851          383 :         area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
     852          383 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
     853          383 :         pei->area = dsa_create_in_place(area_space, dsa_minsize,
     854              :                                         LWTRANCHE_PARALLEL_QUERY_DSA,
     855              :                                         pcxt->seg);
     856              : 
     857              :         /*
     858              :          * Serialize parameters, if any, using DSA storage.  We don't dare use
     859              :          * the main parallel query DSM for this because we might relaunch
     860              :          * workers after the values have changed (and thus the amount of
     861              :          * storage required has changed).
     862              :          */
     863          383 :         if (!bms_is_empty(sendParams))
     864              :         {
     865           12 :             pei->param_exec = SerializeParamExecParams(estate, sendParams,
     866              :                                                        pei->area);
     867           12 :             fpes->param_exec = pei->param_exec;
     868              :         }
     869              :     }
     870              : 
     871              :     /*
     872              :      * Give parallel-aware nodes a chance to initialize their shared data.
     873              :      * This also initializes the elements of instrumentation->ps_instrument,
     874              :      * if it exists.
     875              :      */
     876          383 :     d.pcxt = pcxt;
     877          383 :     d.instrumentation = instrumentation;
     878          383 :     d.nnodes = 0;
     879              : 
     880              :     /* Install our DSA area while initializing the plan. */
     881          383 :     estate->es_query_dsa = pei->area;
     882          383 :     ExecParallelInitializeDSM(planstate, &d);
     883          383 :     estate->es_query_dsa = NULL;
     884              : 
     885              :     /*
     886              :      * Make sure that the world hasn't shifted under our feet.  This could
     887              :      * probably just be an Assert(), but let's be conservative for now.
     888              :      */
     889          383 :     if (e.nnodes != d.nnodes)
     890            0 :         elog(ERROR, "inconsistent count of PlanState nodes");
     891              : 
     892              :     /* OK, we're ready to rock and roll. */
     893          383 :     return pei;
     894              : }
     895              : 
     896              : /*
     897              :  * Set up tuple queue readers to read the results of a parallel subplan.
     898              :  *
     899              :  * This is separate from ExecInitParallelPlan() because we can launch the
     900              :  * worker processes and let them start doing something before we do this.
     901              :  */
     902              : void
     903          503 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
     904              : {
     905          503 :     int         nworkers = pei->pcxt->nworkers_launched;
     906              :     int         i;
     907              : 
     908              :     Assert(pei->reader == NULL);
     909              : 
     910          503 :     if (nworkers > 0)
     911              :     {
     912          503 :         pei->reader = (TupleQueueReader **)
     913          503 :             palloc(nworkers * sizeof(TupleQueueReader *));
     914              : 
     915         1843 :         for (i = 0; i < nworkers; i++)
     916              :         {
     917         1340 :             shm_mq_set_handle(pei->tqueue[i],
     918         1340 :                               pei->pcxt->worker[i].bgwhandle);
     919         1340 :             pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
     920              :         }
     921              :     }
     922          503 : }
     923              : 
     924              : /*
     925              :  * Re-initialize the parallel executor shared memory state before launching
     926              :  * a fresh batch of workers.
     927              :  */
     928              : void
     929          129 : ExecParallelReinitialize(PlanState *planstate,
     930              :                          ParallelExecutorInfo *pei,
     931              :                          Bitmapset *sendParams)
     932              : {
     933          129 :     EState     *estate = planstate->state;
     934              :     FixedParallelExecutorState *fpes;
     935              : 
     936              :     /* Old workers must already be shut down */
     937              :     Assert(pei->finished);
     938              : 
     939              :     /*
     940              :      * Force any initplan outputs that we're going to pass to workers to be
     941              :      * evaluated, if they weren't already (see comments in
     942              :      * ExecInitParallelPlan).
     943              :      */
     944          129 :     ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
     945              : 
     946          129 :     ReinitializeParallelDSM(pei->pcxt);
     947          129 :     pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
     948          129 :     pei->reader = NULL;
     949          129 :     pei->finished = false;
     950              : 
     951          129 :     fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
     952              : 
     953              :     /* Free any serialized parameters from the last round. */
     954          129 :     if (DsaPointerIsValid(fpes->param_exec))
     955              :     {
     956            0 :         dsa_free(pei->area, fpes->param_exec);
     957            0 :         fpes->param_exec = InvalidDsaPointer;
     958              :     }
     959              : 
     960              :     /* Serialize current parameter values if required. */
     961          129 :     if (!bms_is_empty(sendParams))
     962              :     {
     963            0 :         pei->param_exec = SerializeParamExecParams(estate, sendParams,
     964              :                                                    pei->area);
     965            0 :         fpes->param_exec = pei->param_exec;
     966              :     }
     967              : 
     968              :     /* Traverse plan tree and let each child node reset associated state. */
     969          129 :     estate->es_query_dsa = pei->area;
     970          129 :     ExecParallelReInitializeDSM(planstate, pei->pcxt);
     971          129 :     estate->es_query_dsa = NULL;
     972          129 : }
     973              : 
     974              : /*
     975              :  * Traverse plan tree to reinitialize per-node dynamic shared memory state
     976              :  */
     977              : static bool
     978          333 : ExecParallelReInitializeDSM(PlanState *planstate,
     979              :                             ParallelContext *pcxt)
     980              : {
     981          333 :     if (planstate == NULL)
     982            0 :         return false;
     983              : 
     984              :     /*
     985              :      * Call reinitializers for DSM-using plan nodes.
     986              :      */
     987          333 :     switch (nodeTag(planstate))
     988              :     {
     989          138 :         case T_SeqScanState:
     990          138 :             if (planstate->plan->parallel_aware)
     991          114 :                 ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
     992              :                                            pcxt);
     993          138 :             break;
     994            6 :         case T_IndexScanState:
     995            6 :             if (planstate->plan->parallel_aware)
     996            6 :                 ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
     997              :                                              pcxt);
     998            6 :             break;
     999            6 :         case T_IndexOnlyScanState:
    1000            6 :             if (planstate->plan->parallel_aware)
    1001            6 :                 ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
    1002              :                                                  pcxt);
    1003            6 :             break;
    1004            0 :         case T_ForeignScanState:
    1005            0 :             if (planstate->plan->parallel_aware)
    1006            0 :                 ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
    1007              :                                                pcxt);
    1008            0 :             break;
    1009            0 :         case T_TidRangeScanState:
    1010            0 :             if (planstate->plan->parallel_aware)
    1011            0 :                 ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
    1012              :                                                 pcxt);
    1013            0 :             break;
    1014            0 :         case T_AppendState:
    1015            0 :             if (planstate->plan->parallel_aware)
    1016            0 :                 ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
    1017            0 :             break;
    1018            0 :         case T_CustomScanState:
    1019            0 :             if (planstate->plan->parallel_aware)
    1020            0 :                 ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
    1021              :                                               pcxt);
    1022            0 :             break;
    1023           27 :         case T_BitmapHeapScanState:
    1024           27 :             if (planstate->plan->parallel_aware)
    1025           27 :                 ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
    1026              :                                               pcxt);
    1027           27 :             break;
    1028           48 :         case T_HashJoinState:
    1029           48 :             if (planstate->plan->parallel_aware)
    1030           24 :                 ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
    1031              :                                             pcxt);
    1032           48 :             break;
    1033           90 :         case T_BitmapIndexScanState:
    1034              :         case T_HashState:
    1035              :         case T_SortState:
    1036              :         case T_IncrementalSortState:
    1037              :         case T_MemoizeState:
    1038              :             /* these nodes have DSM state, but no reinitialization is required */
    1039           90 :             break;
    1040              : 
    1041           18 :         default:
    1042           18 :             break;
    1043              :     }
    1044              : 
    1045          333 :     return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
    1046              : }
    1047              : 
    1048              : /*
    1049              :  * Copy instrumentation information about this node and its descendants from
    1050              :  * dynamic shared memory.  Returns the result of planstate_tree_walker().
    1051              :  */
    1052              : static bool
    1053          513 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
    1054              :                                     SharedExecutorInstrumentation *instrumentation)
    1055              : {
    1056              :     Instrumentation *instrument;
    1057              :     int         i;
    1058              :     int         n;
    1059              :     int         ibytes;
    1060          513 :     int         plan_node_id = planstate->plan->plan_node_id;
    1061              :     MemoryContext oldcontext;
    1062              : 
    1063              :     /* Find the instrumentation for this node (linear search by plan_node_id). */
    1064         2319 :     for (i = 0; i < instrumentation->num_plan_nodes; ++i)
    1065         2319 :         if (instrumentation->plan_node_id[i] == plan_node_id)
    1066          513 :             break;
    1067          513 :     if (i >= instrumentation->num_plan_nodes)
    1068            0 :         elog(ERROR, "plan node %d not found", plan_node_id);
    1069              : 
    1070              :     /* Accumulate the statistics from all workers. */
    1071          513 :     instrument = GetInstrumentationArray(instrumentation);
    1072          513 :     instrument += i * instrumentation->num_workers; /* node i owns num_workers consecutive entries */
    1073         1353 :     for (n = 0; n < instrumentation->num_workers; ++n)
    1074          840 :         InstrAggNode(planstate->instrument, &instrument[n]);
    1075              : 
    1076              :     /*
    1077              :      * Also store the per-worker detail.
    1078              :      *
    1079              :      * Worker instrumentation should be allocated in the same context as the
    1080              :      * regular instrumentation information, which is the per-query context.
    1081              :      * Switch into per-query memory context.
    1082              :      */
    1083          513 :     oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
    1084          513 :     ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
    1085          513 :     planstate->worker_instrument =
    1086          513 :         palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
    1087          513 :     MemoryContextSwitchTo(oldcontext);
    1088              : 
    1089          513 :     planstate->worker_instrument->num_workers = instrumentation->num_workers;
    1090          513 :     memcpy(&planstate->worker_instrument->instrument, instrument, ibytes); /* copies all num_workers entries for this node */
    1091              : 
    1092              :     /* Perform any node-type-specific work that needs to be done. */
    1093          513 :     switch (nodeTag(planstate))
    1094              :     {
    1095          135 :         case T_IndexScanState:
    1096          135 :             ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
    1097          135 :             break;
    1098            0 :         case T_IndexOnlyScanState:
    1099            0 :             ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
    1100            0 :             break;
    1101            0 :         case T_BitmapIndexScanState:
    1102            0 :             ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
    1103            0 :             break;
    1104            6 :         case T_SortState:
    1105            6 :             ExecSortRetrieveInstrumentation((SortState *) planstate);
    1106            6 :             break;
    1107            0 :         case T_IncrementalSortState:
    1108            0 :             ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
    1109            0 :             break;
    1110           42 :         case T_HashState:
    1111           42 :             ExecHashRetrieveInstrumentation((HashState *) planstate);
    1112           42 :             break;
    1113           51 :         case T_AggState:
    1114           51 :             ExecAggRetrieveInstrumentation((AggState *) planstate);
    1115           51 :             break;
    1116            0 :         case T_MemoizeState:
    1117            0 :             ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
    1118            0 :             break;
    1119            0 :         case T_BitmapHeapScanState:
    1120            0 :             ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
    1121            0 :             break;
    1122          279 :         default:
    1123          279 :             break; /* other node types: no node-specific retrieval call here */
    1124              :     }
    1125              : 
    1126          513 :     return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
    1127              :                                  instrumentation);
    1128              : }
    1129              : 
    1130              : /*
    1131              :  * Add up the workers' JIT instrumentation from dynamic shared memory.
    1132              :  */
    1133              : static void
    1134            0 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
    1135              :                                        SharedJitInstrumentation *shared_jit)
    1136              : {
    1137              :     JitInstrumentation *combined;
    1138              :     int         ibytes;
    1139              : 
    1140              :     int         n;
    1141              : 
    1142              :     /*
    1143              :      * Accumulate worker JIT instrumentation into the combined JIT
    1144              :      * instrumentation, allocating it (zeroed, in es_query_cxt) if required.
    1145              :      */
    1146            0 :     if (!planstate->state->es_jit_worker_instr)
    1147            0 :         planstate->state->es_jit_worker_instr =
    1148            0 :             MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
    1149            0 :     combined = planstate->state->es_jit_worker_instr;
    1150              : 
    1151              :     /* Accumulate all the workers' instrumentations. */
    1152            0 :     for (n = 0; n < shared_jit->num_workers; ++n)
    1153            0 :         InstrJitAgg(combined, &shared_jit->jit_instr[n]);
    1154              : 
    1155              :     /*
    1156              :      * Store the per-worker detail.
    1157              :      *
    1158              :      * Similar to ExecParallelRetrieveInstrumentation(), allocate the
    1159              :      * instrumentation in per-query context.
    1160              :      */
    1161            0 :     ibytes = offsetof(SharedJitInstrumentation, jit_instr)
    1162            0 :         + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation)); /* struct header plus one entry per worker */
    1163            0 :     planstate->worker_jit_instrument =
    1164            0 :         MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
    1165              : 
    1166            0 :     memcpy(planstate->worker_jit_instrument, shared_jit, ibytes); /* copies the header and all per-worker entries */
    1167            0 : }
    1168              : 
    1169              : /*
    1170              :  * Finish parallel execution.  We wait for parallel workers to finish, and
    1171              :  * accumulate their buffer/WAL usage.
    1172              :  */
    1173              : void
    1174          922 : ExecParallelFinish(ParallelExecutorInfo *pei)
    1175              : {
    1176          922 :     int         nworkers = pei->pcxt->nworkers_launched; /* bounds every per-worker loop below */
    1177              :     int         i;
    1178              : 
    1179              :     /* Make this be a no-op if called twice in a row. */
    1180          922 :     if (pei->finished)
    1181          416 :         return; /* flag is set at the end of a completed call */
    1182              : 
    1183              :     /*
    1184              :      * Detach from tuple queues ASAP, so that any still-active workers will
    1185              :      * notice that no further results are wanted.
    1186              :      */
    1187          506 :     if (pei->tqueue != NULL)
    1188              :     {
    1189         1840 :         for (i = 0; i < nworkers; i++)
    1190         1334 :             shm_mq_detach(pei->tqueue[i]);
    1191          506 :         pfree(pei->tqueue);
    1192          506 :         pei->tqueue = NULL;
    1193              :     }
    1194              : 
    1195              :     /*
    1196              :      * While we're waiting for the workers to finish, let's get rid of the
    1197              :      * tuple queue readers.  (Any other local cleanup could be done here too.)
    1198              :      */
    1199          506 :     if (pei->reader != NULL)
    1200              :     {
    1201         1831 :         for (i = 0; i < nworkers; i++)
    1202         1334 :             DestroyTupleQueueReader(pei->reader[i]);
    1203          497 :         pfree(pei->reader);
    1204          497 :         pei->reader = NULL;
    1205              :     }
    1206              : 
    1207              :     /* Now wait for the workers to finish. */
    1208          506 :     WaitForParallelWorkersToFinish(pei->pcxt);
    1209              : 
    1210              :     /*
    1211              :      * Next, accumulate buffer/WAL usage.  (This must wait for the workers to
    1212              :      * finish, or we might get incomplete data.)
    1213              :      */
    1214         1840 :     for (i = 0; i < nworkers; i++)
    1215         1334 :         InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
    1216              : 
    1217          506 :     pei->finished = true; /* makes a repeated call return early above */
    1218              : }
    1219              : 
    1220              : /*
    1221              :  * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
    1222              :  * resources still exist after ExecParallelFinish.  We separate these
    1223              :  * routines because someone might want to examine the contents of the DSM
    1224              :  * after ExecParallelFinish and before calling this routine.
    1225              :  */
    1226              : void
    1227          377 : ExecParallelCleanup(ParallelExecutorInfo *pei)
    1228              : {
    1229              :     /* Accumulate instrumentation, if any. */
    1230          377 :     if (pei->instrumentation)
    1231           90 :         ExecParallelRetrieveInstrumentation(pei->planstate,
    1232              :                                             pei->instrumentation);
    1233              : 
    1234              :     /* Accumulate JIT instrumentation, if any. */
    1235          377 :     if (pei->jit_instrumentation)
    1236            0 :         ExecParallelRetrieveJitInstrumentation(pei->planstate,
    1237            0 :                                                pei->jit_instrumentation);
    1238              : 
    1239              :     /* Free any serialized parameters. */
    1240          377 :     if (DsaPointerIsValid(pei->param_exec))
    1241              :     {
    1242           12 :         dsa_free(pei->area, pei->param_exec); /* uses pei->area, so done before the detach below */
    1243           12 :         pei->param_exec = InvalidDsaPointer;
    1244              :     }
    1245          377 :     if (pei->area != NULL)
    1246              :     {
    1247          377 :         dsa_detach(pei->area);
    1248          377 :         pei->area = NULL;
    1249              :     }
    1250          377 :     if (pei->pcxt != NULL)
    1251              :     {
    1252          377 :         DestroyParallelContext(pei->pcxt);
    1253          377 :         pei->pcxt = NULL;
    1254              :     }
    1255          377 :     pfree(pei); /* pei itself is released; the pointer is invalid after this call */
    1256          377 : }
    1257              : 
    1258              : /*
    1259              :  * Create a DestReceiver to write tuples we produce to the shm_mq designated
    1260              :  * for that purpose.
    1261              :  */
    1262              : static DestReceiver *
    1263         1340 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
    1264              : {
    1265              :     char       *mqspace;
    1266              :     shm_mq     *mq;
    1267              : 
    1268         1340 :     mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
    1269         1340 :     mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE; /* fixed-size queues, indexed by worker number */
    1270         1340 :     mq = (shm_mq *) mqspace;
    1271         1340 :     shm_mq_set_sender(mq, MyProc); /* this process is the sending side of the queue */
    1272         1340 :     return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
    1273              : }
    1274              : 
    1275              : /*
    1276              :  * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
    1277              :  */
    1278              : static QueryDesc *
    1279         1340 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
    1280              :                          int instrument_options)
    1281              : {
    1282              :     char       *pstmtspace;
    1283              :     char       *paramspace;
    1284              :     PlannedStmt *pstmt;
    1285              :     ParamListInfo paramLI;
    1286              :     char       *queryString;
    1287              : 
    1288              :     /* Get the query string from shared memory */
    1289         1340 :     queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
    1290              : 
    1291              :     /* Reconstruct leader-supplied PlannedStmt. */
    1292         1340 :     pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
    1293         1340 :     pstmt = (PlannedStmt *) stringToNode(pstmtspace);
    1294              : 
    1295              :     /* Reconstruct ParamListInfo. */
    1296         1340 :     paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
    1297         1340 :     paramLI = RestoreParamList(&paramspace); /* local pointer passed by address; not used again afterwards */
    1298              : 
    1299              :     /* Create a QueryDesc for the query, using the currently active snapshot. */
    1300         1340 :     return CreateQueryDesc(pstmt,
    1301              :                            queryString,
    1302              :                            GetActiveSnapshot(), InvalidSnapshot,
    1303              :                            receiver, paramLI, NULL, instrument_options);
    1304              : }
    1305              : 
    1306              : /*
    1307              :  * Copy instrumentation information from this node and its descendants into
    1308              :  * dynamic shared memory, so that the parallel leader can retrieve it.
    1309              :  */
    1310              : static bool
    1311         1188 : ExecParallelReportInstrumentation(PlanState *planstate,
    1312              :                                   SharedExecutorInstrumentation *instrumentation)
    1313              : {
    1314              :     int         i;
    1315         1188 :     int         plan_node_id = planstate->plan->plan_node_id;
    1316              :     Instrumentation *instrument;
    1317              : 
    1318         1188 :     InstrEndLoop(planstate->instrument);
    1319              : 
    1320              :     /*
    1321              :      * If we shuffled the plan_node_id values in ps_instrument into sorted
    1322              :      * order, we could use binary search here.  This might matter someday if
    1323              :      * we're pushing down sufficiently large plan trees.  For now, do it the
    1324              :      * slow, dumb way.
    1325              :      */
    1326         3906 :     for (i = 0; i < instrumentation->num_plan_nodes; ++i)
    1327         3906 :         if (instrumentation->plan_node_id[i] == plan_node_id)
    1328         1188 :             break;
    1329         1188 :     if (i >= instrumentation->num_plan_nodes)
    1330            0 :         elog(ERROR, "plan node %d not found", plan_node_id);
    1331              : 
    1332              :     /*
    1333              :      * Add our statistics to the per-node, per-worker totals.  It's possible
    1334              :      * that this could happen more than once if we relaunched workers.
    1335              :      */
    1336         1188 :     instrument = GetInstrumentationArray(instrumentation);
    1337         1188 :     instrument += i * instrumentation->num_workers; /* start of this plan node's num_workers entries */
    1338              :     Assert(IsParallelWorker());
    1339              :     Assert(ParallelWorkerNumber < instrumentation->num_workers);
    1340         1188 :     InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument); /* aggregate into this worker's own entry */
    1341              : 
    1342         1188 :     return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
    1343              :                                  instrumentation); /* recurse into child nodes */
    1344              : }
    1345              : 
    1346              : /*
    1347              :  * Initialize the PlanState and its descendants with the information
    1348              :  * retrieved from shared memory.  This has to be done once the PlanState
    1349              :  * is allocated and initialized by executor; that is, after ExecutorStart().
    1350              :  */
    1351              : static bool
    1352         4258 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
    1353              : {
    1354         4258 :     if (planstate == NULL)
    1355            0 :         return false; /* no node here, nothing to initialize */
    1356              : 
    1357         4258 :     switch (nodeTag(planstate))
    1358              :     {
    1359         1691 :         case T_SeqScanState:
    1360         1691 :             if (planstate->plan->parallel_aware)
    1361         1377 :                 ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
    1362         1691 :             break;
    1363          198 :         case T_IndexScanState:
    1364              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1365          198 :             ExecIndexScanInitializeWorker((IndexScanState *) planstate, pwcxt);
    1366          198 :             break;
    1367          121 :         case T_IndexOnlyScanState:
    1368              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1369          121 :             ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
    1370              :                                               pwcxt);
    1371          121 :             break;
    1372          135 :         case T_BitmapIndexScanState:
    1373              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1374          135 :             ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
    1375              :                                                 pwcxt);
    1376          135 :             break;
    1377            0 :         case T_ForeignScanState:
    1378            0 :             if (planstate->plan->parallel_aware)
    1379            0 :                 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
    1380              :                                                 pwcxt);
    1381            0 :             break;
    1382           48 :         case T_TidRangeScanState:
    1383           48 :             if (planstate->plan->parallel_aware)
    1384           48 :                 ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
    1385              :                                                  pwcxt);
    1386           48 :             break;
    1387          187 :         case T_AppendState:
    1388          187 :             if (planstate->plan->parallel_aware)
    1389          157 :                 ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
    1390          187 :             break;
    1391            0 :         case T_CustomScanState:
    1392            0 :             if (planstate->plan->parallel_aware)
    1393            0 :                 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
    1394              :                                                pwcxt);
    1395            0 :             break;
    1396          135 :         case T_BitmapHeapScanState:
    1397          135 :             if (planstate->plan->parallel_aware)
    1398          134 :                 ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
    1399              :                                                pwcxt);
    1400          135 :             break;
    1401          279 :         case T_HashJoinState:
    1402          279 :             if (planstate->plan->parallel_aware)
    1403          159 :                 ExecHashJoinInitializeWorker((HashJoinState *) planstate,
    1404              :                                              pwcxt);
    1405          279 :             break;
    1406          279 :         case T_HashState:
    1407              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1408          279 :             ExecHashInitializeWorker((HashState *) planstate, pwcxt);
    1409          279 :             break;
    1410          236 :         case T_SortState:
    1411              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1412          236 :             ExecSortInitializeWorker((SortState *) planstate, pwcxt);
    1413          236 :             break;
    1414            0 :         case T_IncrementalSortState:
    1415              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1416            0 :             ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
    1417              :                                                 pwcxt);
    1418            0 :             break;
    1419          830 :         case T_AggState:
    1420              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1421          830 :             ExecAggInitializeWorker((AggState *) planstate, pwcxt);
    1422          830 :             break;
    1423            6 :         case T_MemoizeState:
    1424              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1425            6 :             ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
    1426            6 :             break;
    1427          113 :         default:
    1428          113 :             break; /* other node types: no worker-side initialization call here */
    1429              :     }
    1430              : 
    1431         4258 :     return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
    1432              :                                  pwcxt); /* recurse into child nodes */
    1433              : }
    1434              : 
    1435              : /*
    1436              :  * Main entrypoint for parallel query worker processes.
    1437              :  *
    1438              :  * We reach this function from ParallelWorkerMain, so the setup necessary to
    1439              :  * create a sensible parallel environment has already been done;
    1440              :  * ParallelWorkerMain worries about stuff like the transaction state, combo
    1441              :  * CID mappings, and GUC values, so we don't need to deal with any of that
    1442              :  * here.
    1443              :  *
    1444              :  * Our job is to deal with concerns specific to the executor.  The parallel
    1445              :  * group leader will have stored a serialized PlannedStmt, and it's our job
    1446              :  * to execute that plan and write the resulting tuples to the appropriate
    1447              :  * tuple queue.  Various bits of supporting information that we need in order
    1448              :  * to do this are also stored in the dsm_segment and can be accessed through
    1449              :  * the shm_toc.
    1450              :  */
    1451              : void
    1452         1340 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
    1453              : {
    1454              :     FixedParallelExecutorState *fpes;
    1455              :     BufferUsage *buffer_usage;
    1456              :     WalUsage   *wal_usage;
    1457              :     DestReceiver *receiver;
    1458              :     QueryDesc  *queryDesc;
    1459              :     SharedExecutorInstrumentation *instrumentation;
    1460              :     SharedJitInstrumentation *jit_instrumentation;
    1461         1340 :     int         instrument_options = 0;
    1462              :     void       *area_space;
    1463              :     dsa_area   *area;
    1464              :     ParallelWorkerContext pwcxt;
    1465              : 
    1466              :     /* Get fixed-size state. */
    1467         1340 :     fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
    1468              : 
    1469              :     /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
    1470         1340 :     receiver = ExecParallelGetReceiver(seg, toc);
    1471         1340 :     instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true); /* may be NULL; tested below */
    1472         1340 :     if (instrumentation != NULL)
    1473          363 :         instrument_options = instrumentation->instrument_options;
    1474         1340 :     jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
    1475              :                                          true); /* may be NULL; tested before use */
    1476         1340 :     queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
    1477              : 
    1478              :     /* Set debug_query_string for this worker */
    1479         1340 :     debug_query_string = queryDesc->sourceText;
    1480              : 
    1481              :     /* Report workers' query for monitoring purposes */
    1482         1340 :     pgstat_report_activity(STATE_RUNNING, debug_query_string);
    1483              : 
    1484              :     /* Attach to the dynamic shared memory area. */
    1485         1340 :     area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
    1486         1340 :     area = dsa_attach_in_place(area_space, seg);
    1487              : 
    1488              :     /* Start up the executor */
    1489         1340 :     queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
    1490         1340 :     ExecutorStart(queryDesc, fpes->eflags);
    1491              : 
    1492              :     /* Special executor initialization steps for parallel workers */
    1493         1340 :     queryDesc->planstate->state->es_query_dsa = area;
    1494         1340 :     if (DsaPointerIsValid(fpes->param_exec))
    1495              :     {
    1496              :         char       *paramexec_space;
    1497              : 
    1498           36 :         paramexec_space = dsa_get_address(area, fpes->param_exec);
    1499           36 :         RestoreParamExecParams(paramexec_space, queryDesc->estate);
    1500              :     }
    1501         1340 :     pwcxt.toc = toc;
    1502         1340 :     pwcxt.seg = seg;
    1503         1340 :     ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
    1504              : 
    1505              :     /* Pass down any tuple bound */
    1506         1340 :     ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
    1507              : 
    1508              :     /*
    1509              :      * Prepare to track buffer/WAL usage during query execution.
    1510              :      *
    1511              :      * We do this after starting up the executor to match what happens in the
    1512              :      * leader, which also doesn't count buffer accesses and WAL activity that
    1513              :      * occur during executor startup.
    1514              :      */
    1515         1340 :     InstrStartParallelQuery();
    1516              : 
    1517              :     /*
    1518              :      * Run the plan.  If we specified a tuple bound, be careful not to demand
    1519              :      * more tuples than that.
    1520              :      */
    1521         1340 :     ExecutorRun(queryDesc,
    1522              :                 ForwardScanDirection,
    1523         1340 :                 fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed); /* negative bound is passed as 0; presumably "no limit" -- confirm against ExecutorRun */
    1524              : 
    1525              :     /* Shut down the executor */
    1526         1334 :     ExecutorFinish(queryDesc);
    1527              : 
    1528              :     /* Report buffer/WAL usage during parallel execution. */
    1529         1334 :     buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
    1530         1334 :     wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
    1531         1334 :     InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
    1532         1334 :                           &wal_usage[ParallelWorkerNumber]); /* each worker writes only its own array element */
    1533              : 
    1534              :     /* Report instrumentation data if any instrumentation options are set. */
    1535         1334 :     if (instrumentation != NULL)
    1536          363 :         ExecParallelReportInstrumentation(queryDesc->planstate,
    1537              :                                           instrumentation);
    1538              : 
    1539              :     /* Report JIT instrumentation data if any */
    1540         1334 :     if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
    1541              :     {
    1542              :         Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
    1543            0 :         jit_instrumentation->jit_instr[ParallelWorkerNumber] =
    1544            0 :             queryDesc->estate->es_jit->instr;
    1545              :     }
    1546              : 
    1547              :     /* Must do this after capturing instrumentation. */
    1548         1334 :     ExecutorEnd(queryDesc);
    1549              : 
    1550              :     /* Cleanup. */
    1551         1334 :     dsa_detach(area);
    1552         1334 :     FreeQueryDesc(queryDesc);
    1553         1334 :     receiver->rDestroy(receiver);
    1554         1334 : }
        

Generated by: LCOV version 2.0-1