LCOV - code coverage report
Current view: top level - src/backend/executor - execParallel.c (source / functions)
Test: PostgreSQL 19devel
Test Date: 2026-05-11 16:16:35
Coverage: Lines: 83.3 % (544 of 653)   Functions: 95.0 % (19 of 20)

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * execParallel.c
       4              :  *    Support routines for parallel execution.
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * This file contains routines that are intended to support setting up,
      10              :  * using, and tearing down a ParallelContext from within the PostgreSQL
      11              :  * executor.  The ParallelContext machinery will handle starting the
      12              :  * workers and ensuring that their state generally matches that of the
      13              :  * leader; see src/backend/access/transam/README.parallel for details.
      14              :  * However, we must save and restore relevant executor state, such as
      15              :  * any ParamListInfo associated with the query, buffer/WAL usage info, and
      16              :  * the actual plan to be passed down to the worker.
      17              :  *
      18              :  * IDENTIFICATION
      19              :  *    src/backend/executor/execParallel.c
      20              :  *
      21              :  *-------------------------------------------------------------------------
      22              :  */
      23              : 
      24              : #include "postgres.h"
      25              : 
      26              : #include "executor/execParallel.h"
      27              : #include "executor/executor.h"
      28              : #include "executor/nodeAgg.h"
      29              : #include "executor/nodeAppend.h"
      30              : #include "executor/nodeBitmapHeapscan.h"
      31              : #include "executor/nodeBitmapIndexscan.h"
      32              : #include "executor/nodeCustom.h"
      33              : #include "executor/nodeForeignscan.h"
      34              : #include "executor/nodeHash.h"
      35              : #include "executor/nodeHashjoin.h"
      36              : #include "executor/nodeIncrementalSort.h"
      37              : #include "executor/nodeIndexonlyscan.h"
      38              : #include "executor/nodeIndexscan.h"
      39              : #include "executor/nodeMemoize.h"
      40              : #include "executor/nodeSeqscan.h"
      41              : #include "executor/nodeSort.h"
      42              : #include "executor/nodeSubplan.h"
      43              : #include "executor/nodeTidrangescan.h"
      44              : #include "executor/tqueue.h"
      45              : #include "jit/jit.h"
      46              : #include "nodes/nodeFuncs.h"
      47              : #include "pgstat.h"
      48              : #include "storage/proc.h"
      49              : #include "tcop/tcopprot.h"
      50              : #include "utils/datum.h"
      51              : #include "utils/dsa.h"
      52              : #include "utils/lsyscache.h"
      53              : #include "utils/snapmgr.h"
      54              : 
      55              : /*
      56              :  * Magic numbers for parallel executor communication.  We use constants
      57              :  * greater than any 32-bit integer here so that values < 2^32 can be used
      58              :  * by individual parallel nodes to store their own state.
      59              :  */
      60              : #define PARALLEL_KEY_EXECUTOR_FIXED     UINT64CONST(0xE000000000000001)
      61              : #define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000002)
      62              : #define PARALLEL_KEY_PARAMLISTINFO      UINT64CONST(0xE000000000000003)
      63              : #define PARALLEL_KEY_BUFFER_USAGE       UINT64CONST(0xE000000000000004)
      64              : #define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000005)
      65              : #define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000006)
      66              : #define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000007)
       67              : #define PARALLEL_KEY_QUERY_TEXT         UINT64CONST(0xE000000000000008)
      68              : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
      69              : #define PARALLEL_KEY_WAL_USAGE          UINT64CONST(0xE00000000000000A)
      70              : 
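A minimal illustration of the key split described in the comment above (not part of the file): the executor-global keys all exceed the 32-bit range, so per-node keys derived from plan_node_id (a plain int) can never collide with them. A compile-time check along these lines would document the invariant:

StaticAssertDecl(PARALLEL_KEY_EXECUTOR_FIXED > UINT64CONST(0xFFFFFFFF),
                 "executor toc keys must stay above the per-node key space");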
      71              : #define PARALLEL_TUPLE_QUEUE_SIZE       65536
      72              : 
      73              : /*
      74              :  * Fixed-size random stuff that we need to pass to parallel workers.
      75              :  */
      76              : typedef struct FixedParallelExecutorState
      77              : {
      78              :     int64       tuples_needed;  /* tuple bound, see ExecSetTupleBound */
      79              :     dsa_pointer param_exec;
      80              :     int         eflags;
      81              :     int         jit_flags;
      82              : } FixedParallelExecutorState;
      83              : 
      84              : /*
      85              :  * DSM structure for accumulating per-PlanState instrumentation.
      86              :  *
      87              :  * instrument_options: Same meaning here as in instrument.c.
      88              :  *
      89              :  * instrument_offset: Offset, relative to the start of this structure,
      90              :  * of the first NodeInstrumentation object.  This will depend on the length of
      91              :  * the plan_node_id array.
      92              :  *
      93              :  * num_workers: Number of workers.
      94              :  *
      95              :  * num_plan_nodes: Number of plan nodes.
      96              :  *
      97              :  * plan_node_id: Array of plan nodes for which we are gathering instrumentation
      98              :  * from parallel workers.  The length of this array is given by num_plan_nodes.
      99              :  */
     100              : struct SharedExecutorInstrumentation
     101              : {
     102              :     int         instrument_options;
     103              :     int         instrument_offset;
     104              :     int         num_workers;
     105              :     int         num_plan_nodes;
     106              :     int         plan_node_id[FLEXIBLE_ARRAY_MEMBER];
     107              : 
     108              :     /*
     109              :      * Array of num_plan_nodes * num_workers NodeInstrumentation objects
     110              :      * follows.
     111              :      */
     112              : };
     113              : #define GetInstrumentationArray(sei) \
      114              :     (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
     115              :      (NodeInstrumentation *) (((char *) sei) + sei->instrument_offset))
     116              : 
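The accumulated data sits in a flat array with num_workers entries per plan node, so the slot for a given node and worker can be computed directly. A small helper along these lines (a sketch with an invented name, assuming the per-node-major layout described in the struct comment) shows the addressing:

static NodeInstrumentation *
example_instrumentation_slot(SharedExecutorInstrumentation *sei,
                             int plan_node_index, int worker_number)
{
    NodeInstrumentation *array = GetInstrumentationArray(sei);

    /* num_workers consecutive entries per plan node */
    return &array[plan_node_index * sei->num_workers + worker_number];
}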
     117              : /* Context object for ExecParallelEstimate. */
     118              : typedef struct ExecParallelEstimateContext
     119              : {
     120              :     ParallelContext *pcxt;
     121              :     int         nnodes;
     122              : } ExecParallelEstimateContext;
     123              : 
     124              : /* Context object for ExecParallelInitializeDSM. */
     125              : typedef struct ExecParallelInitializeDSMContext
     126              : {
     127              :     ParallelContext *pcxt;
     128              :     SharedExecutorInstrumentation *instrumentation;
     129              :     int         nnodes;
     130              : } ExecParallelInitializeDSMContext;
     131              : 
     132              : /* Helper functions that run in the parallel leader. */
     133              : static char *ExecSerializePlan(Plan *plan, EState *estate);
     134              : static bool ExecParallelEstimate(PlanState *planstate,
     135              :                                  ExecParallelEstimateContext *e);
     136              : static bool ExecParallelInitializeDSM(PlanState *planstate,
     137              :                                       ExecParallelInitializeDSMContext *d);
     138              : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
     139              :                                                     bool reinitialize);
     140              : static bool ExecParallelReInitializeDSM(PlanState *planstate,
     141              :                                         ParallelContext *pcxt);
     142              : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
     143              :                                                 SharedExecutorInstrumentation *instrumentation);
     144              : 
     145              : /* Helper function that runs in the parallel worker. */
     146              : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
     147              : 
     148              : /*
     149              :  * Create a serialized representation of the plan to be sent to each worker.
     150              :  */
     151              : static char *
     152          523 : ExecSerializePlan(Plan *plan, EState *estate)
     153              : {
     154              :     PlannedStmt *pstmt;
     155              :     ListCell   *lc;
     156              : 
     157              :     /* We can't scribble on the original plan, so make a copy. */
     158          523 :     plan = copyObject(plan);
     159              : 
     160              :     /*
     161              :      * The worker will start its own copy of the executor, and that copy will
     162              :      * insert a junk filter if the toplevel node has any resjunk entries. We
     163              :      * don't want that to happen, because while resjunk columns shouldn't be
     164              :      * sent back to the user, here the tuples are coming back to another
     165              :      * backend which may very well need them.  So mutate the target list
     166              :      * accordingly.  This is sort of a hack; there might be better ways to do
     167              :      * this...
     168              :      */
     169         1439 :     foreach(lc, plan->targetlist)
     170              :     {
     171          916 :         TargetEntry *tle = lfirst_node(TargetEntry, lc);
     172              : 
     173          916 :         tle->resjunk = false;
     174              :     }
     175              : 
     176              :     /*
     177              :      * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
     178              :      * for our purposes, but the worker will need at least a minimal
     179              :      * PlannedStmt to start the executor.
     180              :      */
     181          523 :     pstmt = makeNode(PlannedStmt);
     182          523 :     pstmt->commandType = CMD_SELECT;
     183          523 :     pstmt->queryId = pgstat_get_my_query_id();
     184          523 :     pstmt->planId = pgstat_get_my_plan_id();
     185          523 :     pstmt->hasReturning = false;
     186          523 :     pstmt->hasModifyingCTE = false;
     187          523 :     pstmt->canSetTag = true;
     188          523 :     pstmt->transientPlan = false;
     189          523 :     pstmt->dependsOnRole = false;
     190          523 :     pstmt->parallelModeNeeded = false;
     191          523 :     pstmt->planTree = plan;
     192          523 :     pstmt->partPruneInfos = estate->es_part_prune_infos;
     193          523 :     pstmt->rtable = estate->es_range_table;
     194          523 :     pstmt->unprunableRelids = estate->es_unpruned_relids;
     195          523 :     pstmt->permInfos = estate->es_rteperminfos;
     196          523 :     pstmt->appendRelations = NIL;
     197          523 :     pstmt->planOrigin = PLAN_STMT_INTERNAL;
     198              : 
     199              :     /*
     200              :      * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
     201              :      * for unsafe ones (so that the list indexes of the safe ones are
     202              :      * preserved).  This positively ensures that the worker won't try to run,
     203              :      * or even do ExecInitNode on, an unsafe subplan.  That's important to
     204              :      * protect, eg, non-parallel-aware FDWs from getting into trouble.
     205              :      */
     206          523 :     pstmt->subplans = NIL;
     207          559 :     foreach(lc, estate->es_plannedstmt->subplans)
     208              :     {
     209           36 :         Plan       *subplan = (Plan *) lfirst(lc);
     210              : 
     211           36 :         if (subplan && !subplan->parallel_safe)
     212            8 :             subplan = NULL;
     213           36 :         pstmt->subplans = lappend(pstmt->subplans, subplan);
     214              :     }
     215              : 
     216          523 :     pstmt->rewindPlanIDs = NULL;
     217          523 :     pstmt->rowMarks = NIL;
     218              : 
     219              :     /*
     220              :      * Pass the row mark and result relation relids to parallel workers. They
     221              :      * may need to check them to inform heuristics.
     222              :      */
     223          523 :     pstmt->rowMarkRelids = estate->es_plannedstmt->rowMarkRelids;
     224          523 :     pstmt->resultRelationRelids = estate->es_plannedstmt->resultRelationRelids;
     225          523 :     pstmt->relationOids = NIL;
     226          523 :     pstmt->invalItems = NIL; /* workers can't replan anyway... */
     227          523 :     pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
     228          523 :     pstmt->utilityStmt = NULL;
     229          523 :     pstmt->stmt_location = -1;
     230          523 :     pstmt->stmt_len = -1;
     231              : 
     232              :     /* Return serialized copy of our dummy PlannedStmt. */
     233          523 :     return nodeToString(pstmt);
     234              : }
     235              : 
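On the worker side the process is simply reversed; a sketch (the helper name is hypothetical, but the toc key and the stringToNode() call follow this file's conventions):

static PlannedStmt *
example_restore_plan(shm_toc *toc)
{
    char       *pstmt_space;

    /* Find the serialized plan stored by the leader... */
    pstmt_space = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
    /* ...and turn the node string back into a PlannedStmt tree. */
    return (PlannedStmt *) stringToNode(pstmt_space);
}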
     236              : /*
     237              :  * Parallel-aware plan nodes (and occasionally others) may need some state
     238              :  * which is shared across all parallel workers.  Before we size the DSM, give
     239              :  * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
     240              :  * &pcxt->estimator.
     241              :  *
     242              :  * While we're at it, count the number of PlanState nodes in the tree, so
     243              :  * we know how many Instrumentation structures we need.
     244              :  */
     245              : static bool
     246         3303 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
     247              : {
     248         3303 :     if (planstate == NULL)
     249            0 :         return false;
     250              : 
     251              :     /* Count this node. */
     252         3303 :     e->nnodes++;
     253              : 
     254         3303 :     switch (nodeTag(planstate))
     255              :     {
     256         1528 :         case T_SeqScanState:
     257         1528 :             if (planstate->plan->parallel_aware)
     258         1190 :                 ExecSeqScanEstimate((SeqScanState *) planstate,
     259              :                                     e->pcxt);
     260              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     261         1528 :             ExecSeqScanInstrumentEstimate((SeqScanState *) planstate,
     262              :                                           e->pcxt);
     263         1528 :             break;
     264          276 :         case T_IndexScanState:
     265          276 :             if (planstate->plan->parallel_aware)
     266           12 :                 ExecIndexScanEstimate((IndexScanState *) planstate,
     267              :                                       e->pcxt);
     268              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     269          276 :             ExecIndexScanInstrumentEstimate((IndexScanState *) planstate,
     270              :                                             e->pcxt);
     271          276 :             break;
     272           42 :         case T_IndexOnlyScanState:
     273           42 :             if (planstate->plan->parallel_aware)
     274           30 :                 ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
     275              :                                           e->pcxt);
     276              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     277           42 :             ExecIndexOnlyScanInstrumentEstimate((IndexOnlyScanState *) planstate,
     278              :                                                 e->pcxt);
     279           42 :             break;
     280           13 :         case T_BitmapIndexScanState:
     281              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     282           13 :             ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
     283              :                                         e->pcxt);
     284           13 :             break;
     285            0 :         case T_ForeignScanState:
     286            0 :             if (planstate->plan->parallel_aware)
     287            0 :                 ExecForeignScanEstimate((ForeignScanState *) planstate,
     288              :                                         e->pcxt);
     289            0 :             break;
     290           16 :         case T_TidRangeScanState:
     291           16 :             if (planstate->plan->parallel_aware)
     292           16 :                 ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
     293              :                                          e->pcxt);
     294              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     295           16 :             ExecTidRangeScanInstrumentEstimate((TidRangeScanState *) planstate,
     296              :                                                e->pcxt);
     297           16 :             break;
     298          148 :         case T_AppendState:
     299          148 :             if (planstate->plan->parallel_aware)
     300          108 :                 ExecAppendEstimate((AppendState *) planstate,
     301              :                                    e->pcxt);
     302          148 :             break;
     303            0 :         case T_CustomScanState:
     304            0 :             if (planstate->plan->parallel_aware)
     305            0 :                 ExecCustomScanEstimate((CustomScanState *) planstate,
     306              :                                        e->pcxt);
     307            0 :             break;
     308           13 :         case T_BitmapHeapScanState:
     309           13 :             if (planstate->plan->parallel_aware)
     310           12 :                 ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
     311              :                                        e->pcxt);
     312              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     313           13 :             ExecBitmapHeapInstrumentEstimate((BitmapHeapScanState *) planstate,
     314              :                                              e->pcxt);
     315           13 :             break;
     316          208 :         case T_HashJoinState:
     317          208 :             if (planstate->plan->parallel_aware)
     318           84 :                 ExecHashJoinEstimate((HashJoinState *) planstate,
     319              :                                      e->pcxt);
     320          208 :             break;
     321          208 :         case T_HashState:
     322              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     323          208 :             ExecHashEstimate((HashState *) planstate, e->pcxt);
     324          208 :             break;
     325          201 :         case T_SortState:
     326              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     327          201 :             ExecSortEstimate((SortState *) planstate, e->pcxt);
     328          201 :             break;
     329            0 :         case T_IncrementalSortState:
     330              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     331            0 :             ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
     332            0 :             break;
     333          396 :         case T_AggState:
     334              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     335          396 :             ExecAggEstimate((AggState *) planstate, e->pcxt);
     336          396 :             break;
     337            4 :         case T_MemoizeState:
     338              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     339            4 :             ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
     340            4 :             break;
     341          250 :         default:
     342          250 :             break;
     343              :     }
     344              : 
     345         3303 :     return planstate_tree_walker(planstate, ExecParallelEstimate, e);
     346              : }
     347              : 
     348              : /*
     349              :  * Estimate the amount of space required to serialize the indicated parameters.
     350              :  */
     351              : static Size
     352           16 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
     353              : {
     354              :     int         paramid;
     355           16 :     Size        sz = sizeof(int);
     356              : 
     357           16 :     paramid = -1;
     358           36 :     while ((paramid = bms_next_member(params, paramid)) >= 0)
     359              :     {
     360              :         Oid         typeOid;
     361              :         int16       typLen;
     362              :         bool        typByVal;
     363              :         ParamExecData *prm;
     364              : 
     365           20 :         prm = &(estate->es_param_exec_vals[paramid]);
     366           20 :         typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
     367              :                                paramid);
     368              : 
     369           20 :         sz = add_size(sz, sizeof(int)); /* space for paramid */
     370              : 
     371              :         /* space for datum/isnull */
     372           20 :         if (OidIsValid(typeOid))
     373           20 :             get_typlenbyval(typeOid, &typLen, &typByVal);
     374              :         else
     375              :         {
     376              :             /* If no type OID, assume by-value, like copyParamList does. */
     377            0 :             typLen = sizeof(Datum);
     378            0 :             typByVal = true;
     379              :         }
     380           20 :         sz = add_size(sz,
     381           20 :                       datumEstimateSpace(prm->value, prm->isnull,
     382              :                                          typByVal, typLen));
     383              :     }
     384           16 :     return sz;
     385              : }
     386              : 
     387              : /*
     388              :  * Serialize specified PARAM_EXEC parameters.
     389              :  *
     390              :  * We write the number of parameters first, as a 4-byte integer, and then
     391              :  * write details for each parameter in turn.  The details for each parameter
     392              :  * consist of a 4-byte paramid (location of param in execution time internal
     393              :  * parameter array) and then the datum as serialized by datumSerialize().
     394              :  */
     395              : static dsa_pointer
     396           16 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
     397              : {
     398              :     Size        size;
     399              :     int         nparams;
     400              :     int         paramid;
     401              :     ParamExecData *prm;
     402              :     dsa_pointer handle;
     403              :     char       *start_address;
     404              : 
     405              :     /* Allocate enough space for the current parameter values. */
     406           16 :     size = EstimateParamExecSpace(estate, params);
     407           16 :     handle = dsa_allocate(area, size);
     408           16 :     start_address = dsa_get_address(area, handle);
     409              : 
     410              :     /* First write the number of parameters as a 4-byte integer. */
     411           16 :     nparams = bms_num_members(params);
     412           16 :     memcpy(start_address, &nparams, sizeof(int));
     413           16 :     start_address += sizeof(int);
     414              : 
     415              :     /* Write details for each parameter in turn. */
     416           16 :     paramid = -1;
     417           36 :     while ((paramid = bms_next_member(params, paramid)) >= 0)
     418              :     {
     419              :         Oid         typeOid;
     420              :         int16       typLen;
     421              :         bool        typByVal;
     422              : 
     423           20 :         prm = &(estate->es_param_exec_vals[paramid]);
     424           20 :         typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
     425              :                                paramid);
     426              : 
     427              :         /* Write paramid. */
     428           20 :         memcpy(start_address, &paramid, sizeof(int));
     429           20 :         start_address += sizeof(int);
     430              : 
     431              :         /* Write datum/isnull */
     432           20 :         if (OidIsValid(typeOid))
     433           20 :             get_typlenbyval(typeOid, &typLen, &typByVal);
     434              :         else
     435              :         {
     436              :             /* If no type OID, assume by-value, like copyParamList does. */
     437            0 :             typLen = sizeof(Datum);
     438            0 :             typByVal = true;
     439              :         }
     440           20 :         datumSerialize(prm->value, prm->isnull, typByVal, typLen,
     441              :                        &start_address);
     442              :     }
     443              : 
     444           16 :     return handle;
     445              : }
     446              : 
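For concreteness, the blob produced above has this shape (an illustrative example with two parameters; the datum byte counts depend on the parameter types):

    [4-byte int]  nparams = 2
    [4-byte int]  paramid = 3   [datumSerialize() bytes for param 3]
    [4-byte int]  paramid = 7   [datumSerialize() bytes for param 7]

RestoreParamExecParams() below consumes exactly this layout.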
     447              : /*
     448              :  * Restore specified PARAM_EXEC parameters.
     449              :  */
     450              : static void
     451           48 : RestoreParamExecParams(char *start_address, EState *estate)
     452              : {
     453              :     int         nparams;
     454              :     int         i;
     455              :     int         paramid;
     456              : 
     457           48 :     memcpy(&nparams, start_address, sizeof(int));
     458           48 :     start_address += sizeof(int);
     459              : 
     460          104 :     for (i = 0; i < nparams; i++)
     461              :     {
     462              :         ParamExecData *prm;
     463              : 
     464              :         /* Read paramid */
     465           56 :         memcpy(&paramid, start_address, sizeof(int));
     466           56 :         start_address += sizeof(int);
     467           56 :         prm = &(estate->es_param_exec_vals[paramid]);
     468              : 
     469              :         /* Read datum/isnull. */
     470           56 :         prm->value = datumRestore(&start_address, &prm->isnull);
     471           56 :         prm->execPlan = NULL;
     472              :     }
     473           48 : }
     474              : 
     475              : /*
     476              :  * Initialize the dynamic shared memory segment that will be used to control
     477              :  * parallel execution.
     478              :  */
     479              : static bool
     480         3303 : ExecParallelInitializeDSM(PlanState *planstate,
     481              :                           ExecParallelInitializeDSMContext *d)
     482              : {
     483         3303 :     if (planstate == NULL)
     484            0 :         return false;
     485              : 
     486              :     /* If instrumentation is enabled, initialize slot for this node. */
     487         3303 :     if (d->instrumentation != NULL)
     488          684 :         d->instrumentation->plan_node_id[d->nnodes] =
     489          684 :             planstate->plan->plan_node_id;
     490              : 
     491              :     /* Count this node. */
     492         3303 :     d->nnodes++;
     493              : 
     494              :     /*
     495              :      * Call initializers for DSM-using plan nodes.
     496              :      *
     497              :      * Most plan nodes won't do anything here, but plan nodes that allocated
     498              :      * DSM may need to initialize shared state in the DSM before parallel
     499              :      * workers are launched.  They can allocate the space they previously
     500              :      * estimated using shm_toc_allocate, and add the keys they previously
     501              :      * estimated using shm_toc_insert, in each case targeting pcxt->toc.
     502              :      */
     503         3303 :     switch (nodeTag(planstate))
     504              :     {
     505         1528 :         case T_SeqScanState:
     506         1528 :             if (planstate->plan->parallel_aware)
     507         1190 :                 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
     508              :                                          d->pcxt);
     509              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     510         1528 :             ExecSeqScanInstrumentInitDSM((SeqScanState *) planstate,
     511              :                                          d->pcxt);
     512         1528 :             break;
     513          276 :         case T_IndexScanState:
     514          276 :             if (planstate->plan->parallel_aware)
     515           12 :                 ExecIndexScanInitializeDSM((IndexScanState *) planstate,
     516              :                                            d->pcxt);
     517              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     518          276 :             ExecIndexScanInstrumentInitDSM((IndexScanState *) planstate,
     519              :                                            d->pcxt);
     520          276 :             break;
     521           42 :         case T_IndexOnlyScanState:
     522           42 :             if (planstate->plan->parallel_aware)
     523           30 :                 ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
     524              :                                                d->pcxt);
     525              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     526           42 :             ExecIndexOnlyScanInstrumentInitDSM((IndexOnlyScanState *) planstate,
     527              :                                                d->pcxt);
     528           42 :             break;
     529           13 :         case T_BitmapIndexScanState:
     530              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     531           13 :             ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
     532           13 :             break;
     533            0 :         case T_ForeignScanState:
     534            0 :             if (planstate->plan->parallel_aware)
     535            0 :                 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
     536              :                                              d->pcxt);
     537            0 :             break;
     538           16 :         case T_TidRangeScanState:
     539           16 :             if (planstate->plan->parallel_aware)
     540           16 :                 ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
     541              :                                               d->pcxt);
     542              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     543           16 :             ExecTidRangeScanInstrumentInitDSM((TidRangeScanState *) planstate,
     544              :                                               d->pcxt);
     545           16 :             break;
     546          148 :         case T_AppendState:
     547          148 :             if (planstate->plan->parallel_aware)
     548          108 :                 ExecAppendInitializeDSM((AppendState *) planstate,
     549              :                                         d->pcxt);
     550          148 :             break;
     551            0 :         case T_CustomScanState:
     552            0 :             if (planstate->plan->parallel_aware)
     553            0 :                 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
     554              :                                             d->pcxt);
     555            0 :             break;
     556           13 :         case T_BitmapHeapScanState:
     557           13 :             if (planstate->plan->parallel_aware)
     558           12 :                 ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
     559              :                                             d->pcxt);
     560              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     561           13 :             ExecBitmapHeapInstrumentInitDSM((BitmapHeapScanState *) planstate,
     562              :                                             d->pcxt);
     563           13 :             break;
     564          208 :         case T_HashJoinState:
     565          208 :             if (planstate->plan->parallel_aware)
     566           84 :                 ExecHashJoinInitializeDSM((HashJoinState *) planstate,
     567              :                                           d->pcxt);
     568          208 :             break;
     569          208 :         case T_HashState:
     570              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     571          208 :             ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
     572          208 :             break;
     573          201 :         case T_SortState:
     574              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     575          201 :             ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
     576          201 :             break;
     577            0 :         case T_IncrementalSortState:
     578              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     579            0 :             ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
     580            0 :             break;
     581          396 :         case T_AggState:
     582              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     583          396 :             ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
     584          396 :             break;
     585            4 :         case T_MemoizeState:
     586              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     587            4 :             ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
     588            4 :             break;
     589          250 :         default:
     590          250 :             break;
     591              :     }
     592              : 
     593         3303 :     return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
     594              : }
     595              : 
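The per-node callbacks invoked above all have the same shape; here is a sketch for a hypothetical node (all Example* names are invented, and the pattern is modeled on the calls made from ExecParallelEstimate and ExecParallelInitializeDSM):

static void
ExampleScanEstimate(ExampleScanState *node, ParallelContext *pcxt)
{
    /* Reserve room for this node's shared state, plus one toc key. */
    node->pscan_len = sizeof(ExampleScanSharedState);
    shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}

static void
ExampleScanInitializeDSM(ExampleScanState *node, ParallelContext *pcxt)
{
    ExampleScanSharedState *shared;

    /* Claim the space estimated earlier and publish it under our node id. */
    shared = shm_toc_allocate(pcxt->toc, node->pscan_len);
    /* ... initialize the shared state here ... */
    shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, shared);
}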
     596              : /*
      597              :  * Set up the response queues for backend workers to return tuples
     598              :  * to the main backend and start the workers.
     599              :  */
     600              : static shm_mq_handle **
     601          695 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
     602              : {
     603              :     shm_mq_handle **responseq;
     604              :     char       *tqueuespace;
     605              :     int         i;
     606              : 
     607              :     /* Skip this if no workers. */
     608          695 :     if (pcxt->nworkers == 0)
     609            0 :         return NULL;
     610              : 
     611              :     /* Allocate memory for shared memory queue handles. */
     612              :     responseq = (shm_mq_handle **)
     613          695 :         palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
     614              : 
     615              :     /*
     616              :      * If not reinitializing, allocate space from the DSM for the queues;
     617              :      * otherwise, find the already allocated space.
     618              :      */
     619          695 :     if (!reinitialize)
     620              :         tqueuespace =
     621          523 :             shm_toc_allocate(pcxt->toc,
     622              :                              mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
     623          523 :                                       pcxt->nworkers));
     624              :     else
     625          172 :         tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
     626              : 
     627              :     /* Create the queues, and become the receiver for each. */
     628         2564 :     for (i = 0; i < pcxt->nworkers; ++i)
     629              :     {
     630              :         shm_mq     *mq;
     631              : 
     632         1869 :         mq = shm_mq_create(tqueuespace +
     633         1869 :                            ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
     634              :                            (Size) PARALLEL_TUPLE_QUEUE_SIZE);
     635              : 
     636         1869 :         shm_mq_set_receiver(mq, MyProc);
     637         1869 :         responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
     638              :     }
     639              : 
     640              :     /* Add array of queues to shm_toc, so others can find it. */
     641          695 :     if (!reinitialize)
     642          523 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
     643              : 
     644              :     /* Return array of handles. */
     645          695 :     return responseq;
     646              : }
     647              : 
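Each worker performs the mirror image of this. A sketch of the sender-side attach (simplified; ExecParallelGetReceiver, declared above, wraps the resulting queue in a DestReceiver):

static shm_mq_handle *
example_attach_my_queue(dsm_segment *seg, shm_toc *toc)
{
    char       *tqueuespace;
    shm_mq     *mq;

    /* Locate the array of queues, then index by our own worker number. */
    tqueuespace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
    mq = (shm_mq *) (tqueuespace +
                     ParallelWorkerNumber * (Size) PARALLEL_TUPLE_QUEUE_SIZE);
    shm_mq_set_sender(mq, MyProc);
    return shm_mq_attach(mq, seg, NULL);
}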
     648              : /*
     649              :  * Sets up the required infrastructure for backend workers to perform
     650              :  * execution and return results to the main backend.
     651              :  */
     652              : ParallelExecutorInfo *
     653          523 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
     654              :                      Bitmapset *sendParams, int nworkers,
     655              :                      int64 tuples_needed)
     656              : {
     657              :     ParallelExecutorInfo *pei;
     658              :     ParallelContext *pcxt;
     659              :     ExecParallelEstimateContext e;
     660              :     ExecParallelInitializeDSMContext d;
     661              :     FixedParallelExecutorState *fpes;
     662              :     char       *pstmt_data;
     663              :     char       *pstmt_space;
     664              :     char       *paramlistinfo_space;
     665              :     BufferUsage *bufusage_space;
     666              :     WalUsage   *walusage_space;
     667          523 :     SharedExecutorInstrumentation *instrumentation = NULL;
     668          523 :     SharedJitInstrumentation *jit_instrumentation = NULL;
     669              :     int         pstmt_len;
     670              :     int         paramlistinfo_len;
     671          523 :     int         instrumentation_len = 0;
     672          523 :     int         jit_instrumentation_len = 0;
     673          523 :     int         instrument_offset = 0;
     674          523 :     Size        dsa_minsize = dsa_minimum_size();
     675              :     char       *query_string;
     676              :     int         query_len;
     677              : 
     678              :     /*
     679              :      * Force any initplan outputs that we're going to pass to workers to be
     680              :      * evaluated, if they weren't already.
     681              :      *
     682              :      * For simplicity, we use the EState's per-output-tuple ExprContext here.
     683              :      * That risks intra-query memory leakage, since we might pass through here
     684              :      * many times before that ExprContext gets reset; but ExecSetParamPlan
     685              :      * doesn't normally leak any memory in the context (see its comments), so
     686              :      * it doesn't seem worth complicating this function's API to pass it a
     687              :      * shorter-lived ExprContext.  This might need to change someday.
     688              :      */
     689          523 :     ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
     690              : 
     691              :     /* Allocate object for return value. */
     692          523 :     pei = palloc0_object(ParallelExecutorInfo);
     693          523 :     pei->finished = false;
     694          523 :     pei->planstate = planstate;
     695              : 
     696              :     /* Fix up and serialize plan to be sent to workers. */
     697          523 :     pstmt_data = ExecSerializePlan(planstate->plan, estate);
     698              : 
     699              :     /* Create a parallel context. */
     700          523 :     pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
     701          523 :     pei->pcxt = pcxt;
     702              : 
     703              :     /*
     704              :      * Before telling the parallel context to create a dynamic shared memory
     705              :      * segment, we need to figure out how big it should be.  Estimate space
     706              :      * for the various things we need to store.
     707              :      */
     708              : 
     709              :     /* Estimate space for fixed-size state. */
     710          523 :     shm_toc_estimate_chunk(&pcxt->estimator,
     711              :                            sizeof(FixedParallelExecutorState));
     712          523 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     713              : 
     714              :     /* Estimate space for query text. */
     715          523 :     query_len = strlen(estate->es_sourceText);
     716          523 :     shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
     717          523 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     718              : 
     719              :     /* Estimate space for serialized PlannedStmt. */
     720          523 :     pstmt_len = strlen(pstmt_data) + 1;
     721          523 :     shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
     722          523 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     723              : 
     724              :     /* Estimate space for serialized ParamListInfo. */
     725          523 :     paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
     726          523 :     shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
     727          523 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     728              : 
     729              :     /*
     730              :      * Estimate space for BufferUsage.
     731              :      *
     732              :      * If EXPLAIN is not in use and there are no extensions loaded that care,
     733              :      * we could skip this.  But we have no way of knowing whether anyone's
     734              :      * looking at pgBufferUsage, so do it unconditionally.
     735              :      */
     736          523 :     shm_toc_estimate_chunk(&pcxt->estimator,
     737              :                            mul_size(sizeof(BufferUsage), pcxt->nworkers));
     738          523 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     739              : 
     740              :     /*
     741              :      * Same thing for WalUsage.
     742              :      */
     743          523 :     shm_toc_estimate_chunk(&pcxt->estimator,
     744              :                            mul_size(sizeof(WalUsage), pcxt->nworkers));
     745          523 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     746              : 
     747              :     /* Estimate space for tuple queues. */
     748          523 :     shm_toc_estimate_chunk(&pcxt->estimator,
     749              :                            mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
     750          523 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     751              : 
     752              :     /*
     753              :      * Give parallel-aware nodes a chance to add to the estimates, and get a
     754              :      * count of how many PlanState nodes there are.
     755              :      */
     756          523 :     e.pcxt = pcxt;
     757          523 :     e.nnodes = 0;
     758          523 :     ExecParallelEstimate(planstate, &e);
     759              : 
     760              :     /* Estimate space for instrumentation, if required. */
     761          523 :     if (estate->es_instrument)
     762              :     {
     763          120 :         instrumentation_len =
     764              :             offsetof(SharedExecutorInstrumentation, plan_node_id) +
     765          120 :             sizeof(int) * e.nnodes;
     766          120 :         instrumentation_len = MAXALIGN(instrumentation_len);
     767          120 :         instrument_offset = instrumentation_len;
     768          120 :         instrumentation_len +=
     769          120 :             mul_size(sizeof(NodeInstrumentation),
     770          120 :                      mul_size(e.nnodes, nworkers));
     771          120 :         shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
     772          120 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     773              : 
     774              :         /* Estimate space for JIT instrumentation, if required. */
     775          120 :         if (estate->es_jit_flags != PGJIT_NONE)
     776              :         {
     777            0 :             jit_instrumentation_len =
     778            0 :                 offsetof(SharedJitInstrumentation, jit_instr) +
     779              :                 sizeof(JitInstrumentation) * nworkers;
     780            0 :             shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
     781            0 :             shm_toc_estimate_keys(&pcxt->estimator, 1);
     782              :         }
     783              :     }
     784              : 
     785              :     /* Estimate space for DSA area. */
     786          523 :     shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
     787          523 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     788              : 
     789              :     /*
     790              :      * InitializeParallelDSM() passes the active snapshot to the parallel
     791              :      * worker, which uses it to set es_snapshot.  Make sure we don't set
     792              :      * es_snapshot differently in the child.
     793              :      */
     794              :     Assert(GetActiveSnapshot() == estate->es_snapshot);
     795              : 
     796              :     /* Everyone's had a chance to ask for space, so now create the DSM. */
     797          523 :     InitializeParallelDSM(pcxt);
     798              : 
     799              :     /*
     800              :      * OK, now we have a dynamic shared memory segment, and it should be big
     801              :      * enough to store all of the data we estimated we would want to put into
     802              :      * it, plus whatever general stuff (not specifically executor-related) the
     803              :      * ParallelContext itself needs to store there.  None of the space we
     804              :      * asked for has been allocated or initialized yet, though, so do that.
     805              :      */
     806              : 
     807              :     /* Store fixed-size state. */
     808          523 :     fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
     809          523 :     fpes->tuples_needed = tuples_needed;
     810          523 :     fpes->param_exec = InvalidDsaPointer;
     811          523 :     fpes->eflags = estate->es_top_eflags;
     812          523 :     fpes->jit_flags = estate->es_jit_flags;
     813          523 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
     814              : 
     815              :     /* Store query string */
     816          523 :     query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
     817          523 :     memcpy(query_string, estate->es_sourceText, query_len + 1);
     818          523 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
     819              : 
     820              :     /* Store serialized PlannedStmt. */
     821          523 :     pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
     822          523 :     memcpy(pstmt_space, pstmt_data, pstmt_len);
     823          523 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
     824              : 
     825              :     /* Store serialized ParamListInfo. */
     826          523 :     paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
     827          523 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
     828          523 :     SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
     829              : 
     830              :     /* Allocate space for each worker's BufferUsage; no need to initialize. */
     831          523 :     bufusage_space = shm_toc_allocate(pcxt->toc,
     832          523 :                                       mul_size(sizeof(BufferUsage), pcxt->nworkers));
     833          523 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
     834          523 :     pei->buffer_usage = bufusage_space;
     835              : 
     836              :     /* Same for WalUsage. */
     837          523 :     walusage_space = shm_toc_allocate(pcxt->toc,
     838          523 :                                       mul_size(sizeof(WalUsage), pcxt->nworkers));
     839          523 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
     840          523 :     pei->wal_usage = walusage_space;
     841              : 
     842              :     /* Set up the tuple queues that the workers will write into. */
     843          523 :     pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
     844              : 
     845              :     /* We don't need the TupleQueueReaders yet, though. */
     846          523 :     pei->reader = NULL;
     847              : 
     848              :     /*
     849              :      * If instrumentation options were supplied, allocate space for the data.
     850              :      * It only gets partially initialized here; the rest happens during
     851              :      * ExecParallelInitializeDSM.
     852              :      */
     853          523 :     if (estate->es_instrument)
     854              :     {
     855              :         NodeInstrumentation *instrument;
     856              :         int         i;
     857              : 
     858          120 :         instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
     859          120 :         instrumentation->instrument_options = estate->es_instrument;
     860          120 :         instrumentation->instrument_offset = instrument_offset;
     861          120 :         instrumentation->num_workers = nworkers;
     862          120 :         instrumentation->num_plan_nodes = e.nnodes;
     863          120 :         instrument = GetInstrumentationArray(instrumentation);
     864         1240 :         for (i = 0; i < nworkers * e.nnodes; ++i)
     865         1120 :             InstrInitNode(&instrument[i], estate->es_instrument, false);
     866          120 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
     867              :                        instrumentation);
     868          120 :         pei->instrumentation = instrumentation;
     869              : 
     870          120 :         if (estate->es_jit_flags != PGJIT_NONE)
     871              :         {
     872            0 :             jit_instrumentation = shm_toc_allocate(pcxt->toc,
     873              :                                                    jit_instrumentation_len);
     874            0 :             jit_instrumentation->num_workers = nworkers;
     875            0 :             memset(jit_instrumentation->jit_instr, 0,
     876              :                    sizeof(JitInstrumentation) * nworkers);
     877            0 :             shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
     878              :                            jit_instrumentation);
     879            0 :             pei->jit_instrumentation = jit_instrumentation;
     880              :         }
     881              :     }
     882              : 
     883              :     /*
     884              :      * Create a DSA area that can be used by the leader and all workers.
     885              :      * (However, if we failed to create a DSM and are using private memory
     886              :      * instead, then skip this.)
     887              :      */
     888          523 :     if (pcxt->seg != NULL)
     889              :     {
     890              :         char       *area_space;
     891              : 
     892          523 :         area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
     893          523 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
     894          523 :         pei->area = dsa_create_in_place(area_space, dsa_minsize,
     895              :                                         LWTRANCHE_PARALLEL_QUERY_DSA,
     896              :                                         pcxt->seg);
     897              : 
     898              :         /*
     899              :          * Serialize parameters, if any, using DSA storage.  We don't dare use
     900              :          * the main parallel query DSM for this because we might relaunch
     901              :          * workers after the values have changed (and thus the amount of
     902              :          * storage required has changed).
     903              :          */
     904          523 :         if (!bms_is_empty(sendParams))
     905              :         {
     906           16 :             pei->param_exec = SerializeParamExecParams(estate, sendParams,
     907              :                                                        pei->area);
     908           16 :             fpes->param_exec = pei->param_exec;
     909              :         }
     910              :     }
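
Because the serialized parameters live in the DSA area rather than in the
fixed-size DSM, they can be freed and re-created at a different size before
each relaunch.  A minimal sketch of that DSA round trip, assuming only the
standard dsa.h calls (the helper function is illustrative):

    /* Sketch only: variable-size state stashed in the per-query DSA area. */
    static dsa_pointer
    stash_bytes(dsa_area *area, const char *data, Size len)
    {
        dsa_pointer p = dsa_allocate(area, len);     /* len may differ per round */
        char       *addr = dsa_get_address(area, p); /* map into this backend */

        memcpy(addr, data, len);
        return p;                   /* dsa_free(area, p) before the next round */
    }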
     911              : 
     912              :     /*
     913              :      * Give parallel-aware nodes a chance to initialize their shared data.
     914              :      * This also initializes the per-node elements of the shared
     915              :      * instrumentation array, if instrumentation exists.
     916              :      */
     917          523 :     d.pcxt = pcxt;
     918          523 :     d.instrumentation = instrumentation;
     919          523 :     d.nnodes = 0;
     920              : 
     921              :     /* Install our DSA area while initializing the plan. */
     922          523 :     estate->es_query_dsa = pei->area;
     923          523 :     ExecParallelInitializeDSM(planstate, &d);
     924          523 :     estate->es_query_dsa = NULL;
     925              : 
     926              :     /*
     927              :      * Make sure that the world hasn't shifted out from under our feet.  This
     928              :      * could probably just be an Assert(), but let's be conservative for now.
     929              :      */
     930          523 :     if (e.nnodes != d.nnodes)
     931            0 :         elog(ERROR, "inconsistent count of PlanState nodes");
     932              : 
     933              :     /* OK, we're ready to rock and roll. */
     934          523 :     return pei;
     935              : }
     936              : 
     937              : /*
     938              :  * Set up tuple queue readers to read the results of a parallel subplan.
     939              :  *
     940              :  * This is separate from ExecInitParallelPlan() because we can launch the
     941              :  * worker processes and let them start doing something before we do this.
     942              :  */
     943              : void
     944          683 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
     945              : {
     946          683 :     int         nworkers = pei->pcxt->nworkers_launched;
     947              :     int         i;
     948              : 
     949              :     Assert(pei->reader == NULL);
     950              : 
     951          683 :     if (nworkers > 0)
     952              :     {
     953          683 :         pei->reader = (TupleQueueReader **)
     954          683 :             palloc(nworkers * sizeof(TupleQueueReader *));
     955              : 
     956         2496 :         for (i = 0; i < nworkers; i++)
     957              :         {
     958         1813 :             shm_mq_set_handle(pei->tqueue[i],
     959         1813 :                               pei->pcxt->worker[i].bgwhandle);
     960         1813 :             pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
     961              :         }
     962              :     }
     963          683 : }
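
For context, a simplified sketch of the leader-side sequence this decoupling
enables; the functions are the ones in this file and in parallel.c, but the
sequencing shown is illustrative rather than lifted from any one caller:

    /* Sketch: leader launches workers, then attaches to their queues. */
    pei = ExecInitParallelPlan(planstate, estate, sendParams,
                               nworkers, tuples_needed);
    LaunchParallelWorkers(pei->pcxt);       /* workers begin executing */
    if (pei->pcxt->nworkers_launched > 0)
        ExecParallelCreateReaders(pei);     /* now hook up the readers */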
     964              : 
     965              : /*
     966              :  * Re-initialize the parallel executor shared memory state before launching
     967              :  * a fresh batch of workers.
     968              :  */
     969              : void
     970          172 : ExecParallelReinitialize(PlanState *planstate,
     971              :                          ParallelExecutorInfo *pei,
     972              :                          Bitmapset *sendParams)
     973              : {
     974          172 :     EState     *estate = planstate->state;
     975              :     FixedParallelExecutorState *fpes;
     976              : 
     977              :     /* Old workers must already be shut down */
     978              :     Assert(pei->finished);
     979              : 
     980              :     /*
     981              :      * Force any initplan outputs that we're going to pass to workers to be
     982              :      * evaluated, if they weren't already (see comments in
     983              :      * ExecInitParallelPlan).
     984              :      */
     985          172 :     ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
     986              : 
     987          172 :     ReinitializeParallelDSM(pei->pcxt);
     988          172 :     pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
     989          172 :     pei->reader = NULL;
     990          172 :     pei->finished = false;
     991              : 
     992          172 :     fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
     993              : 
     994              :     /* Free any serialized parameters from the last round. */
     995          172 :     if (DsaPointerIsValid(fpes->param_exec))
     996              :     {
     997            0 :         dsa_free(pei->area, fpes->param_exec);
     998            0 :         fpes->param_exec = InvalidDsaPointer;
     999              :     }
    1000              : 
    1001              :     /* Serialize current parameter values if required. */
    1002          172 :     if (!bms_is_empty(sendParams))
    1003              :     {
    1004            0 :         pei->param_exec = SerializeParamExecParams(estate, sendParams,
    1005              :                                                    pei->area);
    1006            0 :         fpes->param_exec = pei->param_exec;
    1007              :     }
    1008              : 
    1009              :     /* Traverse plan tree and let each child node reset associated state. */
    1010          172 :     estate->es_query_dsa = pei->area;
    1011          172 :     ExecParallelReInitializeDSM(planstate, pei->pcxt);
    1012          172 :     estate->es_query_dsa = NULL;
    1013          172 : }
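
A hedged sketch of how a rescan drives this routine, modeled loosely on the
Gather-style callers (the exact call sites differ; this shows only the
ordering constraints):

    /* Sketch: relaunching a fresh batch of workers for a rescan. */
    ExecParallelFinish(pei);                /* old workers must be done */
    ExecParallelReinitialize(planstate, pei, sendParams);
    LaunchParallelWorkers(pei->pcxt);
    if (pei->pcxt->nworkers_launched > 0)
        ExecParallelCreateReaders(pei);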
    1014              : 
    1015              : /*
    1016              :  * Traverse plan tree to reinitialize per-node dynamic shared memory state
    1017              :  */
    1018              : static bool
    1019          444 : ExecParallelReInitializeDSM(PlanState *planstate,
    1020              :                             ParallelContext *pcxt)
    1021              : {
    1022          444 :     if (planstate == NULL)
    1023            0 :         return false;
    1024              : 
    1025              :     /*
    1026              :      * Call reinitializers for DSM-using plan nodes.
    1027              :      */
    1028          444 :     switch (nodeTag(planstate))
    1029              :     {
    1030          184 :         case T_SeqScanState:
    1031          184 :             if (planstate->plan->parallel_aware)
    1032          152 :                 ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
    1033              :                                            pcxt);
    1034          184 :             break;
    1035            8 :         case T_IndexScanState:
    1036            8 :             if (planstate->plan->parallel_aware)
    1037            8 :                 ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
    1038              :                                              pcxt);
    1039            8 :             break;
    1040            8 :         case T_IndexOnlyScanState:
    1041            8 :             if (planstate->plan->parallel_aware)
    1042            8 :                 ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
    1043              :                                                  pcxt);
    1044            8 :             break;
    1045            0 :         case T_ForeignScanState:
    1046            0 :             if (planstate->plan->parallel_aware)
    1047            0 :                 ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
    1048              :                                                pcxt);
    1049            0 :             break;
    1050            0 :         case T_TidRangeScanState:
    1051            0 :             if (planstate->plan->parallel_aware)
    1052            0 :                 ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
    1053              :                                                 pcxt);
    1054            0 :             break;
    1055            0 :         case T_AppendState:
    1056            0 :             if (planstate->plan->parallel_aware)
    1057            0 :                 ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
    1058            0 :             break;
    1059            0 :         case T_CustomScanState:
    1060            0 :             if (planstate->plan->parallel_aware)
    1061            0 :                 ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
    1062              :                                               pcxt);
    1063            0 :             break;
    1064           36 :         case T_BitmapHeapScanState:
    1065           36 :             if (planstate->plan->parallel_aware)
    1066           36 :                 ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
    1067              :                                               pcxt);
    1068           36 :             break;
    1069           64 :         case T_HashJoinState:
    1070           64 :             if (planstate->plan->parallel_aware)
    1071           32 :                 ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
    1072              :                                             pcxt);
    1073           64 :             break;
    1074          120 :         case T_BitmapIndexScanState:
    1075              :         case T_HashState:
    1076              :         case T_SortState:
    1077              :         case T_IncrementalSortState:
    1078              :         case T_MemoizeState:
    1079              :             /* these nodes have DSM state, but no reinitialization is required */
    1080          120 :             break;
    1081              : 
    1082           24 :         default:
    1083           24 :             break;
    1084              :     }
    1085              : 
    1086          444 :     return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
    1087              : }
    1088              : 
    1089              : /*
    1090              :  * Copy instrumentation information about this node and its descendants from
    1091              :  * dynamic shared memory.
    1092              :  */
    1093              : static bool
    1094          684 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
    1095              :                                     SharedExecutorInstrumentation *instrumentation)
    1096              : {
    1097              :     NodeInstrumentation *instrument;
    1098              :     int         i;
    1099              :     int         n;
    1100              :     int         ibytes;
    1101          684 :     int         plan_node_id = planstate->plan->plan_node_id;
    1102              :     MemoryContext oldcontext;
    1103              : 
    1104              :     /* Find the instrumentation for this node. */
    1105         3092 :     for (i = 0; i < instrumentation->num_plan_nodes; ++i)
    1106         3092 :         if (instrumentation->plan_node_id[i] == plan_node_id)
    1107          684 :             break;
    1108          684 :     if (i >= instrumentation->num_plan_nodes)
    1109            0 :         elog(ERROR, "plan node %d not found", plan_node_id);
    1110              : 
    1111              :     /* Accumulate the statistics from all workers. */
    1112          684 :     instrument = GetInstrumentationArray(instrumentation);
    1113          684 :     instrument += i * instrumentation->num_workers;
    1114         1804 :     for (n = 0; n < instrumentation->num_workers; ++n)
    1115         1120 :         InstrAggNode(planstate->instrument, &instrument[n]);
    1116              : 
    1117              :     /*
    1118              :      * Also store the per-worker detail.
    1119              :      *
    1120              :      * Worker instrumentation should be allocated in the same context as the
    1121              :      * regular instrumentation information, which is the per-query context.
    1122              :      * Switch into per-query memory context.
    1123              :      */
    1124          684 :     oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
    1125          684 :     ibytes = mul_size(instrumentation->num_workers, sizeof(NodeInstrumentation));
    1126          684 :     planstate->worker_instrument =
    1127          684 :         palloc(ibytes + offsetof(WorkerNodeInstrumentation, instrument));
    1128          684 :     MemoryContextSwitchTo(oldcontext);
    1129              : 
    1130          684 :     planstate->worker_instrument->num_workers = instrumentation->num_workers;
    1131          684 :     memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
    1132              : 
    1133              :     /* Perform any node-type-specific work that needs to be done. */
    1134          684 :     switch (nodeTag(planstate))
    1135              :     {
    1136          180 :         case T_IndexScanState:
    1137          180 :             ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
    1138          180 :             break;
    1139            0 :         case T_IndexOnlyScanState:
    1140            0 :             ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
    1141            0 :             break;
    1142            0 :         case T_BitmapIndexScanState:
    1143            0 :             ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
    1144            0 :             break;
    1145            8 :         case T_SortState:
    1146            8 :             ExecSortRetrieveInstrumentation((SortState *) planstate);
    1147            8 :             break;
    1148            0 :         case T_IncrementalSortState:
    1149            0 :             ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
    1150            0 :             break;
    1151           56 :         case T_HashState:
    1152           56 :             ExecHashRetrieveInstrumentation((HashState *) planstate);
    1153           56 :             break;
    1154           68 :         case T_AggState:
    1155           68 :             ExecAggRetrieveInstrumentation((AggState *) planstate);
    1156           68 :             break;
    1157            0 :         case T_MemoizeState:
    1158            0 :             ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
    1159            0 :             break;
    1160            0 :         case T_BitmapHeapScanState:
    1161            0 :             ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
    1162            0 :             break;
    1163          232 :         case T_SeqScanState:
    1164          232 :             ExecSeqScanRetrieveInstrumentation((SeqScanState *) planstate);
    1165          232 :             break;
    1166            0 :         case T_TidRangeScanState:
    1167            0 :             ExecTidRangeScanRetrieveInstrumentation((TidRangeScanState *) planstate);
    1168            0 :             break;
    1169          140 :         default:
    1170          140 :             break;
    1171              :     }
    1172              : 
    1173          684 :     return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
    1174              :                                  instrumentation);
    1175              : }
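
The worker_instrument allocation above uses the usual flexible-array-member
pattern: the array bytes are added to the offset of the trailing member.  An
illustrative reduction of the same pattern (WorkerDetail is a hypothetical
stand-in for WorkerNodeInstrumentation):

    /* Illustrative only: allocating a struct with a trailing array. */
    typedef struct WorkerDetail
    {
        int         num_workers;
        NodeInstrumentation instrument[FLEXIBLE_ARRAY_MEMBER];
    } WorkerDetail;

    Size        ibytes = mul_size(nworkers, sizeof(NodeInstrumentation));
    WorkerDetail *wd = palloc(offsetof(WorkerDetail, instrument) + ibytes);

    wd->num_workers = nworkers;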
    1176              : 
    1177              : /*
    1178              :  * Add up the workers' JIT instrumentation from dynamic shared memory.
    1179              :  */
    1180              : static void
    1181            0 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
    1182              :                                        SharedJitInstrumentation *shared_jit)
    1183              : {
    1184              :     JitInstrumentation *combined;
    1185              :     int         ibytes;
    1186              : 
    1187              :     int         n;
    1188              : 
    1189              :     /*
    1190              :      * Accumulate worker JIT instrumentation into the combined JIT
    1191              :      * instrumentation, allocating it if required.
    1192              :      */
    1193            0 :     if (!planstate->state->es_jit_worker_instr)
    1194            0 :         planstate->state->es_jit_worker_instr =
    1195            0 :             MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
    1196            0 :     combined = planstate->state->es_jit_worker_instr;
    1197              : 
    1198              :     /* Accumulate all the workers' instrumentations. */
    1199            0 :     for (n = 0; n < shared_jit->num_workers; ++n)
    1200            0 :         InstrJitAgg(combined, &shared_jit->jit_instr[n]);
    1201              : 
    1202              :     /*
    1203              :      * Store the per-worker detail.
    1204              :      *
    1205              :      * Similar to ExecParallelRetrieveInstrumentation(), allocate the
    1206              :      * instrumentation in per-query context.
    1207              :      */
    1208            0 :     ibytes = offsetof(SharedJitInstrumentation, jit_instr)
    1209            0 :         + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
    1210            0 :     planstate->worker_jit_instrument =
    1211            0 :         MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
    1212              : 
    1213            0 :     memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
    1214            0 : }
    1215              : 
    1216              : /*
    1217              :  * Finish parallel execution.  We wait for parallel workers to finish, and
    1218              :  * accumulate their buffer/WAL usage.
    1219              :  */
    1220              : void
    1221         1242 : ExecParallelFinish(ParallelExecutorInfo *pei)
    1222              : {
    1223         1242 :     int         nworkers = pei->pcxt->nworkers_launched;
    1224              :     int         i;
    1225              : 
    1226              :     /* Make this be a no-op if called twice in a row. */
    1227         1242 :     if (pei->finished)
    1228          555 :         return;
    1229              : 
    1230              :     /*
    1231              :      * Detach from tuple queues ASAP, so that any still-active workers will
    1232              :      * notice that no further results are wanted.
    1233              :      */
    1234          687 :     if (pei->tqueue != NULL)
    1235              :     {
    1236         2492 :         for (i = 0; i < nworkers; i++)
    1237         1805 :             shm_mq_detach(pei->tqueue[i]);
    1238          687 :         pfree(pei->tqueue);
    1239          687 :         pei->tqueue = NULL;
    1240              :     }
    1241              : 
    1242              :     /*
    1243              :      * While we're waiting for the workers to finish, let's get rid of the
    1244              :      * tuple queue readers.  (Any other local cleanup could be done here too.)
    1245              :      */
    1246          687 :     if (pei->reader != NULL)
    1247              :     {
    1248         2480 :         for (i = 0; i < nworkers; i++)
    1249         1805 :             DestroyTupleQueueReader(pei->reader[i]);
    1250          675 :         pfree(pei->reader);
    1251          675 :         pei->reader = NULL;
    1252              :     }
    1253              : 
    1254              :     /* Now wait for the workers to finish. */
    1255          687 :     WaitForParallelWorkersToFinish(pei->pcxt);
    1256              : 
    1257              :     /*
    1258              :      * Next, accumulate buffer/WAL usage.  (This must wait for the workers to
    1259              :      * finish, or we might get incomplete data.)
    1260              :      */
    1261         2492 :     for (i = 0; i < nworkers; i++)
    1262         1805 :         InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
    1263              : 
    1264          687 :     pei->finished = true;
    1265              : }
    1266              : 
    1267              : /*
    1268              :  * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
    1269              :  * resources still exist after ExecParallelFinish.  We separate these
    1270              :  * routines because someone might want to examine the contents of the DSM
    1271              :  * after ExecParallelFinish and before calling this routine.
    1272              :  */
    1273              : void
    1274          515 : ExecParallelCleanup(ParallelExecutorInfo *pei)
    1275              : {
    1276              :     /* Accumulate instrumentation, if any. */
    1277          515 :     if (pei->instrumentation)
    1278          120 :         ExecParallelRetrieveInstrumentation(pei->planstate,
    1279              :                                             pei->instrumentation);
    1280              : 
    1281              :     /* Accumulate JIT instrumentation, if any. */
    1282          515 :     if (pei->jit_instrumentation)
    1283            0 :         ExecParallelRetrieveJitInstrumentation(pei->planstate,
    1284            0 :                                                pei->jit_instrumentation);
    1285              : 
    1286              :     /* Free any serialized parameters. */
    1287          515 :     if (DsaPointerIsValid(pei->param_exec))
    1288              :     {
    1289           16 :         dsa_free(pei->area, pei->param_exec);
    1290           16 :         pei->param_exec = InvalidDsaPointer;
    1291              :     }
    1292          515 :     if (pei->area != NULL)
    1293              :     {
    1294          515 :         dsa_detach(pei->area);
    1295          515 :         pei->area = NULL;
    1296              :     }
    1297          515 :     if (pei->pcxt != NULL)
    1298              :     {
    1299          515 :         DestroyParallelContext(pei->pcxt);
    1300          515 :         pei->pcxt = NULL;
    1301              :     }
    1302          515 :     pfree(pei);
    1303          515 : }
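
A short usage sketch of the finish/cleanup split described above; the window
between the two calls is where a caller could still inspect the DSM:

    /* Sketch: leader-side teardown order. */
    ExecParallelFinish(pei);       /* waits for workers, accumulates usage */
    /* ... DSM contents may still be examined here ... */
    ExecParallelCleanup(pei);      /* pulls instrumentation, frees the rest */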
    1304              : 
    1305              : /*
    1306              :  * Create a DestReceiver to write tuples we produce to the shm_mq designated
    1307              :  * for that purpose.
    1308              :  */
    1309              : static DestReceiver *
    1310         1813 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
    1311              : {
    1312              :     char       *mqspace;
    1313              :     shm_mq     *mq;
    1314              : 
    1315         1813 :     mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
    1316         1813 :     mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
    1317         1813 :     mq = (shm_mq *) mqspace;
    1318         1813 :     shm_mq_set_sender(mq, MyProc);
    1319         1813 :     return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
    1320              : }
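
As a worked example of the offset arithmetic: if PARALLEL_TUPLE_QUEUE_SIZE is
65536 bytes (its value in the sources at the time of writing), worker 2
attaches to the shm_mq that begins 2 * 65536 = 131072 bytes into the shared
allocation.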
    1321              : 
    1322              : /*
    1323              :  * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
    1324              :  */
    1325              : static QueryDesc *
    1326         1813 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
    1327              :                          int instrument_options)
    1328              : {
    1329              :     char       *pstmtspace;
    1330              :     char       *paramspace;
    1331              :     PlannedStmt *pstmt;
    1332              :     ParamListInfo paramLI;
    1333              :     char       *queryString;
    1334              : 
    1335              :     /* Get the query string from shared memory */
    1336         1813 :     queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
    1337              : 
    1338              :     /* Reconstruct leader-supplied PlannedStmt. */
    1339         1813 :     pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
    1340         1813 :     pstmt = (PlannedStmt *) stringToNode(pstmtspace);
    1341              : 
    1342              :     /* Reconstruct ParamListInfo. */
    1343         1813 :     paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
    1344         1813 :     paramLI = RestoreParamList(&paramspace);
    1345              : 
    1346              :     /* Create a QueryDesc for the query. */
    1347         1813 :     return CreateQueryDesc(pstmt,
    1348              :                            queryString,
    1349              :                            GetActiveSnapshot(), InvalidSnapshot,
    1350              :                            receiver, paramLI, NULL, instrument_options);
    1351              : }
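
The PlannedStmt crosses shared memory as text: the leader serializes it with
nodeToString() and the worker reverses that here.  A minimal sketch of the
round trip (variable names illustrative):

    /* Sketch: the serialize/deserialize pair used to ship the plan. */
    char        *serialized = nodeToString(pstmt);              /* leader side */
    PlannedStmt *copy = (PlannedStmt *) stringToNode(serialized);  /* worker */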
    1352              : 
    1353              : /*
    1354              :  * Copy instrumentation information from this node and its descendants into
    1355              :  * dynamic shared memory, so that the parallel leader can retrieve it.
    1356              :  */
    1357              : static bool
    1358         1579 : ExecParallelReportInstrumentation(PlanState *planstate,
    1359              :                                   SharedExecutorInstrumentation *instrumentation)
    1360              : {
    1361              :     int         i;
    1362         1579 :     int         plan_node_id = planstate->plan->plan_node_id;
    1363              :     NodeInstrumentation *instrument;
    1364              : 
    1365         1579 :     InstrEndLoop(planstate->instrument);
    1366              : 
    1367              :     /*
    1368              :      * If we kept the plan_node_id values in the shared instrumentation
    1369              :      * array in sorted order, we could use binary search here.  This might
    1370              :      * matter someday if we're pushing down sufficiently large plan trees.
    1371              :      * For now, do it the slow, dumb way.
    1372              :      */
    1373         5193 :     for (i = 0; i < instrumentation->num_plan_nodes; ++i)
    1374         5193 :         if (instrumentation->plan_node_id[i] == plan_node_id)
    1375         1579 :             break;
    1376         1579 :     if (i >= instrumentation->num_plan_nodes)
    1377            0 :         elog(ERROR, "plan node %d not found", plan_node_id);
    1378              : 
    1379              :     /*
    1380              :      * Add our statistics to the per-node, per-worker totals.  It's possible
    1381              :      * that this could happen more than once if we relaunched workers.
    1382              :      */
    1383         1579 :     instrument = GetInstrumentationArray(instrumentation);
    1384         1579 :     instrument += i * instrumentation->num_workers;
    1385              :     Assert(IsParallelWorker());
    1386              :     Assert(ParallelWorkerNumber < instrumentation->num_workers);
    1387         1579 :     InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
    1388              : 
    1389         1579 :     return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
    1390              :                                  instrumentation);
    1391              : }
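
For what it's worth, a sketch of the binary search the comment above
contemplates, assuming plan_node_id[] were kept sorted (which it is not
today; this is an alternative, not the file's current behavior):

    /* Sketch only: binary search over a sorted plan_node_id[] array. */
    static int
    find_plan_node(SharedExecutorInstrumentation *instrumentation,
                   int plan_node_id)
    {
        int         lo = 0;
        int         hi = instrumentation->num_plan_nodes - 1;

        while (lo <= hi)
        {
            int         mid = lo + (hi - lo) / 2;

            if (instrumentation->plan_node_id[mid] == plan_node_id)
                return mid;
            if (instrumentation->plan_node_id[mid] < plan_node_id)
                lo = mid + 1;
            else
                hi = mid - 1;
        }
        return -1;              /* caller would elog(ERROR) on this */
    }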
    1392              : 
    1393              : /*
    1394              :  * Initialize the PlanState and its descendants with the information
    1395              :  * retrieved from shared memory.  This has to be done once the PlanState
    1396              :  * is allocated and initialized by executor; that is, after ExecutorStart().
    1397              :  */
    1398              : static bool
    1399         8156 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
    1400              : {
    1401         8156 :     if (planstate == NULL)
    1402            0 :         return false;
    1403              : 
    1404         8156 :     switch (nodeTag(planstate))
    1405              :     {
    1406         3744 :         case T_SeqScanState:
    1407         3744 :             if (planstate->plan->parallel_aware)
    1408         2966 :                 ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
    1409              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1410         3744 :             ExecSeqScanInstrumentInitWorker((SeqScanState *) planstate, pwcxt);
    1411         3744 :             break;
    1412          424 :         case T_IndexScanState:
    1413          424 :             if (planstate->plan->parallel_aware)
    1414           80 :                 ExecIndexScanInitializeWorker((IndexScanState *) planstate,
    1415              :                                               pwcxt);
    1416              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1417          424 :             ExecIndexScanInstrumentInitWorker((IndexScanState *) planstate,
    1418              :                                               pwcxt);
    1419          424 :             break;
    1420          168 :         case T_IndexOnlyScanState:
    1421          168 :             if (planstate->plan->parallel_aware)
    1422          136 :                 ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
    1423              :                                                   pwcxt);
    1424              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1425          168 :             ExecIndexOnlyScanInstrumentInitWorker((IndexOnlyScanState *) planstate,
    1426              :                                                   pwcxt);
    1427          168 :             break;
    1428          181 :         case T_BitmapIndexScanState:
    1429              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1430          181 :             ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
    1431              :                                                 pwcxt);
    1432          181 :             break;
    1433            0 :         case T_ForeignScanState:
    1434            0 :             if (planstate->plan->parallel_aware)
    1435            0 :                 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
    1436              :                                                 pwcxt);
    1437            0 :             break;
    1438           64 :         case T_TidRangeScanState:
    1439           64 :             if (planstate->plan->parallel_aware)
    1440           64 :                 ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
    1441              :                                                  pwcxt);
    1442              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1443           64 :             ExecTidRangeScanInstrumentInitWorker((TidRangeScanState *) planstate,
    1444              :                                                  pwcxt);
    1445           64 :             break;
    1446          297 :         case T_AppendState:
    1447          297 :             if (planstate->plan->parallel_aware)
    1448          241 :                 ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
    1449          297 :             break;
    1450            0 :         case T_CustomScanState:
    1451            0 :             if (planstate->plan->parallel_aware)
    1452            0 :                 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
    1453              :                                                pwcxt);
    1454            0 :             break;
    1455          181 :         case T_BitmapHeapScanState:
    1456          181 :             if (planstate->plan->parallel_aware)
    1457          180 :                 ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
    1458              :                                                pwcxt);
    1459              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1460          181 :             ExecBitmapHeapInstrumentInitWorker((BitmapHeapScanState *) planstate,
    1461              :                                                pwcxt);
    1462          181 :             break;
    1463          524 :         case T_HashJoinState:
    1464          524 :             if (planstate->plan->parallel_aware)
    1465          212 :                 ExecHashJoinInitializeWorker((HashJoinState *) planstate,
    1466              :                                              pwcxt);
    1467          524 :             break;
    1468          524 :         case T_HashState:
    1469              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1470          524 :             ExecHashInitializeWorker((HashState *) planstate, pwcxt);
    1471          524 :             break;
    1472          499 :         case T_SortState:
    1473              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1474          499 :             ExecSortInitializeWorker((SortState *) planstate, pwcxt);
    1475          499 :             break;
    1476            0 :         case T_IncrementalSortState:
    1477              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1478            0 :             ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
    1479              :                                                 pwcxt);
    1480            0 :             break;
    1481         1112 :         case T_AggState:
    1482              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1483         1112 :             ExecAggInitializeWorker((AggState *) planstate, pwcxt);
    1484         1112 :             break;
    1485            8 :         case T_MemoizeState:
    1486              :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1487            8 :             ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
    1488            8 :             break;
    1489          430 :         default:
    1490          430 :             break;
    1491              :     }
    1492              : 
    1493         8156 :     return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
    1494              :                                  pwcxt);
    1495              : }
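
Each parallel-aware node type participates by adding a case to switches like
the one above.  A hypothetical fragment showing the shape of such an addition
(T_FooScanState and its functions are invented for illustration):

    /* Hypothetical: how a new parallel-aware scan node would hook in. */
    case T_FooScanState:
        if (planstate->plan->parallel_aware)
            ExecFooScanInitializeWorker((FooScanState *) planstate, pwcxt);
        break;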
    1496              : 
    1497              : /*
    1498              :  * Main entrypoint for parallel query worker processes.
    1499              :  *
    1500              :  * We reach this function from ParallelWorkerMain, so the setup necessary to
    1501              :  * create a sensible parallel environment has already been done;
    1502              :  * ParallelWorkerMain worries about stuff like the transaction state, combo
    1503              :  * CID mappings, and GUC values, so we don't need to deal with any of that
    1504              :  * here.
    1505              :  *
    1506              :  * Our job is to deal with concerns specific to the executor.  The parallel
    1507              :  * group leader will have stored a serialized PlannedStmt, and it's our job
    1508              :  * to execute that plan and write the resulting tuples to the appropriate
    1509              :  * tuple queue.  Various bits of supporting information that we need in order
    1510              :  * to do this are also stored in the dsm_segment and can be accessed through
    1511              :  * the shm_toc.
    1512              :  */
    1513              : void
    1514         1813 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
    1515              : {
    1516              :     FixedParallelExecutorState *fpes;
    1517              :     BufferUsage *buffer_usage;
    1518              :     WalUsage   *wal_usage;
    1519              :     DestReceiver *receiver;
    1520              :     QueryDesc  *queryDesc;
    1521              :     SharedExecutorInstrumentation *instrumentation;
    1522              :     SharedJitInstrumentation *jit_instrumentation;
    1523         1813 :     int         instrument_options = 0;
    1524              :     void       *area_space;
    1525              :     dsa_area   *area;
    1526              :     ParallelWorkerContext pwcxt;
    1527              : 
    1528              :     /* Get fixed-size state. */
    1529         1813 :     fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
    1530              : 
    1531              :     /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
    1532         1813 :     receiver = ExecParallelGetReceiver(seg, toc);
    1533         1813 :     instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
    1534         1813 :     if (instrumentation != NULL)
    1535          483 :         instrument_options = instrumentation->instrument_options;
    1536         1813 :     jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
    1537              :                                          true);
    1538         1813 :     queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
    1539              : 
    1540              :     /* Set debug_query_string for this worker */
    1541         1813 :     debug_query_string = queryDesc->sourceText;
    1542              : 
    1543              :     /* Report the worker's query for monitoring purposes */
    1544         1813 :     pgstat_report_activity(STATE_RUNNING, debug_query_string);
    1545              : 
    1546              :     /* Attach to the dynamic shared memory area. */
    1547         1813 :     area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
    1548         1813 :     area = dsa_attach_in_place(area_space, seg);
    1549              : 
    1550              :     /* Start up the executor */
    1551         1813 :     queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
    1552         1813 :     ExecutorStart(queryDesc, fpes->eflags);
    1553              : 
    1554              :     /* Special executor initialization steps for parallel workers */
    1555         1813 :     queryDesc->planstate->state->es_query_dsa = area;
    1556         1813 :     if (DsaPointerIsValid(fpes->param_exec))
    1557              :     {
    1558              :         char       *paramexec_space;
    1559              : 
    1560           48 :         paramexec_space = dsa_get_address(area, fpes->param_exec);
    1561           48 :         RestoreParamExecParams(paramexec_space, queryDesc->estate);
    1562              :     }
    1563         1813 :     pwcxt.toc = toc;
    1564         1813 :     pwcxt.seg = seg;
    1565         1813 :     ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
    1566              : 
    1567              :     /* Pass down any tuple bound */
    1568         1813 :     ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
    1569              : 
    1570              :     /*
    1571              :      * Prepare to track buffer/WAL usage during query execution.
    1572              :      *
    1573              :      * We do this after starting up the executor to match what happens in the
    1574              :      * leader, which also doesn't count buffer accesses and WAL activity that
    1575              :      * occur during executor startup.
    1576              :      */
    1577         1813 :     InstrStartParallelQuery();
    1578              : 
    1579              :     /*
    1580              :      * Run the plan.  If we specified a tuple bound, be careful not to demand
    1581              :      * more tuples than that.
    1582              :      */
    1583         1813 :     ExecutorRun(queryDesc,
    1584              :                 ForwardScanDirection,
    1585         1813 :                 fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
    1586              : 
    1587              :     /* Shut down the executor */
    1588         1805 :     ExecutorFinish(queryDesc);
    1589              : 
    1590              :     /* Report buffer/WAL usage during parallel execution. */
    1591         1805 :     buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
    1592         1805 :     wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
    1593         1805 :     InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
    1594         1805 :                           &wal_usage[ParallelWorkerNumber]);
    1595              : 
    1596              :     /* Report instrumentation data if any instrumentation options are set. */
    1597         1805 :     if (instrumentation != NULL)
    1598          483 :         ExecParallelReportInstrumentation(queryDesc->planstate,
    1599              :                                           instrumentation);
    1600              : 
    1601              :     /* Report JIT instrumentation data if any */
    1602         1805 :     if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
    1603              :     {
    1604              :         Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
    1605            0 :         jit_instrumentation->jit_instr[ParallelWorkerNumber] =
    1606            0 :             queryDesc->estate->es_jit->instr;
    1607              :     }
    1608              : 
    1609              :     /* Must do this after capturing instrumentation. */
    1610         1805 :     ExecutorEnd(queryDesc);
    1611              : 
    1612              :     /* Cleanup. */
    1613         1805 :     dsa_detach(area);
    1614         1805 :     FreeQueryDesc(queryDesc);
    1615         1805 :     receiver->rDestroy(receiver);
    1616         1805 : }
        

Generated by: LCOV version 2.0-1