LCOV - code coverage report
Current view: top level - src/backend/executor - execParallel.c
Test: PostgreSQL 18devel
Date: 2025-04-01 14:15:22
Coverage:  Lines: 532 of 609 hit (87.4 %)   Functions: 20 of 20 hit (100.0 %)
Legend: Lines: hit / not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execParallel.c
       4             :  *    Support routines for parallel execution.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * This file contains routines that are intended to support setting up,
      10             :  * using, and tearing down a ParallelContext from within the PostgreSQL
      11             :  * executor.  The ParallelContext machinery will handle starting the
      12             :  * workers and ensuring that their state generally matches that of the
      13             :  * leader; see src/backend/access/transam/README.parallel for details.
      14             :  * However, we must save and restore relevant executor state, such as
      15             :  * any ParamListInfo associated with the query, buffer/WAL usage info, and
      16             :  * the actual plan to be passed down to the worker.
      17             :  *
      18             :  * IDENTIFICATION
      19             :  *    src/backend/executor/execParallel.c
      20             :  *
      21             :  *-------------------------------------------------------------------------
      22             :  */
      23             : 
      24             : #include "postgres.h"
      25             : 
      26             : #include "executor/execParallel.h"
      27             : #include "executor/executor.h"
      28             : #include "executor/nodeAgg.h"
      29             : #include "executor/nodeAppend.h"
      30             : #include "executor/nodeBitmapHeapscan.h"
      31             : #include "executor/nodeBitmapIndexscan.h"
      32             : #include "executor/nodeCustom.h"
      33             : #include "executor/nodeForeignscan.h"
      34             : #include "executor/nodeHash.h"
      35             : #include "executor/nodeHashjoin.h"
      36             : #include "executor/nodeIncrementalSort.h"
      37             : #include "executor/nodeIndexonlyscan.h"
      38             : #include "executor/nodeIndexscan.h"
      39             : #include "executor/nodeMemoize.h"
      40             : #include "executor/nodeSeqscan.h"
      41             : #include "executor/nodeSort.h"
      42             : #include "executor/nodeSubplan.h"
      43             : #include "executor/tqueue.h"
      44             : #include "jit/jit.h"
      45             : #include "nodes/nodeFuncs.h"
      46             : #include "pgstat.h"
      47             : #include "tcop/tcopprot.h"
      48             : #include "utils/datum.h"
      49             : #include "utils/dsa.h"
      50             : #include "utils/lsyscache.h"
      51             : #include "utils/snapmgr.h"
      52             : 
      53             : /*
      54             :  * Magic numbers for parallel executor communication.  We use constants
      55             :  * greater than any 32-bit integer here so that values < 2^32 can be used
      56             :  * by individual parallel nodes to store their own state.
      57             :  */
      58             : #define PARALLEL_KEY_EXECUTOR_FIXED     UINT64CONST(0xE000000000000001)
      59             : #define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000002)
      60             : #define PARALLEL_KEY_PARAMLISTINFO      UINT64CONST(0xE000000000000003)
      61             : #define PARALLEL_KEY_BUFFER_USAGE       UINT64CONST(0xE000000000000004)
      62             : #define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000005)
      63             : #define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000006)
      64             : #define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000007)
      65             : #define PARALLEL_KEY_QUERY_TEXT     UINT64CONST(0xE000000000000008)
      66             : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
      67             : #define PARALLEL_KEY_WAL_USAGE          UINT64CONST(0xE00000000000000A)
      68             : 
      69             : #define PARALLEL_TUPLE_QUEUE_SIZE       65536
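
The PARALLEL_KEY_* constants above deliberately sit above the 32-bit range, so a parallel-aware plan node can use its own plan_node_id (a small integer) as a TOC key without any risk of collision. A minimal sketch of that per-node pattern, with FooScan and ParallelFooScan standing in for a real node type:

/* leader side: allocate this node's chunk and publish it under its key */
static void
ExecFooScanInitializeDSM(FooScanState *node, ParallelContext *pcxt)
{
    ParallelFooScan *pscan;

    pscan = shm_toc_allocate(pcxt->toc, sizeof(ParallelFooScan));
    /* ... fill in *pscan from the leader's scan state ... */
    shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
}

/* worker side: find the same chunk again under the same key */
static void
ExecFooScanInitializeWorker(FooScanState *node, ParallelWorkerContext *pwcxt)
{
    ParallelFooScan *pscan;

    pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
    /* ... attach the worker's scan state to *pscan ... */
}
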
      70             : 
      71             : /*
       72             :  * Fixed-size state that we need to pass to parallel workers.
      73             :  */
      74             : typedef struct FixedParallelExecutorState
      75             : {
      76             :     int64       tuples_needed;  /* tuple bound, see ExecSetTupleBound */
      77             :     dsa_pointer param_exec;
      78             :     int         eflags;
      79             :     int         jit_flags;
      80             : } FixedParallelExecutorState;
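
On the worker side this struct is fetched back out of the TOC by PARALLEL_KEY_EXECUTOR_FIXED and its fields are applied to the worker's own executor; a hedged sketch of that consumer, simplified from the ParallelQueryMain entry point (the toc and queryDesc variables come from the surrounding worker code and are shown only for illustration):

FixedParallelExecutorState *fpes;

fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);

/* honor the leader's tuple bound, if any; a negative value means no bound */
if (fpes->tuples_needed >= 0)
    ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
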
      81             : 
      82             : /*
      83             :  * DSM structure for accumulating per-PlanState instrumentation.
      84             :  *
      85             :  * instrument_options: Same meaning here as in instrument.c.
      86             :  *
      87             :  * instrument_offset: Offset, relative to the start of this structure,
      88             :  * of the first Instrumentation object.  This will depend on the length of
      89             :  * the plan_node_id array.
      90             :  *
      91             :  * num_workers: Number of workers.
      92             :  *
      93             :  * num_plan_nodes: Number of plan nodes.
      94             :  *
       95             :  * plan_node_id: Array of plan node IDs for which we are gathering
       96             :  * instrumentation from parallel workers.  Its length is num_plan_nodes.
      97             :  */
      98             : struct SharedExecutorInstrumentation
      99             : {
     100             :     int         instrument_options;
     101             :     int         instrument_offset;
     102             :     int         num_workers;
     103             :     int         num_plan_nodes;
     104             :     int         plan_node_id[FLEXIBLE_ARRAY_MEMBER];
     105             :     /* array of num_plan_nodes * num_workers Instrumentation objects follows */
     106             : };
     107             : #define GetInstrumentationArray(sei) \
     108             :     (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
     109             :      (Instrumentation *) (((char *) sei) + sei->instrument_offset))
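
Given that layout, the Instrumentation slot for a particular plan node and worker can be located from the header fields alone. A small sketch, assuming the trailing array is laid out plan-node-major (all workers for the first plan node, then all workers for the second, and so on); the helper name is invented for illustration:

static Instrumentation *
get_worker_instrumentation(SharedExecutorInstrumentation *sei,
                           int plan_node_index, int worker_number)
{
    Instrumentation *array = GetInstrumentationArray(sei);

    Assert(plan_node_index < sei->num_plan_nodes);
    Assert(worker_number < sei->num_workers);
    return &array[plan_node_index * sei->num_workers + worker_number];
}
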
     110             : 
     111             : /* Context object for ExecParallelEstimate. */
     112             : typedef struct ExecParallelEstimateContext
     113             : {
     114             :     ParallelContext *pcxt;
     115             :     int         nnodes;
     116             : } ExecParallelEstimateContext;
     117             : 
     118             : /* Context object for ExecParallelInitializeDSM. */
     119             : typedef struct ExecParallelInitializeDSMContext
     120             : {
     121             :     ParallelContext *pcxt;
     122             :     SharedExecutorInstrumentation *instrumentation;
     123             :     int         nnodes;
     124             : } ExecParallelInitializeDSMContext;
     125             : 
     126             : /* Helper functions that run in the parallel leader. */
     127             : static char *ExecSerializePlan(Plan *plan, EState *estate);
     128             : static bool ExecParallelEstimate(PlanState *planstate,
     129             :                                  ExecParallelEstimateContext *e);
     130             : static bool ExecParallelInitializeDSM(PlanState *planstate,
     131             :                                       ExecParallelInitializeDSMContext *d);
     132             : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
     133             :                                                     bool reinitialize);
     134             : static bool ExecParallelReInitializeDSM(PlanState *planstate,
     135             :                                         ParallelContext *pcxt);
     136             : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
     137             :                                                 SharedExecutorInstrumentation *instrumentation);
     138             : 
     139             : /* Helper function that runs in the parallel worker. */
     140             : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
     141             : 
     142             : /*
     143             :  * Create a serialized representation of the plan to be sent to each worker.
     144             :  */
     145             : static char *
     146         718 : ExecSerializePlan(Plan *plan, EState *estate)
     147             : {
     148             :     PlannedStmt *pstmt;
     149             :     ListCell   *lc;
     150             : 
     151             :     /* We can't scribble on the original plan, so make a copy. */
     152         718 :     plan = copyObject(plan);
     153             : 
     154             :     /*
     155             :      * The worker will start its own copy of the executor, and that copy will
     156             :      * insert a junk filter if the toplevel node has any resjunk entries. We
     157             :      * don't want that to happen, because while resjunk columns shouldn't be
     158             :      * sent back to the user, here the tuples are coming back to another
     159             :      * backend which may very well need them.  So mutate the target list
     160             :      * accordingly.  This is sort of a hack; there might be better ways to do
     161             :      * this...
     162             :      */
     163        1978 :     foreach(lc, plan->targetlist)
     164             :     {
     165        1260 :         TargetEntry *tle = lfirst_node(TargetEntry, lc);
     166             : 
     167        1260 :         tle->resjunk = false;
     168             :     }
     169             : 
     170             :     /*
     171             :      * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
     172             :      * for our purposes, but the worker will need at least a minimal
     173             :      * PlannedStmt to start the executor.
     174             :      */
     175         718 :     pstmt = makeNode(PlannedStmt);
     176         718 :     pstmt->commandType = CMD_SELECT;
     177         718 :     pstmt->queryId = pgstat_get_my_query_id();
     178         718 :     pstmt->planId = pgstat_get_my_plan_id();
     179         718 :     pstmt->hasReturning = false;
     180         718 :     pstmt->hasModifyingCTE = false;
     181         718 :     pstmt->canSetTag = true;
     182         718 :     pstmt->transientPlan = false;
     183         718 :     pstmt->dependsOnRole = false;
     184         718 :     pstmt->parallelModeNeeded = false;
     185         718 :     pstmt->planTree = plan;
     186         718 :     pstmt->partPruneInfos = estate->es_part_prune_infos;
     187         718 :     pstmt->rtable = estate->es_range_table;
     188         718 :     pstmt->unprunableRelids = estate->es_unpruned_relids;
     189         718 :     pstmt->permInfos = estate->es_rteperminfos;
     190         718 :     pstmt->resultRelations = NIL;
     191         718 :     pstmt->appendRelations = NIL;
     192             : 
     193             :     /*
     194             :      * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
     195             :      * for unsafe ones (so that the list indexes of the safe ones are
     196             :      * preserved).  This positively ensures that the worker won't try to run,
     197             :      * or even do ExecInitNode on, an unsafe subplan.  That's important to
      198             :  * protect, e.g., non-parallel-aware FDWs from getting into trouble.
     199             :      */
     200         718 :     pstmt->subplans = NIL;
     201         772 :     foreach(lc, estate->es_plannedstmt->subplans)
     202             :     {
     203          54 :         Plan       *subplan = (Plan *) lfirst(lc);
     204             : 
     205          54 :         if (subplan && !subplan->parallel_safe)
     206          12 :             subplan = NULL;
     207          54 :         pstmt->subplans = lappend(pstmt->subplans, subplan);
     208             :     }
     209             : 
     210         718 :     pstmt->rewindPlanIDs = NULL;
     211         718 :     pstmt->rowMarks = NIL;
     212         718 :     pstmt->relationOids = NIL;
     213         718 :     pstmt->invalItems = NIL; /* workers can't replan anyway... */
     214         718 :     pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
     215         718 :     pstmt->utilityStmt = NULL;
     216         718 :     pstmt->stmt_location = -1;
     217         718 :     pstmt->stmt_len = -1;
     218             : 
     219             :     /* Return serialized copy of our dummy PlannedStmt. */
     220         718 :     return nodeToString(pstmt);
     221             : }
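
The worker undoes this step by looking the string up under PARALLEL_KEY_PLANNEDSTMT and handing it to stringToNode(); a minimal sketch of that restore, loosely modeled on the worker-side code (the toc variable is assumed to come from the surrounding worker context):

char        *pstmtspace;
PlannedStmt *pstmt;

pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
pstmt = (PlannedStmt *) stringToNode(pstmtspace);
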
     222             : 
     223             : /*
     224             :  * Parallel-aware plan nodes (and occasionally others) may need some state
     225             :  * which is shared across all parallel workers.  Before we size the DSM, give
     226             :  * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
     227             :  * &pcxt->estimator.
     228             :  *
     229             :  * While we're at it, count the number of PlanState nodes in the tree, so
     230             :  * we know how many Instrumentation structures we need.
     231             :  */
     232             : static bool
     233        2960 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
     234             : {
     235        2960 :     if (planstate == NULL)
     236           0 :         return false;
     237             : 
     238             :     /* Count this node. */
     239        2960 :     e->nnodes++;
     240             : 
     241        2960 :     switch (nodeTag(planstate))
     242             :     {
     243        1144 :         case T_SeqScanState:
     244        1144 :             if (planstate->plan->parallel_aware)
     245         906 :                 ExecSeqScanEstimate((SeqScanState *) planstate,
     246             :                                     e->pcxt);
     247        1144 :             break;
     248         294 :         case T_IndexScanState:
     249             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     250         294 :             ExecIndexScanEstimate((IndexScanState *) planstate,
     251             :                                   e->pcxt);
     252         294 :             break;
     253          58 :         case T_IndexOnlyScanState:
     254             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     255          58 :             ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
     256             :                                       e->pcxt);
     257          58 :             break;
     258          20 :         case T_BitmapIndexScanState:
     259             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     260          20 :             ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
     261             :                                         e->pcxt);
     262          20 :             break;
     263           0 :         case T_ForeignScanState:
     264           0 :             if (planstate->plan->parallel_aware)
     265           0 :                 ExecForeignScanEstimate((ForeignScanState *) planstate,
     266             :                                         e->pcxt);
     267           0 :             break;
     268         186 :         case T_AppendState:
     269         186 :             if (planstate->plan->parallel_aware)
     270         138 :                 ExecAppendEstimate((AppendState *) planstate,
     271             :                                    e->pcxt);
     272         186 :             break;
     273           0 :         case T_CustomScanState:
     274           0 :             if (planstate->plan->parallel_aware)
     275           0 :                 ExecCustomScanEstimate((CustomScanState *) planstate,
     276             :                                        e->pcxt);
     277           0 :             break;
     278          20 :         case T_BitmapHeapScanState:
     279          20 :             if (planstate->plan->parallel_aware)
     280          18 :                 ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
     281             :                                        e->pcxt);
     282          20 :             break;
     283         192 :         case T_HashJoinState:
     284         192 :             if (planstate->plan->parallel_aware)
     285         120 :                 ExecHashJoinEstimate((HashJoinState *) planstate,
     286             :                                      e->pcxt);
     287         192 :             break;
     288         192 :         case T_HashState:
     289             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     290         192 :             ExecHashEstimate((HashState *) planstate, e->pcxt);
     291         192 :             break;
     292         152 :         case T_SortState:
     293             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     294         152 :             ExecSortEstimate((SortState *) planstate, e->pcxt);
     295         152 :             break;
     296           0 :         case T_IncrementalSortState:
     297             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     298           0 :             ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
     299           0 :             break;
     300         554 :         case T_AggState:
     301             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     302         554 :             ExecAggEstimate((AggState *) planstate, e->pcxt);
     303         554 :             break;
     304           6 :         case T_MemoizeState:
     305             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     306           6 :             ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
     307           6 :             break;
     308         142 :         default:
     309         142 :             break;
     310             :     }
     311             : 
     312        2960 :     return planstate_tree_walker(planstate, ExecParallelEstimate, e);
     313             : }
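
Each per-node Estimate function called above follows the same small pattern: decide how many bytes of DSM the node wants, then reserve that chunk plus one TOC key. A sketch of such a callback, with FooScan, ParallelFooScan, and the pscan_len field again standing in for a real node type:

void
ExecFooScanEstimate(FooScanState *node, ParallelContext *pcxt)
{
    /* remember the size so the later InitializeDSM call allocates the same amount */
    node->pscan_len = sizeof(ParallelFooScan);
    shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}
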
     314             : 
     315             : /*
     316             :  * Estimate the amount of space required to serialize the indicated parameters.
     317             :  */
     318             : static Size
     319          24 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
     320             : {
     321             :     int         paramid;
     322          24 :     Size        sz = sizeof(int);
     323             : 
     324          24 :     paramid = -1;
     325          54 :     while ((paramid = bms_next_member(params, paramid)) >= 0)
     326             :     {
     327             :         Oid         typeOid;
     328             :         int16       typLen;
     329             :         bool        typByVal;
     330             :         ParamExecData *prm;
     331             : 
     332          30 :         prm = &(estate->es_param_exec_vals[paramid]);
     333          30 :         typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
     334             :                                paramid);
     335             : 
     336          30 :         sz = add_size(sz, sizeof(int)); /* space for paramid */
     337             : 
     338             :         /* space for datum/isnull */
     339          30 :         if (OidIsValid(typeOid))
     340          30 :             get_typlenbyval(typeOid, &typLen, &typByVal);
     341             :         else
     342             :         {
     343             :             /* If no type OID, assume by-value, like copyParamList does. */
     344           0 :             typLen = sizeof(Datum);
     345           0 :             typByVal = true;
     346             :         }
     347          30 :         sz = add_size(sz,
     348          30 :                       datumEstimateSpace(prm->value, prm->isnull,
     349             :                                          typByVal, typLen));
     350             :     }
     351          24 :     return sz;
     352             : }
     353             : 
     354             : /*
     355             :  * Serialize specified PARAM_EXEC parameters.
     356             :  *
     357             :  * We write the number of parameters first, as a 4-byte integer, and then
     358             :  * write details for each parameter in turn.  The details for each parameter
     359             :  * consist of a 4-byte paramid (location of param in execution time internal
     360             :  * parameter array) and then the datum as serialized by datumSerialize().
     361             :  */
     362             : static dsa_pointer
     363          24 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
     364             : {
     365             :     Size        size;
     366             :     int         nparams;
     367             :     int         paramid;
     368             :     ParamExecData *prm;
     369             :     dsa_pointer handle;
     370             :     char       *start_address;
     371             : 
     372             :     /* Allocate enough space for the current parameter values. */
     373          24 :     size = EstimateParamExecSpace(estate, params);
     374          24 :     handle = dsa_allocate(area, size);
     375          24 :     start_address = dsa_get_address(area, handle);
     376             : 
     377             :     /* First write the number of parameters as a 4-byte integer. */
     378          24 :     nparams = bms_num_members(params);
     379          24 :     memcpy(start_address, &nparams, sizeof(int));
     380          24 :     start_address += sizeof(int);
     381             : 
     382             :     /* Write details for each parameter in turn. */
     383          24 :     paramid = -1;
     384          54 :     while ((paramid = bms_next_member(params, paramid)) >= 0)
     385             :     {
     386             :         Oid         typeOid;
     387             :         int16       typLen;
     388             :         bool        typByVal;
     389             : 
     390          30 :         prm = &(estate->es_param_exec_vals[paramid]);
     391          30 :         typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
     392             :                                paramid);
     393             : 
     394             :         /* Write paramid. */
     395          30 :         memcpy(start_address, &paramid, sizeof(int));
     396          30 :         start_address += sizeof(int);
     397             : 
     398             :         /* Write datum/isnull */
     399          30 :         if (OidIsValid(typeOid))
     400          30 :             get_typlenbyval(typeOid, &typLen, &typByVal);
     401             :         else
     402             :         {
     403             :             /* If no type OID, assume by-value, like copyParamList does. */
     404           0 :             typLen = sizeof(Datum);
     405           0 :             typByVal = true;
     406             :         }
     407          30 :         datumSerialize(prm->value, prm->isnull, typByVal, typLen,
     408             :                        &start_address);
     409             :     }
     410             : 
     411          24 :     return handle;
     412             : }
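
For concreteness, the DSA chunk built here for two parameters is laid out roughly as sketched below; each datum payload is whatever datumSerialize() emits for that value:

/*
 * +-----------------+----------+---------------+----------+---------------+
 * | int nparams = 2 | paramid0 | datum 0 bytes | paramid1 | datum 1 bytes |
 * +-----------------+----------+---------------+----------+---------------+
 */
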
     413             : 
     414             : /*
     415             :  * Restore specified PARAM_EXEC parameters.
     416             :  */
     417             : static void
     418          72 : RestoreParamExecParams(char *start_address, EState *estate)
     419             : {
     420             :     int         nparams;
     421             :     int         i;
     422             :     int         paramid;
     423             : 
     424          72 :     memcpy(&nparams, start_address, sizeof(int));
     425          72 :     start_address += sizeof(int);
     426             : 
     427         156 :     for (i = 0; i < nparams; i++)
     428             :     {
     429             :         ParamExecData *prm;
     430             : 
     431             :         /* Read paramid */
     432          84 :         memcpy(&paramid, start_address, sizeof(int));
     433          84 :         start_address += sizeof(int);
     434          84 :         prm = &(estate->es_param_exec_vals[paramid]);
     435             : 
     436             :         /* Read datum/isnull. */
     437          84 :         prm->value = datumRestore(&start_address, &prm->isnull);
     438          84 :         prm->execPlan = NULL;
     439             :     }
     440          72 : }
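
The start_address passed in here comes from the dsa_pointer the leader stashed in FixedParallelExecutorState; a hedged sketch of the worker-side call, taking the surrounding fpes and estate variables as given:

if (DsaPointerIsValid(fpes->param_exec))
{
    char       *paramspace = dsa_get_address(estate->es_query_dsa,
                                             fpes->param_exec);

    RestoreParamExecParams(paramspace, estate);
}
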
     441             : 
     442             : /*
     443             :  * Initialize the dynamic shared memory segment that will be used to control
     444             :  * parallel execution.
     445             :  */
     446             : static bool
     447        2960 : ExecParallelInitializeDSM(PlanState *planstate,
     448             :                           ExecParallelInitializeDSMContext *d)
     449             : {
     450        2960 :     if (planstate == NULL)
     451           0 :         return false;
     452             : 
     453             :     /* If instrumentation is enabled, initialize slot for this node. */
     454        2960 :     if (d->instrumentation != NULL)
     455        1026 :         d->instrumentation->plan_node_id[d->nnodes] =
     456        1026 :             planstate->plan->plan_node_id;
     457             : 
     458             :     /* Count this node. */
     459        2960 :     d->nnodes++;
     460             : 
     461             :     /*
     462             :      * Call initializers for DSM-using plan nodes.
     463             :      *
     464             :      * Most plan nodes won't do anything here, but plan nodes that allocated
     465             :      * DSM may need to initialize shared state in the DSM before parallel
     466             :      * workers are launched.  They can allocate the space they previously
     467             :      * estimated using shm_toc_allocate, and add the keys they previously
     468             :      * estimated using shm_toc_insert, in each case targeting pcxt->toc.
     469             :      */
     470        2960 :     switch (nodeTag(planstate))
     471             :     {
     472        1144 :         case T_SeqScanState:
     473        1144 :             if (planstate->plan->parallel_aware)
     474         906 :                 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
     475             :                                          d->pcxt);
     476        1144 :             break;
     477         294 :         case T_IndexScanState:
     478             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     479         294 :             ExecIndexScanInitializeDSM((IndexScanState *) planstate, d->pcxt);
     480         294 :             break;
     481          58 :         case T_IndexOnlyScanState:
     482             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     483          58 :             ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
     484             :                                            d->pcxt);
     485          58 :             break;
     486          20 :         case T_BitmapIndexScanState:
     487             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     488          20 :             ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
     489          20 :             break;
     490           0 :         case T_ForeignScanState:
     491           0 :             if (planstate->plan->parallel_aware)
     492           0 :                 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
     493             :                                              d->pcxt);
     494           0 :             break;
     495         186 :         case T_AppendState:
     496         186 :             if (planstate->plan->parallel_aware)
     497         138 :                 ExecAppendInitializeDSM((AppendState *) planstate,
     498             :                                         d->pcxt);
     499         186 :             break;
     500           0 :         case T_CustomScanState:
     501           0 :             if (planstate->plan->parallel_aware)
     502           0 :                 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
     503             :                                             d->pcxt);
     504           0 :             break;
     505          20 :         case T_BitmapHeapScanState:
     506          20 :             if (planstate->plan->parallel_aware)
     507          18 :                 ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
     508             :                                             d->pcxt);
     509          20 :             break;
     510         192 :         case T_HashJoinState:
     511         192 :             if (planstate->plan->parallel_aware)
     512         120 :                 ExecHashJoinInitializeDSM((HashJoinState *) planstate,
     513             :                                           d->pcxt);
     514         192 :             break;
     515         192 :         case T_HashState:
     516             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     517         192 :             ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
     518         192 :             break;
     519         152 :         case T_SortState:
     520             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     521         152 :             ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
     522         152 :             break;
     523           0 :         case T_IncrementalSortState:
     524             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     525           0 :             ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
     526           0 :             break;
     527         554 :         case T_AggState:
     528             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     529         554 :             ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
     530         554 :             break;
     531           6 :         case T_MemoizeState:
     532             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     533           6 :             ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
     534           6 :             break;
     535         142 :         default:
     536         142 :             break;
     537             :     }
     538             : 
     539        2960 :     return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
     540             : }
     541             : 
     542             : /*
      543             :  * Set up the response queues that backend workers will use to return
      544             :  * tuples to the main backend.
     545             :  */
     546             : static shm_mq_handle **
     547         976 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
     548             : {
     549             :     shm_mq_handle **responseq;
     550             :     char       *tqueuespace;
     551             :     int         i;
     552             : 
     553             :     /* Skip this if no workers. */
     554         976 :     if (pcxt->nworkers == 0)
     555           0 :         return NULL;
     556             : 
     557             :     /* Allocate memory for shared memory queue handles. */
     558             :     responseq = (shm_mq_handle **)
     559         976 :         palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
     560             : 
     561             :     /*
     562             :      * If not reinitializing, allocate space from the DSM for the queues;
     563             :      * otherwise, find the already allocated space.
     564             :      */
     565         976 :     if (!reinitialize)
     566             :         tqueuespace =
     567         718 :             shm_toc_allocate(pcxt->toc,
     568             :                              mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
     569         718 :                                       pcxt->nworkers));
     570             :     else
     571         258 :         tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
     572             : 
     573             :     /* Create the queues, and become the receiver for each. */
     574        3586 :     for (i = 0; i < pcxt->nworkers; ++i)
     575             :     {
     576             :         shm_mq     *mq;
     577             : 
     578        2610 :         mq = shm_mq_create(tqueuespace +
     579        2610 :                            ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
     580             :                            (Size) PARALLEL_TUPLE_QUEUE_SIZE);
     581             : 
     582        2610 :         shm_mq_set_receiver(mq, MyProc);
     583        2610 :         responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
     584             :     }
     585             : 
     586             :     /* Add array of queues to shm_toc, so others can find it. */
     587         976 :     if (!reinitialize)
     588         718 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
     589             : 
     590             :     /* Return array of handles. */
     591         976 :     return responseq;
     592             : }
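
Each worker attaches to its own slot of this array as the sender, offsetting into the shared space by its ParallelWorkerNumber; a minimal sketch of that counterpart (the real worker-side logic lives in ExecParallelGetReceiver, declared above, which also wraps the queue in a DestReceiver):

char       *tqueuespace;
shm_mq     *mq;

tqueuespace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
mq = (shm_mq *) (tqueuespace +
                 ((Size) ParallelWorkerNumber) * PARALLEL_TUPLE_QUEUE_SIZE);
shm_mq_set_sender(mq, MyProc);
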
     593             : 
     594             : /*
     595             :  * Sets up the required infrastructure for backend workers to perform
     596             :  * execution and return results to the main backend.
     597             :  */
     598             : ParallelExecutorInfo *
     599         718 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
     600             :                      Bitmapset *sendParams, int nworkers,
     601             :                      int64 tuples_needed)
     602             : {
     603             :     ParallelExecutorInfo *pei;
     604             :     ParallelContext *pcxt;
     605             :     ExecParallelEstimateContext e;
     606             :     ExecParallelInitializeDSMContext d;
     607             :     FixedParallelExecutorState *fpes;
     608             :     char       *pstmt_data;
     609             :     char       *pstmt_space;
     610             :     char       *paramlistinfo_space;
     611             :     BufferUsage *bufusage_space;
     612             :     WalUsage   *walusage_space;
     613         718 :     SharedExecutorInstrumentation *instrumentation = NULL;
     614         718 :     SharedJitInstrumentation *jit_instrumentation = NULL;
     615             :     int         pstmt_len;
     616             :     int         paramlistinfo_len;
     617         718 :     int         instrumentation_len = 0;
     618         718 :     int         jit_instrumentation_len = 0;
     619         718 :     int         instrument_offset = 0;
     620         718 :     Size        dsa_minsize = dsa_minimum_size();
     621             :     char       *query_string;
     622             :     int         query_len;
     623             : 
     624             :     /*
     625             :      * Force any initplan outputs that we're going to pass to workers to be
     626             :      * evaluated, if they weren't already.
     627             :      *
     628             :      * For simplicity, we use the EState's per-output-tuple ExprContext here.
     629             :      * That risks intra-query memory leakage, since we might pass through here
     630             :      * many times before that ExprContext gets reset; but ExecSetParamPlan
     631             :      * doesn't normally leak any memory in the context (see its comments), so
     632             :      * it doesn't seem worth complicating this function's API to pass it a
     633             :      * shorter-lived ExprContext.  This might need to change someday.
     634             :      */
     635         718 :     ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
     636             : 
     637             :     /* Allocate object for return value. */
     638         718 :     pei = palloc0(sizeof(ParallelExecutorInfo));
     639         718 :     pei->finished = false;
     640         718 :     pei->planstate = planstate;
     641             : 
     642             :     /* Fix up and serialize plan to be sent to workers. */
     643         718 :     pstmt_data = ExecSerializePlan(planstate->plan, estate);
     644             : 
     645             :     /* Create a parallel context. */
     646         718 :     pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
     647         718 :     pei->pcxt = pcxt;
     648             : 
     649             :     /*
     650             :      * Before telling the parallel context to create a dynamic shared memory
     651             :      * segment, we need to figure out how big it should be.  Estimate space
     652             :      * for the various things we need to store.
     653             :      */
     654             : 
     655             :     /* Estimate space for fixed-size state. */
     656         718 :     shm_toc_estimate_chunk(&pcxt->estimator,
     657             :                            sizeof(FixedParallelExecutorState));
     658         718 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     659             : 
     660             :     /* Estimate space for query text. */
     661         718 :     query_len = strlen(estate->es_sourceText);
     662         718 :     shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
     663         718 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     664             : 
     665             :     /* Estimate space for serialized PlannedStmt. */
     666         718 :     pstmt_len = strlen(pstmt_data) + 1;
     667         718 :     shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
     668         718 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     669             : 
     670             :     /* Estimate space for serialized ParamListInfo. */
     671         718 :     paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
     672         718 :     shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
     673         718 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     674             : 
     675             :     /*
     676             :      * Estimate space for BufferUsage.
     677             :      *
     678             :      * If EXPLAIN is not in use and there are no extensions loaded that care,
     679             :      * we could skip this.  But we have no way of knowing whether anyone's
     680             :      * looking at pgBufferUsage, so do it unconditionally.
     681             :      */
     682         718 :     shm_toc_estimate_chunk(&pcxt->estimator,
     683             :                            mul_size(sizeof(BufferUsage), pcxt->nworkers));
     684         718 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     685             : 
     686             :     /*
     687             :      * Same thing for WalUsage.
     688             :      */
     689         718 :     shm_toc_estimate_chunk(&pcxt->estimator,
     690             :                            mul_size(sizeof(WalUsage), pcxt->nworkers));
     691         718 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     692             : 
     693             :     /* Estimate space for tuple queues. */
     694         718 :     shm_toc_estimate_chunk(&pcxt->estimator,
     695             :                            mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
     696         718 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     697             : 
     698             :     /*
     699             :      * Give parallel-aware nodes a chance to add to the estimates, and get a
     700             :      * count of how many PlanState nodes there are.
     701             :      */
     702         718 :     e.pcxt = pcxt;
     703         718 :     e.nnodes = 0;
     704         718 :     ExecParallelEstimate(planstate, &e);
     705             : 
     706             :     /* Estimate space for instrumentation, if required. */
     707         718 :     if (estate->es_instrument)
     708             :     {
     709         180 :         instrumentation_len =
     710             :             offsetof(SharedExecutorInstrumentation, plan_node_id) +
     711         180 :             sizeof(int) * e.nnodes;
     712         180 :         instrumentation_len = MAXALIGN(instrumentation_len);
     713         180 :         instrument_offset = instrumentation_len;
     714         180 :         instrumentation_len +=
     715         180 :             mul_size(sizeof(Instrumentation),
     716         180 :                      mul_size(e.nnodes, nworkers));
     717         180 :         shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
     718         180 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     719             : 
     720             :         /* Estimate space for JIT instrumentation, if required. */
     721         180 :         if (estate->es_jit_flags != PGJIT_NONE)
     722             :         {
     723          24 :             jit_instrumentation_len =
     724          24 :                 offsetof(SharedJitInstrumentation, jit_instr) +
     725             :                 sizeof(JitInstrumentation) * nworkers;
     726          24 :             shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
     727          24 :             shm_toc_estimate_keys(&pcxt->estimator, 1);
     728             :         }
     729             :     }
     730             : 
     731             :     /* Estimate space for DSA area. */
     732         718 :     shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
     733         718 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     734             : 
     735             :     /*
     736             :      * InitializeParallelDSM() passes the active snapshot to the parallel
     737             :      * worker, which uses it to set es_snapshot.  Make sure we don't set
     738             :      * es_snapshot differently in the child.
     739             :      */
     740             :     Assert(GetActiveSnapshot() == estate->es_snapshot);
     741             : 
     742             :     /* Everyone's had a chance to ask for space, so now create the DSM. */
     743         718 :     InitializeParallelDSM(pcxt);
     744             : 
     745             :     /*
     746             :      * OK, now we have a dynamic shared memory segment, and it should be big
     747             :      * enough to store all of the data we estimated we would want to put into
     748             :      * it, plus whatever general stuff (not specifically executor-related) the
     749             :      * ParallelContext itself needs to store there.  None of the space we
     750             :      * asked for has been allocated or initialized yet, though, so do that.
     751             :      */
     752             : 
     753             :     /* Store fixed-size state. */
     754         718 :     fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
     755         718 :     fpes->tuples_needed = tuples_needed;
     756         718 :     fpes->param_exec = InvalidDsaPointer;
     757         718 :     fpes->eflags = estate->es_top_eflags;
     758         718 :     fpes->jit_flags = estate->es_jit_flags;
     759         718 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
     760             : 
     761             :     /* Store query string */
     762         718 :     query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
     763         718 :     memcpy(query_string, estate->es_sourceText, query_len + 1);
     764         718 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
     765             : 
     766             :     /* Store serialized PlannedStmt. */
     767         718 :     pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
     768         718 :     memcpy(pstmt_space, pstmt_data, pstmt_len);
     769         718 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
     770             : 
     771             :     /* Store serialized ParamListInfo. */
     772         718 :     paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
     773         718 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
     774         718 :     SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
     775             : 
     776             :     /* Allocate space for each worker's BufferUsage; no need to initialize. */
     777         718 :     bufusage_space = shm_toc_allocate(pcxt->toc,
     778         718 :                                       mul_size(sizeof(BufferUsage), pcxt->nworkers));
     779         718 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
     780         718 :     pei->buffer_usage = bufusage_space;
     781             : 
     782             :     /* Same for WalUsage. */
     783         718 :     walusage_space = shm_toc_allocate(pcxt->toc,
     784         718 :                                       mul_size(sizeof(WalUsage), pcxt->nworkers));
     785         718 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
     786         718 :     pei->wal_usage = walusage_space;
     787             : 
     788             :     /* Set up the tuple queues that the workers will write into. */
     789         718 :     pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
     790             : 
     791             :     /* We don't need the TupleQueueReaders yet, though. */
     792         718 :     pei->reader = NULL;
     793             : 
     794             :     /*
     795             :      * If instrumentation options were supplied, allocate space for the data.
     796             :      * It only gets partially initialized here; the rest happens during
     797             :      * ExecParallelInitializeDSM.
     798             :      */
     799         718 :     if (estate->es_instrument)
     800             :     {
     801             :         Instrumentation *instrument;
     802             :         int         i;
     803             : 
     804         180 :         instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
     805         180 :         instrumentation->instrument_options = estate->es_instrument;
     806         180 :         instrumentation->instrument_offset = instrument_offset;
     807         180 :         instrumentation->num_workers = nworkers;
     808         180 :         instrumentation->num_plan_nodes = e.nnodes;
     809         180 :         instrument = GetInstrumentationArray(instrumentation);
     810        1860 :         for (i = 0; i < nworkers * e.nnodes; ++i)
     811        1680 :             InstrInit(&instrument[i], estate->es_instrument);
     812         180 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
     813             :                        instrumentation);
     814         180 :         pei->instrumentation = instrumentation;
     815             : 
     816         180 :         if (estate->es_jit_flags != PGJIT_NONE)
     817             :         {
     818          24 :             jit_instrumentation = shm_toc_allocate(pcxt->toc,
     819             :                                                    jit_instrumentation_len);
     820          24 :             jit_instrumentation->num_workers = nworkers;
     821          24 :             memset(jit_instrumentation->jit_instr, 0,
     822             :                    sizeof(JitInstrumentation) * nworkers);
     823          24 :             shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
     824             :                            jit_instrumentation);
     825          24 :             pei->jit_instrumentation = jit_instrumentation;
     826             :         }
     827             :     }
     828             : 
     829             :     /*
     830             :      * Create a DSA area that can be used by the leader and all workers.
     831             :      * (However, if we failed to create a DSM and are using private memory
     832             :      * instead, then skip this.)
     833             :      */
     834         718 :     if (pcxt->seg != NULL)
     835             :     {
     836             :         char       *area_space;
     837             : 
     838         718 :         area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
     839         718 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
     840         718 :         pei->area = dsa_create_in_place(area_space, dsa_minsize,
     841             :                                         LWTRANCHE_PARALLEL_QUERY_DSA,
     842             :                                         pcxt->seg);
     843             : 
     844             :         /*
     845             :          * Serialize parameters, if any, using DSA storage.  We don't dare use
     846             :          * the main parallel query DSM for this because we might relaunch
     847             :          * workers after the values have changed (and thus the amount of
     848             :          * storage required has changed).
     849             :          */
     850         718 :         if (!bms_is_empty(sendParams))
     851             :         {
     852          24 :             pei->param_exec = SerializeParamExecParams(estate, sendParams,
     853             :                                                        pei->area);
     854          24 :             fpes->param_exec = pei->param_exec;
     855             :         }
     856             :     }
     857             : 
     858             :     /*
     859             :      * Give parallel-aware nodes a chance to initialize their shared data.
      860             :      * This also fills in the plan_node_id entries of *instrumentation, if
      861             :      * instrumentation was requested.
     862             :      */
     863         718 :     d.pcxt = pcxt;
     864         718 :     d.instrumentation = instrumentation;
     865         718 :     d.nnodes = 0;
     866             : 
     867             :     /* Install our DSA area while initializing the plan. */
     868         718 :     estate->es_query_dsa = pei->area;
     869         718 :     ExecParallelInitializeDSM(planstate, &d);
     870         718 :     estate->es_query_dsa = NULL;
     871             : 
     872             :     /*
     873             :      * Make sure that the world hasn't shifted under our feet.  This could
     874             :      * probably just be an Assert(), but let's be conservative for now.
     875             :      */
     876         718 :     if (e.nnodes != d.nnodes)
     877           0 :         elog(ERROR, "inconsistent count of PlanState nodes");
     878             : 
     879             :     /* OK, we're ready to rock and roll. */
     880         718 :     return pei;
     881             : }
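
A typical caller (the Gather node) chains this routine together with worker launch and reader creation roughly as follows; a hedged sketch, not a copy of nodeGather.c:

pei = ExecInitParallelPlan(outerPlanState(gatherstate), estate,
                           gatherstate->initParam, nworkers,
                           tuples_needed);
LaunchParallelWorkers(pei->pcxt);
if (pei->pcxt->nworkers_launched > 0)
    ExecParallelCreateReaders(pei);
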
     882             : 
     883             : /*
     884             :  * Set up tuple queue readers to read the results of a parallel subplan.
     885             :  *
     886             :  * This is separate from ExecInitParallelPlan() because we can launch the
     887             :  * worker processes and let them start doing something before we do this.
     888             :  */
     889             : void
     890         958 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
     891             : {
     892         958 :     int         nworkers = pei->pcxt->nworkers_launched;
     893             :     int         i;
     894             : 
     895             :     Assert(pei->reader == NULL);
     896             : 
     897         958 :     if (nworkers > 0)
     898             :     {
     899         958 :         pei->reader = (TupleQueueReader **)
     900         958 :             palloc(nworkers * sizeof(TupleQueueReader *));
     901             : 
     902        3486 :         for (i = 0; i < nworkers; i++)
     903             :         {
     904        2528 :             shm_mq_set_handle(pei->tqueue[i],
     905        2528 :                               pei->pcxt->worker[i].bgwhandle);
     906        2528 :             pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
     907             :         }
     908             :     }
     909         958 : }
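
Once the readers exist, the leader drains them with TupleQueueReaderNext(); a minimal sketch of a loop over a single reader, assuming the MinimalTuple-returning signature in current sources and omitting the Gather node's round-robin and error handling:

MinimalTuple tup;
bool        readerdone = false;

while (!readerdone)
{
    tup = TupleQueueReaderNext(pei->reader[0], false, &readerdone);
    if (tup != NULL)
    {
        /* process the tuple, e.g. store it into the node's output slot */
    }
}
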
     910             : 
     911             : /*
     912             :  * Re-initialize the parallel executor shared memory state before launching
     913             :  * a fresh batch of workers.
     914             :  */
     915             : void
     916         258 : ExecParallelReinitialize(PlanState *planstate,
     917             :                          ParallelExecutorInfo *pei,
     918             :                          Bitmapset *sendParams)
     919             : {
     920         258 :     EState     *estate = planstate->state;
     921             :     FixedParallelExecutorState *fpes;
     922             : 
     923             :     /* Old workers must already be shut down */
     924             :     Assert(pei->finished);
     925             : 
     926             :     /*
     927             :      * Force any initplan outputs that we're going to pass to workers to be
     928             :      * evaluated, if they weren't already (see comments in
     929             :      * ExecInitParallelPlan).
     930             :      */
     931         258 :     ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
     932             : 
     933         258 :     ReinitializeParallelDSM(pei->pcxt);
     934         258 :     pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
     935         258 :     pei->reader = NULL;
     936         258 :     pei->finished = false;
     937             : 
     938         258 :     fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
     939             : 
     940             :     /* Free any serialized parameters from the last round. */
     941         258 :     if (DsaPointerIsValid(fpes->param_exec))
     942             :     {
     943           0 :         dsa_free(pei->area, fpes->param_exec);
     944           0 :         fpes->param_exec = InvalidDsaPointer;
     945             :     }
     946             : 
     947             :     /* Serialize current parameter values if required. */
     948         258 :     if (!bms_is_empty(sendParams))
     949             :     {
     950           0 :         pei->param_exec = SerializeParamExecParams(estate, sendParams,
     951             :                                                    pei->area);
     952           0 :         fpes->param_exec = pei->param_exec;
     953             :     }
     954             : 
     955             :     /* Traverse plan tree and let each child node reset associated state. */
     956         258 :     estate->es_query_dsa = pei->area;
     957         258 :     ExecParallelReInitializeDSM(planstate, pei->pcxt);
     958         258 :     estate->es_query_dsa = NULL;
     959         258 : }
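/*
 * Illustrative sketch (not part of execParallel.c): on a rescan, a leader
 * node keeps its existing ParallelExecutorInfo and calls
 * ExecParallelReinitialize() instead of rebuilding everything, then launches
 * a fresh batch of workers.  First time through, ExecInitParallelPlan()
 * builds pei as in the sketch further above.  This mirrors the shape of
 * ExecGather(); gatherstate/gather are the same assumed names as before.
 */
if (gatherstate->pei != NULL)
    ExecParallelReinitialize(outerPlanState(gatherstate),
                             gatherstate->pei,
                             gather->initParam);
LaunchParallelWorkers(gatherstate->pei->pcxt);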
     960             : 
     961             : /*
     962             :  * Traverse plan tree to reinitialize per-node dynamic shared memory state
     963             :  */
     964             : static bool
     965         666 : ExecParallelReInitializeDSM(PlanState *planstate,
     966             :                             ParallelContext *pcxt)
     967             : {
     968         666 :     if (planstate == NULL)
     969           0 :         return false;
     970             : 
     971             :     /*
     972             :      * Call reinitializers for DSM-using plan nodes.
     973             :      */
     974         666 :     switch (nodeTag(planstate))
     975             :     {
     976         276 :         case T_SeqScanState:
     977         276 :             if (planstate->plan->parallel_aware)
     978         228 :                 ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
     979             :                                            pcxt);
     980         276 :             break;
     981          12 :         case T_IndexScanState:
     982          12 :             if (planstate->plan->parallel_aware)
     983          12 :                 ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
     984             :                                              pcxt);
     985          12 :             break;
     986          12 :         case T_IndexOnlyScanState:
     987          12 :             if (planstate->plan->parallel_aware)
     988          12 :                 ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
     989             :                                                  pcxt);
     990          12 :             break;
     991           0 :         case T_ForeignScanState:
     992           0 :             if (planstate->plan->parallel_aware)
     993           0 :                 ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
     994             :                                                pcxt);
     995           0 :             break;
     996           0 :         case T_AppendState:
     997           0 :             if (planstate->plan->parallel_aware)
     998           0 :                 ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
     999           0 :             break;
    1000           0 :         case T_CustomScanState:
    1001           0 :             if (planstate->plan->parallel_aware)
    1002           0 :                 ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
    1003             :                                               pcxt);
    1004           0 :             break;
    1005          54 :         case T_BitmapHeapScanState:
    1006          54 :             if (planstate->plan->parallel_aware)
    1007          54 :                 ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
    1008             :                                               pcxt);
    1009          54 :             break;
    1010          96 :         case T_HashJoinState:
    1011          96 :             if (planstate->plan->parallel_aware)
    1012          48 :                 ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
    1013             :                                             pcxt);
    1014          96 :             break;
    1015         180 :         case T_BitmapIndexScanState:
    1016             :         case T_HashState:
    1017             :         case T_SortState:
    1018             :         case T_IncrementalSortState:
    1019             :         case T_MemoizeState:
    1020             :             /* these nodes have DSM state, but no reinitialization is required */
    1021         180 :             break;
    1022             : 
    1023          36 :         default:
    1024          36 :             break;
    1025             :     }
    1026             : 
    1027         666 :     return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
    1028             : }
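/*
 * Note on the traversal convention used above and in several functions
 * below: each routine does its per-node work and then recurses into its
 * children via planstate_tree_walker(), passing itself as the walker;
 * returning true from a walker aborts the traversal early.  A minimal
 * skeleton of that convention (my_walker and its context are hypothetical):
 */
static bool
my_walker(PlanState *planstate, void *context)
{
    if (planstate == NULL)
        return false;

    /* ... per-node work, typically keyed off nodeTag(planstate) ... */

    return planstate_tree_walker(planstate, my_walker, context);
}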
    1029             : 
    1030             : /*
    1031             :  * Copy instrumentation information about this node and its descendants from
    1032             :  * dynamic shared memory.
    1033             :  */
    1034             : static bool
    1035        1026 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
    1036             :                                     SharedExecutorInstrumentation *instrumentation)
    1037             : {
    1038             :     Instrumentation *instrument;
    1039             :     int         i;
    1040             :     int         n;
    1041             :     int         ibytes;
    1042        1026 :     int         plan_node_id = planstate->plan->plan_node_id;
    1043             :     MemoryContext oldcontext;
    1044             : 
    1045             :     /* Find the instrumentation for this node. */
    1046        4638 :     for (i = 0; i < instrumentation->num_plan_nodes; ++i)
    1047        4638 :         if (instrumentation->plan_node_id[i] == plan_node_id)
    1048        1026 :             break;
    1049        1026 :     if (i >= instrumentation->num_plan_nodes)
    1050           0 :         elog(ERROR, "plan node %d not found", plan_node_id);
    1051             : 
    1052             :     /* Accumulate the statistics from all workers. */
    1053        1026 :     instrument = GetInstrumentationArray(instrumentation);
    1054        1026 :     instrument += i * instrumentation->num_workers;
    1055        2706 :     for (n = 0; n < instrumentation->num_workers; ++n)
    1056        1680 :         InstrAggNode(planstate->instrument, &instrument[n]);
    1057             : 
    1058             :     /*
    1059             :      * Also store the per-worker detail.
    1060             :      *
    1061             :      * Worker instrumentation should be allocated in the same context as the
    1062             :      * regular instrumentation information, which is the per-query context.
    1063             :      * Switch into per-query memory context.
    1064             :      */
    1065        1026 :     oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
    1066        1026 :     ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
    1067        1026 :     planstate->worker_instrument =
    1068        1026 :         palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
    1069        1026 :     MemoryContextSwitchTo(oldcontext);
    1070             : 
    1071        1026 :     planstate->worker_instrument->num_workers = instrumentation->num_workers;
    1072        1026 :     memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
    1073             : 
    1074             :     /* Perform any node-type-specific work that needs to be done. */
    1075        1026 :     switch (nodeTag(planstate))
    1076             :     {
    1077         270 :         case T_IndexScanState:
    1078         270 :             ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
    1079         270 :             break;
    1080           0 :         case T_IndexOnlyScanState:
    1081           0 :             ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
    1082           0 :             break;
    1083           0 :         case T_BitmapIndexScanState:
    1084           0 :             ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
    1085           0 :             break;
    1086          12 :         case T_SortState:
    1087          12 :             ExecSortRetrieveInstrumentation((SortState *) planstate);
    1088          12 :             break;
    1089           0 :         case T_IncrementalSortState:
    1090           0 :             ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
    1091           0 :             break;
    1092          84 :         case T_HashState:
    1093          84 :             ExecHashRetrieveInstrumentation((HashState *) planstate);
    1094          84 :             break;
    1095         102 :         case T_AggState:
    1096         102 :             ExecAggRetrieveInstrumentation((AggState *) planstate);
    1097         102 :             break;
    1098           0 :         case T_MemoizeState:
    1099           0 :             ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
    1100           0 :             break;
    1101           0 :         case T_BitmapHeapScanState:
    1102           0 :             ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
    1103           0 :             break;
    1104         558 :         default:
    1105         558 :             break;
    1106             :     }
    1107             : 
    1108        1026 :     return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
    1109             :                                  instrumentation);
    1110             : }
    1111             : 
    1112             : /*
    1113             :  * Add up the workers' JIT instrumentation from dynamic shared memory.
    1114             :  */
    1115             : static void
    1116          24 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
    1117             :                                        SharedJitInstrumentation *shared_jit)
    1118             : {
    1119             :     JitInstrumentation *combined;
    1120             :     int         ibytes;
    1121             : 
    1122             :     int         n;
    1123             : 
    1124             :     /*
    1125             :      * Accumulate worker JIT instrumentation into the combined JIT
    1126             :      * instrumentation, allocating it if required.
    1127             :      */
    1128          24 :     if (!planstate->state->es_jit_worker_instr)
    1129          24 :         planstate->state->es_jit_worker_instr =
    1130          24 :             MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
    1131          24 :     combined = planstate->state->es_jit_worker_instr;
    1132             : 
    1133             :     /* Accumulate all the workers' instrumentations. */
    1134          72 :     for (n = 0; n < shared_jit->num_workers; ++n)
    1135          48 :         InstrJitAgg(combined, &shared_jit->jit_instr[n]);
    1136             : 
    1137             :     /*
    1138             :      * Store the per-worker detail.
    1139             :      *
    1140             :      * Similar to ExecParallelRetrieveInstrumentation(), allocate the
    1141             :      * instrumentation in per-query context.
    1142             :      */
    1143          24 :     ibytes = offsetof(SharedJitInstrumentation, jit_instr)
    1144          24 :         + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
    1145          24 :     planstate->worker_jit_instrument =
    1146          24 :         MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
    1147             : 
    1148          24 :     memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
    1149          24 : }
    1150             : 
    1151             : /*
    1152             :  * Finish parallel execution.  We wait for parallel workers to finish, and
    1153             :  * accumulate their buffer/WAL usage.
    1154             :  */
    1155             : void
    1156        1760 : ExecParallelFinish(ParallelExecutorInfo *pei)
    1157             : {
    1158        1760 :     int         nworkers = pei->pcxt->nworkers_launched;
    1159             :     int         i;
    1160             : 
    1161             :     /* Make this be a no-op if called twice in a row. */
    1162        1760 :     if (pei->finished)
    1163         796 :         return;
    1164             : 
    1165             :     /*
    1166             :      * Detach from tuple queues ASAP, so that any still-active workers will
    1167             :      * notice that no further results are wanted.
    1168             :      */
    1169         964 :     if (pei->tqueue != NULL)
    1170             :     {
    1171        3480 :         for (i = 0; i < nworkers; i++)
    1172        2516 :             shm_mq_detach(pei->tqueue[i]);
    1173         964 :         pfree(pei->tqueue);
    1174         964 :         pei->tqueue = NULL;
    1175             :     }
    1176             : 
    1177             :     /*
    1178             :      * While we're waiting for the workers to finish, let's get rid of the
    1179             :      * tuple queue readers.  (Any other local cleanup could be done here too.)
    1180             :      */
    1181         964 :     if (pei->reader != NULL)
    1182             :     {
    1183        3462 :         for (i = 0; i < nworkers; i++)
    1184        2516 :             DestroyTupleQueueReader(pei->reader[i]);
    1185         946 :         pfree(pei->reader);
    1186         946 :         pei->reader = NULL;
    1187             :     }
    1188             : 
    1189             :     /* Now wait for the workers to finish. */
    1190         964 :     WaitForParallelWorkersToFinish(pei->pcxt);
    1191             : 
    1192             :     /*
    1193             :      * Next, accumulate buffer/WAL usage.  (This must wait for the workers to
    1194             :      * finish, or we might get incomplete data.)
    1195             :      */
    1196        3480 :     for (i = 0; i < nworkers; i++)
    1197        2516 :         InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
    1198             : 
    1199         964 :     pei->finished = true;
    1200             : }
    1201             : 
    1202             : /*
    1203             :  * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
    1204             :  * resources still exist after ExecParallelFinish.  We separate these
    1205             :  * routines because someone might want to examine the contents of the DSM
    1206             :  * after ExecParallelFinish and before calling this routine.
    1207             :  */
    1208             : void
    1209         706 : ExecParallelCleanup(ParallelExecutorInfo *pei)
    1210             : {
    1211             :     /* Accumulate instrumentation, if any. */
    1212         706 :     if (pei->instrumentation)
    1213         180 :         ExecParallelRetrieveInstrumentation(pei->planstate,
    1214             :                                             pei->instrumentation);
    1215             : 
    1216             :     /* Accumulate JIT instrumentation, if any. */
    1217         706 :     if (pei->jit_instrumentation)
    1218          24 :         ExecParallelRetrieveJitInstrumentation(pei->planstate,
    1219          24 :                                                pei->jit_instrumentation);
    1220             : 
    1221             :     /* Free any serialized parameters. */
    1222         706 :     if (DsaPointerIsValid(pei->param_exec))
    1223             :     {
    1224          24 :         dsa_free(pei->area, pei->param_exec);
    1225          24 :         pei->param_exec = InvalidDsaPointer;
    1226             :     }
    1227         706 :     if (pei->area != NULL)
    1228             :     {
    1229         706 :         dsa_detach(pei->area);
    1230         706 :         pei->area = NULL;
    1231             :     }
    1232         706 :     if (pei->pcxt != NULL)
    1233             :     {
    1234         706 :         DestroyParallelContext(pei->pcxt);
    1235         706 :         pei->pcxt = NULL;
    1236             :     }
    1237         706 :     pfree(pei);
    1238         706 : }
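/*
 * Illustrative sketch (not part of execParallel.c): the expected shutdown
 * order in a leader node.  ExecParallelFinish() waits for the workers and
 * accumulates their buffer/WAL usage; ExecParallelCleanup() can then run
 * later, after any instrumentation left in the DSM has been examined.
 * This compresses the shape of ExecShutdownGather(); names are simplified.
 */
if (gatherstate->pei != NULL)
{
    ExecParallelFinish(gatherstate->pei);   /* wait for workers to finish */
    ExecParallelCleanup(gatherstate->pei);  /* then tear down DSM/context */
    gatherstate->pei = NULL;
}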
    1239             : 
    1240             : /*
    1241             :  * Create a DestReceiver to write tuples we produce to the shm_mq designated
    1242             :  * for that purpose.
    1243             :  */
    1244             : static DestReceiver *
    1245        2528 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
    1246             : {
    1247             :     char       *mqspace;
    1248             :     shm_mq     *mq;
    1249             : 
    1250        2528 :     mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
    1251        2528 :     mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
    1252        2528 :     mq = (shm_mq *) mqspace;
    1253        2528 :     shm_mq_set_sender(mq, MyProc);
    1254        2528 :     return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
    1255             : }
    1256             : 
    1257             : /*
    1258             :  * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
    1259             :  */
    1260             : static QueryDesc *
    1261        2528 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
    1262             :                          int instrument_options)
    1263             : {
    1264             :     char       *pstmtspace;
    1265             :     char       *paramspace;
    1266             :     PlannedStmt *pstmt;
    1267             :     ParamListInfo paramLI;
    1268             :     char       *queryString;
    1269             : 
    1270             :     /* Get the query string from shared memory */
    1271        2528 :     queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
    1272             : 
    1273             :     /* Reconstruct leader-supplied PlannedStmt. */
    1274        2528 :     pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
    1275        2528 :     pstmt = (PlannedStmt *) stringToNode(pstmtspace);
    1276             : 
    1277             :     /* Reconstruct ParamListInfo. */
    1278        2528 :     paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
    1279        2528 :     paramLI = RestoreParamList(&paramspace);
    1280             : 
    1281             :     /*
    1282             :      * Create a QueryDesc for the query.  We pass NULL for cachedplan, because
    1283             :      * we don't have a pointer to the CachedPlan in the leader's process. It's
    1284             :      * fine because the only reason the executor needs to see it is to decide
    1285             :      * if it should take locks on certain relations, but parallel workers
    1286             :      * always take locks anyway.
    1287             :      */
    1288        2528 :     return CreateQueryDesc(pstmt,
    1289             :                            NULL,
    1290             :                            queryString,
    1291             :                            GetActiveSnapshot(), InvalidSnapshot,
    1292             :                            receiver, paramLI, NULL, instrument_options);
    1293             : }
    1294             : 
    1295             : /*
    1296             :  * Copy instrumentation information from this node and its descendants into
    1297             :  * dynamic shared memory, so that the parallel leader can retrieve it.
    1298             :  */
    1299             : static bool
    1300        2366 : ExecParallelReportInstrumentation(PlanState *planstate,
    1301             :                                   SharedExecutorInstrumentation *instrumentation)
    1302             : {
    1303             :     int         i;
    1304        2366 :     int         plan_node_id = planstate->plan->plan_node_id;
    1305             :     Instrumentation *instrument;
    1306             : 
    1307        2366 :     InstrEndLoop(planstate->instrument);
    1308             : 
    1309             :     /*
    1310             :      * If we shuffled the plan_node_id values in ps_instrument into sorted
    1311             :      * order, we could use binary search here.  This might matter someday if
    1312             :      * we're pushing down sufficiently large plan trees.  For now, do it the
    1313             :      * slow, dumb way.
    1314             :      */
    1315        7782 :     for (i = 0; i < instrumentation->num_plan_nodes; ++i)
    1316        7782 :         if (instrumentation->plan_node_id[i] == plan_node_id)
    1317        2366 :             break;
    1318        2366 :     if (i >= instrumentation->num_plan_nodes)
    1319           0 :         elog(ERROR, "plan node %d not found", plan_node_id);
    1320             : 
    1321             :     /*
    1322             :      * Add our statistics to the per-node, per-worker totals.  It's possible
    1323             :      * that this could happen more than once if we relaunched workers.
    1324             :      */
    1325        2366 :     instrument = GetInstrumentationArray(instrumentation);
    1326        2366 :     instrument += i * instrumentation->num_workers;
    1327             :     Assert(IsParallelWorker());
    1328             :     Assert(ParallelWorkerNumber < instrumentation->num_workers);
    1329        2366 :     InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
    1330             : 
    1331        2366 :     return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
    1332             :                                  instrumentation);
    1333             : }
    1334             : 
    1335             : /*
    1336             :  * Initialize the PlanState and its descendants with the information
    1337             :  * retrieved from shared memory.  This has to be done once the PlanState
    1338             :  * is allocated and initialized by executor; that is, after ExecutorStart().
    1339             :  */
    1340             : static bool
    1341        8150 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
    1342             : {
    1343        8150 :     if (planstate == NULL)
    1344           0 :         return false;
    1345             : 
    1346        8150 :     switch (nodeTag(planstate))
    1347             :     {
    1348        3312 :         case T_SeqScanState:
    1349        3312 :             if (planstate->plan->parallel_aware)
    1350        2684 :                 ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
    1351        3312 :             break;
    1352         396 :         case T_IndexScanState:
    1353             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1354         396 :             ExecIndexScanInitializeWorker((IndexScanState *) planstate, pwcxt);
    1355         396 :             break;
    1356         242 :         case T_IndexOnlyScanState:
    1357             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1358         242 :             ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
    1359             :                                               pwcxt);
    1360         242 :             break;
    1361         272 :         case T_BitmapIndexScanState:
    1362             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1363         272 :             ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
    1364             :                                                 pwcxt);
    1365         272 :             break;
    1366           0 :         case T_ForeignScanState:
    1367           0 :             if (planstate->plan->parallel_aware)
    1368           0 :                 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
    1369             :                                                 pwcxt);
    1370           0 :             break;
    1371         376 :         case T_AppendState:
    1372         376 :             if (planstate->plan->parallel_aware)
    1373         316 :                 ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
    1374         376 :             break;
    1375           0 :         case T_CustomScanState:
    1376           0 :             if (planstate->plan->parallel_aware)
    1377           0 :                 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
    1378             :                                                pwcxt);
    1379           0 :             break;
    1380         272 :         case T_BitmapHeapScanState:
    1381         272 :             if (planstate->plan->parallel_aware)
    1382         270 :                 ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
    1383             :                                                pwcxt);
    1384         272 :             break;
    1385         546 :         case T_HashJoinState:
    1386         546 :             if (planstate->plan->parallel_aware)
    1387         306 :                 ExecHashJoinInitializeWorker((HashJoinState *) planstate,
    1388             :                                              pwcxt);
    1389         546 :             break;
    1390         546 :         case T_HashState:
    1391             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1392         546 :             ExecHashInitializeWorker((HashState *) planstate, pwcxt);
    1393         546 :             break;
    1394         452 :         case T_SortState:
    1395             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1396         452 :             ExecSortInitializeWorker((SortState *) planstate, pwcxt);
    1397         452 :             break;
    1398           0 :         case T_IncrementalSortState:
    1399             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1400           0 :             ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
    1401             :                                                 pwcxt);
    1402           0 :             break;
    1403        1546 :         case T_AggState:
    1404             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1405        1546 :             ExecAggInitializeWorker((AggState *) planstate, pwcxt);
    1406        1546 :             break;
    1407          12 :         case T_MemoizeState:
    1408             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1409          12 :             ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
    1410          12 :             break;
    1411         178 :         default:
    1412         178 :             break;
    1413             :     }
    1414             : 
    1415        8150 :     return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
    1416             :                                  pwcxt);
    1417             : }
    1418             : 
    1419             : /*
    1420             :  * Main entrypoint for parallel query worker processes.
    1421             :  *
    1422             :  * We reach this function from ParallelWorkerMain, so the setup necessary to
    1423             :  * create a sensible parallel environment has already been done;
    1424             :  * ParallelWorkerMain worries about stuff like the transaction state, combo
    1425             :  * CID mappings, and GUC values, so we don't need to deal with any of that
    1426             :  * here.
    1427             :  *
    1428             :  * Our job is to deal with concerns specific to the executor.  The parallel
    1429             :  * group leader will have stored a serialized PlannedStmt, and it's our job
    1430             :  * to execute that plan and write the resulting tuples to the appropriate
    1431             :  * tuple queue.  Various bits of supporting information that we need in order
    1432             :  * to do this are also stored in the dsm_segment and can be accessed through
    1433             :  * the shm_toc.
    1434             :  */
    1435             : void
    1436        2528 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
    1437             : {
    1438             :     FixedParallelExecutorState *fpes;
    1439             :     BufferUsage *buffer_usage;
    1440             :     WalUsage   *wal_usage;
    1441             :     DestReceiver *receiver;
    1442             :     QueryDesc  *queryDesc;
    1443             :     SharedExecutorInstrumentation *instrumentation;
    1444             :     SharedJitInstrumentation *jit_instrumentation;
    1445        2528 :     int         instrument_options = 0;
    1446             :     void       *area_space;
    1447             :     dsa_area   *area;
    1448             :     ParallelWorkerContext pwcxt;
    1449             : 
    1450             :     /* Get fixed-size state. */
    1451        2528 :     fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
    1452             : 
    1453             :     /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
    1454        2528 :     receiver = ExecParallelGetReceiver(seg, toc);
    1455        2528 :     instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
    1456        2528 :     if (instrumentation != NULL)
    1457         724 :         instrument_options = instrumentation->instrument_options;
    1458        2528 :     jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
    1459             :                                          true);
    1460        2528 :     queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
    1461             : 
    1462             :     /* Set debug_query_string for this individual worker */
    1463        2528 :     debug_query_string = queryDesc->sourceText;
    1464             : 
    1465             :     /* Report workers' query for monitoring purposes */
    1466        2528 :     pgstat_report_activity(STATE_RUNNING, debug_query_string);
    1467             : 
    1468             :     /* Attach to the dynamic shared memory area. */
    1469        2528 :     area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
    1470        2528 :     area = dsa_attach_in_place(area_space, seg);
    1471             : 
    1472             :     /* Start up the executor */
    1473        2528 :     queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
    1474        2528 :     if (!ExecutorStart(queryDesc, fpes->eflags))
    1475           0 :         elog(ERROR, "ExecutorStart() failed unexpectedly");
    1476             : 
    1477             :     /* Special executor initialization steps for parallel workers */
    1478        2528 :     queryDesc->planstate->state->es_query_dsa = area;
    1479        2528 :     if (DsaPointerIsValid(fpes->param_exec))
    1480             :     {
    1481             :         char       *paramexec_space;
    1482             : 
    1483          72 :         paramexec_space = dsa_get_address(area, fpes->param_exec);
    1484          72 :         RestoreParamExecParams(paramexec_space, queryDesc->estate);
    1485             :     }
    1486        2528 :     pwcxt.toc = toc;
    1487        2528 :     pwcxt.seg = seg;
    1488        2528 :     ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
    1489             : 
    1490             :     /* Pass down any tuple bound */
    1491        2528 :     ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
    1492             : 
    1493             :     /*
    1494             :      * Prepare to track buffer/WAL usage during query execution.
    1495             :      *
    1496             :      * We do this after starting up the executor to match what happens in the
    1497             :      * leader, which also doesn't count buffer accesses and WAL activity that
    1498             :      * occur during executor startup.
    1499             :      */
    1500        2528 :     InstrStartParallelQuery();
    1501             : 
    1502             :     /*
    1503             :      * Run the plan.  If we specified a tuple bound, be careful not to demand
    1504             :      * more tuples than that.
    1505             :      */
    1506        2528 :     ExecutorRun(queryDesc,
    1507             :                 ForwardScanDirection,
    1508        2528 :                 fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
    1509             : 
    1510             :     /* Shut down the executor */
    1511        2516 :     ExecutorFinish(queryDesc);
    1512             : 
    1513             :     /* Report buffer/WAL usage during parallel execution. */
    1514        2516 :     buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
    1515        2516 :     wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
    1516        2516 :     InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
    1517        2516 :                           &wal_usage[ParallelWorkerNumber]);
    1518             : 
    1519             :     /* Report instrumentation data if any instrumentation options are set. */
    1520        2516 :     if (instrumentation != NULL)
    1521         724 :         ExecParallelReportInstrumentation(queryDesc->planstate,
    1522             :                                           instrumentation);
    1523             : 
    1524             :     /* Report JIT instrumentation data if any */
    1525        2516 :     if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
    1526             :     {
    1527             :         Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
    1528         144 :         jit_instrumentation->jit_instr[ParallelWorkerNumber] =
    1529         144 :             queryDesc->estate->es_jit->instr;
    1530             :     }
    1531             : 
    1532             :     /* Must do this after capturing instrumentation. */
    1533        2516 :     ExecutorEnd(queryDesc);
    1534             : 
    1535             :     /* Cleanup. */
    1536        2516 :     dsa_detach(area);
    1537        2516 :     FreeQueryDesc(queryDesc);
    1538        2516 :     receiver->rDestroy(receiver);
    1539        2516 : }
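/*
 * Illustrative sketch (not part of this function): how ParallelQueryMain
 * becomes the worker entry point in the first place.  ExecInitParallelPlan()
 * names it when creating the ParallelContext, and the background-worker
 * machinery later resolves it by library and function name.
 */
pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);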

Generated by: LCOV version 1.14