LCOV - code coverage report
Current view: top level - src/backend/executor - execParallel.c
Test:         PostgreSQL 15devel
Date:         2021-12-03 04:09:03
Coverage:     Lines:     522 hit / 589 total (88.6 %)
              Functions:  20 hit /  20 total (100.0 %)

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execParallel.c
       4             :  *    Support routines for parallel execution.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * This file contains routines that are intended to support setting up,
      10             :  * using, and tearing down a ParallelContext from within the PostgreSQL
      11             :  * executor.  The ParallelContext machinery will handle starting the
      12             :  * workers and ensuring that their state generally matches that of the
      13             :  * leader; see src/backend/access/transam/README.parallel for details.
      14             :  * However, we must save and restore relevant executor state, such as
      15             :  * any ParamListInfo associated with the query, buffer/WAL usage info, and
      16             :  * the actual plan to be passed down to the worker.
      17             :  *
      18             :  * IDENTIFICATION
      19             :  *    src/backend/executor/execParallel.c
      20             :  *
      21             :  *-------------------------------------------------------------------------
      22             :  */
      23             : 
      24             : #include "postgres.h"
      25             : 
      26             : #include "executor/execParallel.h"
      27             : #include "executor/executor.h"
      28             : #include "executor/nodeAgg.h"
      29             : #include "executor/nodeAppend.h"
      30             : #include "executor/nodeBitmapHeapscan.h"
      31             : #include "executor/nodeCustom.h"
      32             : #include "executor/nodeForeignscan.h"
      33             : #include "executor/nodeHash.h"
      34             : #include "executor/nodeHashjoin.h"
      35             : #include "executor/nodeIncrementalSort.h"
      36             : #include "executor/nodeIndexonlyscan.h"
      37             : #include "executor/nodeIndexscan.h"
      38             : #include "executor/nodeMemoize.h"
      39             : #include "executor/nodeSeqscan.h"
      40             : #include "executor/nodeSort.h"
      41             : #include "executor/nodeSubplan.h"
      42             : #include "executor/tqueue.h"
      43             : #include "jit/jit.h"
      44             : #include "nodes/nodeFuncs.h"
      45             : #include "pgstat.h"
      46             : #include "storage/spin.h"
      47             : #include "tcop/tcopprot.h"
      48             : #include "utils/datum.h"
      49             : #include "utils/dsa.h"
      50             : #include "utils/lsyscache.h"
      51             : #include "utils/memutils.h"
      52             : #include "utils/snapmgr.h"
      53             : 
      54             : /*
      55             :  * Magic numbers for parallel executor communication.  We use constants
      56             :  * greater than any 32-bit integer here so that values < 2^32 can be used
      57             :  * by individual parallel nodes to store their own state.
      58             :  */
      59             : #define PARALLEL_KEY_EXECUTOR_FIXED     UINT64CONST(0xE000000000000001)
      60             : #define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000002)
      61             : #define PARALLEL_KEY_PARAMLISTINFO      UINT64CONST(0xE000000000000003)
      62             : #define PARALLEL_KEY_BUFFER_USAGE       UINT64CONST(0xE000000000000004)
      63             : #define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000005)
      64             : #define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000006)
      65             : #define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000007)
      66             : #define PARALLEL_KEY_QUERY_TEXT     UINT64CONST(0xE000000000000008)
      67             : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
      68             : #define PARALLEL_KEY_WAL_USAGE          UINT64CONST(0xE00000000000000A)
      69             : 
      70             : #define PARALLEL_TUPLE_QUEUE_SIZE       65536
      71             : 
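A minimal sketch, not part of this listing, of how a parallel-aware node typically claims one of the sub-2^32 keys mentioned above: it reserves space during the estimate pass and later publishes its shared state under its plan_node_id, which cannot collide with the executor-level PARALLEL_KEY_* constants. FooScanState and ParallelFooScanDesc are hypothetical names used only for illustration.

    /* Estimate pass: reserve one chunk and one toc key for this node. */
    static void
    ExecFooScanEstimate(FooScanState *node, ParallelContext *pcxt)
    {
        node->pscan_len = sizeof(ParallelFooScanDesc);  /* hypothetical struct */
        shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
        shm_toc_estimate_keys(&pcxt->estimator, 1);
    }

    /* DSM pass: allocate the reserved space and key it by plan_node_id. */
    static void
    ExecFooScanInitializeDSM(FooScanState *node, ParallelContext *pcxt)
    {
        ParallelFooScanDesc *pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);

        /* ... initialize pscan's fields here ... */
        shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
        node->pscan = pscan;
    }
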
      72             : /*
      73             :  * Fixed-size random stuff that we need to pass to parallel workers.
      74             :  */
      75             : typedef struct FixedParallelExecutorState
      76             : {
      77             :     int64       tuples_needed;  /* tuple bound, see ExecSetTupleBound */
      78             :     dsa_pointer param_exec;
      79             :     int         eflags;
      80             :     int         jit_flags;
      81             : } FixedParallelExecutorState;
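On the worker side (outside this excerpt), this struct is read back out of the toc under PARALLEL_KEY_EXECUTOR_FIXED; a condensed sketch of that use, assuming the worker entry point's usual queryDesc variable:

    FixedParallelExecutorState *fpes;

    fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
    if (fpes->tuples_needed >= 0)
        ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);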
      82             : 
      83             : /*
      84             :  * DSM structure for accumulating per-PlanState instrumentation.
      85             :  *
      86             :  * instrument_options: Same meaning here as in instrument.c.
      87             :  *
      88             :  * instrument_offset: Offset, relative to the start of this structure,
      89             :  * of the first Instrumentation object.  This will depend on the length of
      90             :  * the plan_node_id array.
      91             :  *
      92             :  * num_workers: Number of workers.
      93             :  *
      94             :  * num_plan_nodes: Number of plan nodes.
      95             :  *
      96             :  * plan_node_id: Array of plan nodes for which we are gathering instrumentation
      97             :  * from parallel workers.  The length of this array is given by num_plan_nodes.
      98             :  */
      99             : struct SharedExecutorInstrumentation
     100             : {
     101             :     int         instrument_options;
     102             :     int         instrument_offset;
     103             :     int         num_workers;
     104             :     int         num_plan_nodes;
     105             :     int         plan_node_id[FLEXIBLE_ARRAY_MEMBER];
     106             :     /* array of num_plan_nodes * num_workers Instrumentation objects follows */
     107             : };
     108             : #define GetInstrumentationArray(sei) \
     109             :     (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
     110             :      (Instrumentation *) (((char *) sei) + sei->instrument_offset))
     111             : 
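The Instrumentation objects that follow this header form a node-major, worker-minor array. A small helper sketch under that assumption (consistent with the num_plan_nodes * num_workers comment above); GetWorkerInstrumentSlot is an illustrative name, not part of this file:

    /* Locate the Instrumentation slot for a given plan node and worker. */
    static Instrumentation *
    GetWorkerInstrumentSlot(SharedExecutorInstrumentation *sei,
                            int plan_node_id, int worker_number)
    {
        Instrumentation *array = GetInstrumentationArray(sei);
        int         i;

        for (i = 0; i < sei->num_plan_nodes; i++)
            if (sei->plan_node_id[i] == plan_node_id)
                return &array[i * sei->num_workers + worker_number];
        return NULL;            /* this plan node is not instrumented */
    }
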
     112             : /* Context object for ExecParallelEstimate. */
     113             : typedef struct ExecParallelEstimateContext
     114             : {
     115             :     ParallelContext *pcxt;
     116             :     int         nnodes;
     117             : } ExecParallelEstimateContext;
     118             : 
     119             : /* Context object for ExecParallelInitializeDSM. */
     120             : typedef struct ExecParallelInitializeDSMContext
     121             : {
     122             :     ParallelContext *pcxt;
     123             :     SharedExecutorInstrumentation *instrumentation;
     124             :     int         nnodes;
     125             : } ExecParallelInitializeDSMContext;
     126             : 
     127             : /* Helper functions that run in the parallel leader. */
     128             : static char *ExecSerializePlan(Plan *plan, EState *estate);
     129             : static bool ExecParallelEstimate(PlanState *node,
     130             :                                  ExecParallelEstimateContext *e);
     131             : static bool ExecParallelInitializeDSM(PlanState *node,
     132             :                                       ExecParallelInitializeDSMContext *d);
     133             : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
     134             :                                                     bool reinitialize);
     135             : static bool ExecParallelReInitializeDSM(PlanState *planstate,
     136             :                                         ParallelContext *pcxt);
     137             : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
     138             :                                                 SharedExecutorInstrumentation *instrumentation);
     139             : 
     140             : /* Helper function that runs in the parallel worker. */
     141             : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
     142             : 
     143             : /*
     144             :  * Create a serialized representation of the plan to be sent to each worker.
     145             :  */
     146             : static char *
     147         410 : ExecSerializePlan(Plan *plan, EState *estate)
     148             : {
     149             :     PlannedStmt *pstmt;
     150             :     ListCell   *lc;
     151             : 
     152             :     /* We can't scribble on the original plan, so make a copy. */
     153         410 :     plan = copyObject(plan);
     154             : 
     155             :     /*
     156             :      * The worker will start its own copy of the executor, and that copy will
     157             :      * insert a junk filter if the toplevel node has any resjunk entries. We
     158             :      * don't want that to happen, because while resjunk columns shouldn't be
     159             :      * sent back to the user, here the tuples are coming back to another
     160             :      * backend which may very well need them.  So mutate the target list
     161             :      * accordingly.  This is sort of a hack; there might be better ways to do
     162             :      * this...
     163             :      */
     164        1122 :     foreach(lc, plan->targetlist)
     165             :     {
     166         712 :         TargetEntry *tle = lfirst_node(TargetEntry, lc);
     167             : 
     168         712 :         tle->resjunk = false;
     169             :     }
     170             : 
     171             :     /*
     172             :      * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
     173             :      * for our purposes, but the worker will need at least a minimal
     174             :      * PlannedStmt to start the executor.
     175             :      */
     176         410 :     pstmt = makeNode(PlannedStmt);
     177         410 :     pstmt->commandType = CMD_SELECT;
     178         410 :     pstmt->queryId = pgstat_get_my_query_id();
     179         410 :     pstmt->hasReturning = false;
     180         410 :     pstmt->hasModifyingCTE = false;
     181         410 :     pstmt->canSetTag = true;
     182         410 :     pstmt->transientPlan = false;
     183         410 :     pstmt->dependsOnRole = false;
     184         410 :     pstmt->parallelModeNeeded = false;
     185         410 :     pstmt->planTree = plan;
     186         410 :     pstmt->rtable = estate->es_range_table;
     187         410 :     pstmt->resultRelations = NIL;
     188         410 :     pstmt->appendRelations = NIL;
     189             : 
     190             :     /*
     191             :      * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
     192             :      * for unsafe ones (so that the list indexes of the safe ones are
     193             :      * preserved).  This positively ensures that the worker won't try to run,
     194             :      * or even do ExecInitNode on, an unsafe subplan.  That's important to
     195             :      * protect, eg, non-parallel-aware FDWs from getting into trouble.
     196             :      */
     197         410 :     pstmt->subplans = NIL;
     198         450 :     foreach(lc, estate->es_plannedstmt->subplans)
     199             :     {
     200          40 :         Plan       *subplan = (Plan *) lfirst(lc);
     201             : 
     202          40 :         if (subplan && !subplan->parallel_safe)
     203          36 :             subplan = NULL;
     204          40 :         pstmt->subplans = lappend(pstmt->subplans, subplan);
     205             :     }
     206             : 
     207         410 :     pstmt->rewindPlanIDs = NULL;
     208         410 :     pstmt->rowMarks = NIL;
     209         410 :     pstmt->relationOids = NIL;
     210         410 :     pstmt->invalItems = NIL; /* workers can't replan anyway... */
     211         410 :     pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
     212         410 :     pstmt->utilityStmt = NULL;
     213         410 :     pstmt->stmt_location = -1;
     214         410 :     pstmt->stmt_len = -1;
     215             : 
     216             :     /* Return serialized copy of our dummy PlannedStmt. */
     217         410 :     return nodeToString(pstmt);
     218             : }
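For reference, the consuming side (run in each worker, outside this excerpt) looks the string up under PARALLEL_KEY_PLANNEDSTMT and reverses nodeToString() with stringToNode(); roughly:

    char       *pstmtspace;
    PlannedStmt *pstmt;

    pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
    pstmt = (PlannedStmt *) stringToNode(pstmtspace);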
     219             : 
     220             : /*
     221             :  * Parallel-aware plan nodes (and occasionally others) may need some state
     222             :  * which is shared across all parallel workers.  Before we size the DSM, give
     223             :  * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
     224             :  * &pcxt->estimator.
     225             :  *
     226             :  * While we're at it, count the number of PlanState nodes in the tree, so
     227             :  * we know how many Instrumentation structures we need.
     228             :  */
     229             : static bool
     230        1814 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
     231             : {
     232        1814 :     if (planstate == NULL)
     233           0 :         return false;
     234             : 
     235             :     /* Count this node. */
     236        1814 :     e->nnodes++;
     237             : 
     238        1814 :     switch (nodeTag(planstate))
     239             :     {
     240         716 :         case T_SeqScanState:
     241         716 :             if (planstate->plan->parallel_aware)
     242         556 :                 ExecSeqScanEstimate((SeqScanState *) planstate,
     243             :                                     e->pcxt);
     244         716 :             break;
     245         192 :         case T_IndexScanState:
     246         192 :             if (planstate->plan->parallel_aware)
     247           8 :                 ExecIndexScanEstimate((IndexScanState *) planstate,
     248             :                                       e->pcxt);
     249         192 :             break;
     250          32 :         case T_IndexOnlyScanState:
     251          32 :             if (planstate->plan->parallel_aware)
     252          24 :                 ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
     253             :                                           e->pcxt);
     254          32 :             break;
     255           0 :         case T_ForeignScanState:
     256           0 :             if (planstate->plan->parallel_aware)
     257           0 :                 ExecForeignScanEstimate((ForeignScanState *) planstate,
     258             :                                         e->pcxt);
     259           0 :             break;
     260         120 :         case T_AppendState:
     261         120 :             if (planstate->plan->parallel_aware)
     262          88 :                 ExecAppendEstimate((AppendState *) planstate,
     263             :                                    e->pcxt);
     264         120 :             break;
     265           0 :         case T_CustomScanState:
     266           0 :             if (planstate->plan->parallel_aware)
     267           0 :                 ExecCustomScanEstimate((CustomScanState *) planstate,
     268             :                                        e->pcxt);
     269           0 :             break;
     270          14 :         case T_BitmapHeapScanState:
     271          14 :             if (planstate->plan->parallel_aware)
     272          12 :                 ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
     273             :                                        e->pcxt);
     274          14 :             break;
     275         112 :         case T_HashJoinState:
     276         112 :             if (planstate->plan->parallel_aware)
     277          64 :                 ExecHashJoinEstimate((HashJoinState *) planstate,
     278             :                                      e->pcxt);
     279         112 :             break;
     280         112 :         case T_HashState:
     281             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     282         112 :             ExecHashEstimate((HashState *) planstate, e->pcxt);
     283         112 :             break;
     284          94 :         case T_SortState:
     285             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     286          94 :             ExecSortEstimate((SortState *) planstate, e->pcxt);
     287          94 :             break;
     288           0 :         case T_IncrementalSortState:
     289             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     290           0 :             ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
     291           0 :             break;
     292         352 :         case T_AggState:
     293             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     294         352 :             ExecAggEstimate((AggState *) planstate, e->pcxt);
     295         352 :             break;
     296           4 :         case T_MemoizeState:
     297             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     298           4 :             ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
     299           4 :             break;
     300          66 :         default:
     301          66 :             break;
     302             :     }
     303             : 
     304        1814 :     return planstate_tree_walker(planstate, ExecParallelEstimate, e);
     305             : }
     306             : 
     307             : /*
     308             :  * Estimate the amount of space required to serialize the indicated parameters.
     309             :  */
     310             : static Size
     311          20 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
     312             : {
     313             :     int         paramid;
     314          20 :     Size        sz = sizeof(int);
     315             : 
     316          20 :     paramid = -1;
     317          44 :     while ((paramid = bms_next_member(params, paramid)) >= 0)
     318             :     {
     319             :         Oid         typeOid;
     320             :         int16       typLen;
     321             :         bool        typByVal;
     322             :         ParamExecData *prm;
     323             : 
     324          24 :         prm = &(estate->es_param_exec_vals[paramid]);
     325          24 :         typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
     326             :                                paramid);
     327             : 
     328          24 :         sz = add_size(sz, sizeof(int)); /* space for paramid */
     329             : 
     330             :         /* space for datum/isnull */
     331          24 :         if (OidIsValid(typeOid))
     332          24 :             get_typlenbyval(typeOid, &typLen, &typByVal);
     333             :         else
     334             :         {
     335             :             /* If no type OID, assume by-value, like copyParamList does. */
     336           0 :             typLen = sizeof(Datum);
     337           0 :             typByVal = true;
     338             :         }
     339          24 :         sz = add_size(sz,
     340          24 :                       datumEstimateSpace(prm->value, prm->isnull,
     341             :                                          typByVal, typLen));
     342             :     }
     343          20 :     return sz;
     344             : }
     345             : 
     346             : /*
     347             :  * Serialize specified PARAM_EXEC parameters.
     348             :  *
     349             :  * We write the number of parameters first, as a 4-byte integer, and then
     350             :  * write details for each parameter in turn.  The details for each parameter
     351             :  * consist of a 4-byte paramid (location of param in execution time internal
     352             :  * parameter array) and then the datum as serialized by datumSerialize().
     353             :  */
     354             : static dsa_pointer
     355          20 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
     356             : {
     357             :     Size        size;
     358             :     int         nparams;
     359             :     int         paramid;
     360             :     ParamExecData *prm;
     361             :     dsa_pointer handle;
     362             :     char       *start_address;
     363             : 
     364             :     /* Allocate enough space for the current parameter values. */
     365          20 :     size = EstimateParamExecSpace(estate, params);
     366          20 :     handle = dsa_allocate(area, size);
     367          20 :     start_address = dsa_get_address(area, handle);
     368             : 
     369             :     /* First write the number of parameters as a 4-byte integer. */
     370          20 :     nparams = bms_num_members(params);
     371          20 :     memcpy(start_address, &nparams, sizeof(int));
     372          20 :     start_address += sizeof(int);
     373             : 
     374             :     /* Write details for each parameter in turn. */
     375          20 :     paramid = -1;
     376          44 :     while ((paramid = bms_next_member(params, paramid)) >= 0)
     377             :     {
     378             :         Oid         typeOid;
     379             :         int16       typLen;
     380             :         bool        typByVal;
     381             : 
     382          24 :         prm = &(estate->es_param_exec_vals[paramid]);
     383          24 :         typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
     384             :                                paramid);
     385             : 
     386             :         /* Write paramid. */
     387          24 :         memcpy(start_address, &paramid, sizeof(int));
     388          24 :         start_address += sizeof(int);
     389             : 
     390             :         /* Write datum/isnull */
     391          24 :         if (OidIsValid(typeOid))
     392          24 :             get_typlenbyval(typeOid, &typLen, &typByVal);
     393             :         else
     394             :         {
     395             :             /* If no type OID, assume by-value, like copyParamList does. */
     396           0 :             typLen = sizeof(Datum);
     397           0 :             typByVal = true;
     398             :         }
     399          24 :         datumSerialize(prm->value, prm->isnull, typByVal, typLen,
     400             :                        &start_address);
     401             :     }
     402             : 
     403          20 :     return handle;
     404             : }
     405             : 
     406             : /*
     407             :  * Restore specified PARAM_EXEC parameters.
     408             :  */
     409             : static void
     410          48 : RestoreParamExecParams(char *start_address, EState *estate)
     411             : {
     412             :     int         nparams;
     413             :     int         i;
     414             :     int         paramid;
     415             : 
     416          48 :     memcpy(&nparams, start_address, sizeof(int));
     417          48 :     start_address += sizeof(int);
     418             : 
     419         104 :     for (i = 0; i < nparams; i++)
     420             :     {
     421             :         ParamExecData *prm;
     422             : 
     423             :         /* Read paramid */
     424          56 :         memcpy(&paramid, start_address, sizeof(int));
     425          56 :         start_address += sizeof(int);
     426          56 :         prm = &(estate->es_param_exec_vals[paramid]);
     427             : 
     428             :         /* Read datum/isnull. */
     429          56 :         prm->value = datumRestore(&start_address, &prm->isnull);
     430          56 :         prm->execPlan = NULL;
     431             :     }
     432          48 : }
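Taken together, the two routines above form a round trip through DSA: the leader stores the dsa_pointer returned by SerializeParamExecParams() in fpes->param_exec, and the worker turns it back into a local address before restoring. A condensed sketch:

    /* Leader (see ExecInitParallelPlan below): */
    pei->param_exec = SerializeParamExecParams(estate, sendParams, pei->area);
    fpes->param_exec = pei->param_exec;

    /* Worker (outside this excerpt): */
    if (DsaPointerIsValid(fpes->param_exec))
    {
        char       *paramspace = dsa_get_address(area, fpes->param_exec);

        RestoreParamExecParams(paramspace, estate);
    }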
     433             : 
     434             : /*
     435             :  * Initialize the dynamic shared memory segment that will be used to control
     436             :  * parallel execution.
     437             :  */
     438             : static bool
     439        1814 : ExecParallelInitializeDSM(PlanState *planstate,
     440             :                           ExecParallelInitializeDSMContext *d)
     441             : {
     442        1814 :     if (planstate == NULL)
     443           0 :         return false;
     444             : 
     445             :     /* If instrumentation is enabled, initialize slot for this node. */
     446        1814 :     if (d->instrumentation != NULL)
     447         676 :         d->instrumentation->plan_node_id[d->nnodes] =
     448         676 :             planstate->plan->plan_node_id;
     449             : 
     450             :     /* Count this node. */
     451        1814 :     d->nnodes++;
     452             : 
     453             :     /*
     454             :      * Call initializers for DSM-using plan nodes.
     455             :      *
     456             :      * Most plan nodes won't do anything here, but plan nodes that allocated
     457             :      * DSM may need to initialize shared state in the DSM before parallel
     458             :      * workers are launched.  They can allocate the space they previously
     459             :      * estimated using shm_toc_allocate, and add the keys they previously
     460             :      * estimated using shm_toc_insert, in each case targeting pcxt->toc.
     461             :      */
     462        1814 :     switch (nodeTag(planstate))
     463             :     {
     464         716 :         case T_SeqScanState:
     465         716 :             if (planstate->plan->parallel_aware)
     466         556 :                 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
     467             :                                          d->pcxt);
     468         716 :             break;
     469         192 :         case T_IndexScanState:
     470         192 :             if (planstate->plan->parallel_aware)
     471           8 :                 ExecIndexScanInitializeDSM((IndexScanState *) planstate,
     472             :                                            d->pcxt);
     473         192 :             break;
     474          32 :         case T_IndexOnlyScanState:
     475          32 :             if (planstate->plan->parallel_aware)
     476          24 :                 ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
     477             :                                                d->pcxt);
     478          32 :             break;
     479           0 :         case T_ForeignScanState:
     480           0 :             if (planstate->plan->parallel_aware)
     481           0 :                 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
     482             :                                              d->pcxt);
     483           0 :             break;
     484         120 :         case T_AppendState:
     485         120 :             if (planstate->plan->parallel_aware)
     486          88 :                 ExecAppendInitializeDSM((AppendState *) planstate,
     487             :                                         d->pcxt);
     488         120 :             break;
     489           0 :         case T_CustomScanState:
     490           0 :             if (planstate->plan->parallel_aware)
     491           0 :                 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
     492             :                                             d->pcxt);
     493           0 :             break;
     494          14 :         case T_BitmapHeapScanState:
     495          14 :             if (planstate->plan->parallel_aware)
     496          12 :                 ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
     497             :                                             d->pcxt);
     498          14 :             break;
     499         112 :         case T_HashJoinState:
     500         112 :             if (planstate->plan->parallel_aware)
     501          64 :                 ExecHashJoinInitializeDSM((HashJoinState *) planstate,
     502             :                                           d->pcxt);
     503         112 :             break;
     504         112 :         case T_HashState:
     505             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     506         112 :             ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
     507         112 :             break;
     508          94 :         case T_SortState:
     509             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     510          94 :             ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
     511          94 :             break;
     512           0 :         case T_IncrementalSortState:
     513             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     514           0 :             ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
     515           0 :             break;
     516         352 :         case T_AggState:
     517             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     518         352 :             ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
     519         352 :             break;
     520           4 :         case T_MemoizeState:
     521             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     522           4 :             ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
     523           4 :             break;
     524          66 :         default:
     525          66 :             break;
     526             :     }
     527             : 
     528        1814 :     return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
     529             : }
     530             : 
     531             : /*
      532             :  * Set up the response queues that backend workers will use to return
      533             :  * tuples to the main backend.
     534             :  */
     535             : static shm_mq_handle **
     536         582 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
     537             : {
     538             :     shm_mq_handle **responseq;
     539             :     char       *tqueuespace;
     540             :     int         i;
     541             : 
     542             :     /* Skip this if no workers. */
     543         582 :     if (pcxt->nworkers == 0)
     544           0 :         return NULL;
     545             : 
     546             :     /* Allocate memory for shared memory queue handles. */
     547             :     responseq = (shm_mq_handle **)
     548         582 :         palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
     549             : 
     550             :     /*
     551             :      * If not reinitializing, allocate space from the DSM for the queues;
     552             :      * otherwise, find the already allocated space.
     553             :      */
     554         582 :     if (!reinitialize)
     555             :         tqueuespace =
     556         410 :             shm_toc_allocate(pcxt->toc,
     557             :                              mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
     558         410 :                                       pcxt->nworkers));
     559             :     else
     560         172 :         tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
     561             : 
     562             :     /* Create the queues, and become the receiver for each. */
     563        2204 :     for (i = 0; i < pcxt->nworkers; ++i)
     564             :     {
     565             :         shm_mq     *mq;
     566             : 
     567        1622 :         mq = shm_mq_create(tqueuespace +
     568        1622 :                            ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
     569             :                            (Size) PARALLEL_TUPLE_QUEUE_SIZE);
     570             : 
     571        1622 :         shm_mq_set_receiver(mq, MyProc);
     572        1622 :         responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
     573             :     }
     574             : 
     575             :     /* Add array of queues to shm_toc, so others can find it. */
     576         582 :     if (!reinitialize)
     577         410 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
     578             : 
     579             :     /* Return array of handles. */
     580         582 :     return responseq;
     581             : }
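Each launched worker attaches to its own slot of this queue array from the other end. The worker-side counterpart, ExecParallelGetReceiver() (declared above, body outside this excerpt), does roughly the following:

    /* Become the sender on this worker's queue and wrap it in a
     * DestReceiver that ships result tuples back to the leader. */
    char       *mqspace;
    shm_mq     *mq;

    mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
    mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
    mq = (shm_mq *) mqspace;
    shm_mq_set_sender(mq, MyProc);
    return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));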
     582             : 
     583             : /*
     584             :  * Sets up the required infrastructure for backend workers to perform
     585             :  * execution and return results to the main backend.
     586             :  */
     587             : ParallelExecutorInfo *
     588         410 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
     589             :                      Bitmapset *sendParams, int nworkers,
     590             :                      int64 tuples_needed)
     591             : {
     592             :     ParallelExecutorInfo *pei;
     593             :     ParallelContext *pcxt;
     594             :     ExecParallelEstimateContext e;
     595             :     ExecParallelInitializeDSMContext d;
     596             :     FixedParallelExecutorState *fpes;
     597             :     char       *pstmt_data;
     598             :     char       *pstmt_space;
     599             :     char       *paramlistinfo_space;
     600             :     BufferUsage *bufusage_space;
     601             :     WalUsage   *walusage_space;
     602         410 :     SharedExecutorInstrumentation *instrumentation = NULL;
     603         410 :     SharedJitInstrumentation *jit_instrumentation = NULL;
     604             :     int         pstmt_len;
     605             :     int         paramlistinfo_len;
     606         410 :     int         instrumentation_len = 0;
     607         410 :     int         jit_instrumentation_len = 0;
     608         410 :     int         instrument_offset = 0;
     609         410 :     Size        dsa_minsize = dsa_minimum_size();
     610             :     char       *query_string;
     611             :     int         query_len;
     612             : 
     613             :     /*
     614             :      * Force any initplan outputs that we're going to pass to workers to be
     615             :      * evaluated, if they weren't already.
     616             :      *
     617             :      * For simplicity, we use the EState's per-output-tuple ExprContext here.
     618             :      * That risks intra-query memory leakage, since we might pass through here
     619             :      * many times before that ExprContext gets reset; but ExecSetParamPlan
     620             :      * doesn't normally leak any memory in the context (see its comments), so
     621             :      * it doesn't seem worth complicating this function's API to pass it a
     622             :      * shorter-lived ExprContext.  This might need to change someday.
     623             :      */
     624         410 :     ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
     625             : 
     626             :     /* Allocate object for return value. */
     627         410 :     pei = palloc0(sizeof(ParallelExecutorInfo));
     628         410 :     pei->finished = false;
     629         410 :     pei->planstate = planstate;
     630             : 
     631             :     /* Fix up and serialize plan to be sent to workers. */
     632         410 :     pstmt_data = ExecSerializePlan(planstate->plan, estate);
     633             : 
     634             :     /* Create a parallel context. */
     635         410 :     pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
     636         410 :     pei->pcxt = pcxt;
     637             : 
     638             :     /*
     639             :      * Before telling the parallel context to create a dynamic shared memory
     640             :      * segment, we need to figure out how big it should be.  Estimate space
     641             :      * for the various things we need to store.
     642             :      */
     643             : 
     644             :     /* Estimate space for fixed-size state. */
     645         410 :     shm_toc_estimate_chunk(&pcxt->estimator,
     646             :                            sizeof(FixedParallelExecutorState));
     647         410 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     648             : 
     649             :     /* Estimate space for query text. */
     650         410 :     query_len = strlen(estate->es_sourceText);
     651         410 :     shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
     652         410 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     653             : 
     654             :     /* Estimate space for serialized PlannedStmt. */
     655         410 :     pstmt_len = strlen(pstmt_data) + 1;
     656         410 :     shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
     657         410 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     658             : 
     659             :     /* Estimate space for serialized ParamListInfo. */
     660         410 :     paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
     661         410 :     shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
     662         410 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     663             : 
     664             :     /*
     665             :      * Estimate space for BufferUsage.
     666             :      *
     667             :      * If EXPLAIN is not in use and there are no extensions loaded that care,
     668             :      * we could skip this.  But we have no way of knowing whether anyone's
     669             :      * looking at pgBufferUsage, so do it unconditionally.
     670             :      */
     671         410 :     shm_toc_estimate_chunk(&pcxt->estimator,
     672             :                            mul_size(sizeof(BufferUsage), pcxt->nworkers));
     673         410 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     674             : 
     675             :     /*
     676             :      * Same thing for WalUsage.
     677             :      */
     678         410 :     shm_toc_estimate_chunk(&pcxt->estimator,
     679             :                            mul_size(sizeof(WalUsage), pcxt->nworkers));
     680         410 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     681             : 
     682             :     /* Estimate space for tuple queues. */
     683         410 :     shm_toc_estimate_chunk(&pcxt->estimator,
     684             :                            mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
     685         410 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     686             : 
     687             :     /*
     688             :      * Give parallel-aware nodes a chance to add to the estimates, and get a
     689             :      * count of how many PlanState nodes there are.
     690             :      */
     691         410 :     e.pcxt = pcxt;
     692         410 :     e.nnodes = 0;
     693         410 :     ExecParallelEstimate(planstate, &e);
     694             : 
     695             :     /* Estimate space for instrumentation, if required. */
     696         410 :     if (estate->es_instrument)
     697             :     {
     698         124 :         instrumentation_len =
     699             :             offsetof(SharedExecutorInstrumentation, plan_node_id) +
     700         124 :             sizeof(int) * e.nnodes;
     701         124 :         instrumentation_len = MAXALIGN(instrumentation_len);
     702         124 :         instrument_offset = instrumentation_len;
     703         124 :         instrumentation_len +=
     704         124 :             mul_size(sizeof(Instrumentation),
     705         124 :                      mul_size(e.nnodes, nworkers));
     706         124 :         shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
     707         124 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     708             : 
     709             :         /* Estimate space for JIT instrumentation, if required. */
     710         124 :         if (estate->es_jit_flags != PGJIT_NONE)
     711             :         {
     712          16 :             jit_instrumentation_len =
     713          16 :                 offsetof(SharedJitInstrumentation, jit_instr) +
     714             :                 sizeof(JitInstrumentation) * nworkers;
     715          16 :             shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
     716          16 :             shm_toc_estimate_keys(&pcxt->estimator, 1);
     717             :         }
     718             :     }
     719             : 
     720             :     /* Estimate space for DSA area. */
     721         410 :     shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
     722         410 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     723             : 
     724             :     /* Everyone's had a chance to ask for space, so now create the DSM. */
     725         410 :     InitializeParallelDSM(pcxt);
     726             : 
     727             :     /*
     728             :      * OK, now we have a dynamic shared memory segment, and it should be big
     729             :      * enough to store all of the data we estimated we would want to put into
     730             :      * it, plus whatever general stuff (not specifically executor-related) the
     731             :      * ParallelContext itself needs to store there.  None of the space we
     732             :      * asked for has been allocated or initialized yet, though, so do that.
     733             :      */
     734             : 
     735             :     /* Store fixed-size state. */
     736         410 :     fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
     737         410 :     fpes->tuples_needed = tuples_needed;
     738         410 :     fpes->param_exec = InvalidDsaPointer;
     739         410 :     fpes->eflags = estate->es_top_eflags;
     740         410 :     fpes->jit_flags = estate->es_jit_flags;
     741         410 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
     742             : 
     743             :     /* Store query string */
     744         410 :     query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
     745         410 :     memcpy(query_string, estate->es_sourceText, query_len + 1);
     746         410 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
     747             : 
     748             :     /* Store serialized PlannedStmt. */
     749         410 :     pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
     750         410 :     memcpy(pstmt_space, pstmt_data, pstmt_len);
     751         410 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
     752             : 
     753             :     /* Store serialized ParamListInfo. */
     754         410 :     paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
     755         410 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
     756         410 :     SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
     757             : 
     758             :     /* Allocate space for each worker's BufferUsage; no need to initialize. */
     759         410 :     bufusage_space = shm_toc_allocate(pcxt->toc,
     760         410 :                                       mul_size(sizeof(BufferUsage), pcxt->nworkers));
     761         410 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
     762         410 :     pei->buffer_usage = bufusage_space;
     763             : 
     764             :     /* Same for WalUsage. */
     765         410 :     walusage_space = shm_toc_allocate(pcxt->toc,
     766         410 :                                       mul_size(sizeof(WalUsage), pcxt->nworkers));
     767         410 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
     768         410 :     pei->wal_usage = walusage_space;
     769             : 
     770             :     /* Set up the tuple queues that the workers will write into. */
     771         410 :     pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
     772             : 
     773             :     /* We don't need the TupleQueueReaders yet, though. */
     774         410 :     pei->reader = NULL;
     775             : 
     776             :     /*
     777             :      * If instrumentation options were supplied, allocate space for the data.
     778             :      * It only gets partially initialized here; the rest happens during
     779             :      * ExecParallelInitializeDSM.
     780             :      */
     781         410 :     if (estate->es_instrument)
     782             :     {
     783             :         Instrumentation *instrument;
     784             :         int         i;
     785             : 
     786         124 :         instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
     787         124 :         instrumentation->instrument_options = estate->es_instrument;
     788         124 :         instrumentation->instrument_offset = instrument_offset;
     789         124 :         instrumentation->num_workers = nworkers;
     790         124 :         instrumentation->num_plan_nodes = e.nnodes;
     791         124 :         instrument = GetInstrumentationArray(instrumentation);
     792        1224 :         for (i = 0; i < nworkers * e.nnodes; ++i)
     793        1100 :             InstrInit(&instrument[i], estate->es_instrument);
     794         124 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
     795             :                        instrumentation);
     796         124 :         pei->instrumentation = instrumentation;
     797             : 
     798         124 :         if (estate->es_jit_flags != PGJIT_NONE)
     799             :         {
     800          16 :             jit_instrumentation = shm_toc_allocate(pcxt->toc,
     801             :                                                    jit_instrumentation_len);
     802          16 :             jit_instrumentation->num_workers = nworkers;
     803          16 :             memset(jit_instrumentation->jit_instr, 0,
     804             :                    sizeof(JitInstrumentation) * nworkers);
     805          16 :             shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
     806             :                            jit_instrumentation);
     807          16 :             pei->jit_instrumentation = jit_instrumentation;
     808             :         }
     809             :     }
     810             : 
     811             :     /*
     812             :      * Create a DSA area that can be used by the leader and all workers.
     813             :      * (However, if we failed to create a DSM and are using private memory
     814             :      * instead, then skip this.)
     815             :      */
     816         410 :     if (pcxt->seg != NULL)
     817             :     {
     818             :         char       *area_space;
     819             : 
     820         410 :         area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
     821         410 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
     822         410 :         pei->area = dsa_create_in_place(area_space, dsa_minsize,
     823             :                                         LWTRANCHE_PARALLEL_QUERY_DSA,
     824             :                                         pcxt->seg);
     825             : 
     826             :         /*
     827             :          * Serialize parameters, if any, using DSA storage.  We don't dare use
     828             :          * the main parallel query DSM for this because we might relaunch
     829             :          * workers after the values have changed (and thus the amount of
     830             :          * storage required has changed).
     831             :          */
     832         410 :         if (!bms_is_empty(sendParams))
     833             :         {
     834          20 :             pei->param_exec = SerializeParamExecParams(estate, sendParams,
     835             :                                                        pei->area);
     836          20 :             fpes->param_exec = pei->param_exec;
     837             :         }
     838             :     }
     839             : 
     840             :     /*
     841             :      * Give parallel-aware nodes a chance to initialize their shared data.
     842             :      * This also initializes the elements of instrumentation->ps_instrument,
     843             :      * if it exists.
     844             :      */
     845         410 :     d.pcxt = pcxt;
     846         410 :     d.instrumentation = instrumentation;
     847         410 :     d.nnodes = 0;
     848             : 
     849             :     /* Install our DSA area while initializing the plan. */
     850         410 :     estate->es_query_dsa = pei->area;
     851         410 :     ExecParallelInitializeDSM(planstate, &d);
     852         410 :     estate->es_query_dsa = NULL;
     853             : 
     854             :     /*
     855             :      * Make sure that the world hasn't shifted under our feet.  This could
     856             :      * probably just be an Assert(), but let's be conservative for now.
     857             :      */
     858         410 :     if (e.nnodes != d.nnodes)
     859           0 :         elog(ERROR, "inconsistent count of PlanState nodes");
     860             : 
     861             :     /* OK, we're ready to rock and roll. */
     862         410 :     return pei;
     863             : }
     864             : 
     865             : /*
     866             :  * Set up tuple queue readers to read the results of a parallel subplan.
     867             :  *
     868             :  * This is separate from ExecInitParallelPlan() because we can launch the
     869             :  * worker processes and let them start doing something before we do this.
     870             :  */
     871             : void
     872         570 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
     873             : {
     874         570 :     int         nworkers = pei->pcxt->nworkers_launched;
     875             :     int         i;
     876             : 
     877             :     Assert(pei->reader == NULL);
     878             : 
     879         570 :     if (nworkers > 0)
     880             :     {
     881         570 :         pei->reader = (TupleQueueReader **)
     882         570 :             palloc(nworkers * sizeof(TupleQueueReader *));
     883             : 
     884        2142 :         for (i = 0; i < nworkers; i++)
     885             :         {
     886        1572 :             shm_mq_set_handle(pei->tqueue[i],
     887        1572 :                               pei->pcxt->worker[i].bgwhandle);
     888        1572 :             pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
     889             :         }
     890             :     }
     891         570 : }
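As the comment above notes, the workers are launched between these two calls. A condensed sketch of how a Gather-style caller typically drives them (identifiers such as gather->initParam are shown for illustration; error handling and the zero-worker fallback are omitted):

    pei = ExecInitParallelPlan(outerPlanState(node), estate,
                               gather->initParam, nworkers, tuples_needed);
    LaunchParallelWorkers(pei->pcxt);
    if (pei->pcxt->nworkers_launched > 0)
        ExecParallelCreateReaders(pei);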
     892             : 
     893             : /*
     894             :  * Re-initialize the parallel executor shared memory state before launching
     895             :  * a fresh batch of workers.
     896             :  */
     897             : void
     898         172 : ExecParallelReinitialize(PlanState *planstate,
     899             :                          ParallelExecutorInfo *pei,
     900             :                          Bitmapset *sendParams)
     901             : {
     902         172 :     EState     *estate = planstate->state;
     903             :     FixedParallelExecutorState *fpes;
     904             : 
     905             :     /* Old workers must already be shut down */
     906             :     Assert(pei->finished);
     907             : 
     908             :     /*
     909             :      * Force any initplan outputs that we're going to pass to workers to be
     910             :      * evaluated, if they weren't already (see comments in
     911             :      * ExecInitParallelPlan).
     912             :      */
     913         172 :     ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
     914             : 
     915         172 :     ReinitializeParallelDSM(pei->pcxt);
     916         172 :     pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
     917         172 :     pei->reader = NULL;
     918         172 :     pei->finished = false;
     919             : 
     920         172 :     fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
     921             : 
     922             :     /* Free any serialized parameters from the last round. */
     923         172 :     if (DsaPointerIsValid(fpes->param_exec))
     924             :     {
     925           0 :         dsa_free(pei->area, fpes->param_exec);
     926           0 :         fpes->param_exec = InvalidDsaPointer;
     927             :     }
     928             : 
     929             :     /* Serialize current parameter values if required. */
     930         172 :     if (!bms_is_empty(sendParams))
     931             :     {
     932           0 :         pei->param_exec = SerializeParamExecParams(estate, sendParams,
     933             :                                                    pei->area);
     934           0 :         fpes->param_exec = pei->param_exec;
     935             :     }
     936             : 
     937             :     /* Traverse plan tree and let each child node reset associated state. */
     938         172 :     estate->es_query_dsa = pei->area;
     939         172 :     ExecParallelReInitializeDSM(planstate, pei->pcxt);
     940         172 :     estate->es_query_dsa = NULL;
     941         172 : }
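A rescan therefore reuses the existing DSM segment rather than building a new one: a Gather-style caller invokes this once per rescan and relaunches workers the next time the node is executed, roughly:

    /* In the caller's ReScan path, sketched: */
    if (node->pei != NULL)
        ExecParallelReinitialize(outerPlanState(node), node->pei,
                                 gather->initParam);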
     942             : 
     943             : /*
     944             :  * Traverse plan tree to reinitialize per-node dynamic shared memory state
     945             :  */
     946             : static bool
     947         444 : ExecParallelReInitializeDSM(PlanState *planstate,
     948             :                             ParallelContext *pcxt)
     949             : {
     950         444 :     if (planstate == NULL)
     951           0 :         return false;
     952             : 
     953             :     /*
     954             :      * Call reinitializers for DSM-using plan nodes.
     955             :      */
     956         444 :     switch (nodeTag(planstate))
     957             :     {
     958         184 :         case T_SeqScanState:
     959         184 :             if (planstate->plan->parallel_aware)
     960         152 :                 ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
     961             :                                            pcxt);
     962         184 :             break;
     963           8 :         case T_IndexScanState:
     964           8 :             if (planstate->plan->parallel_aware)
     965           8 :                 ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
     966             :                                              pcxt);
     967           8 :             break;
     968           8 :         case T_IndexOnlyScanState:
     969           8 :             if (planstate->plan->parallel_aware)
     970           8 :                 ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
     971             :                                                  pcxt);
     972           8 :             break;
     973           0 :         case T_ForeignScanState:
     974           0 :             if (planstate->plan->parallel_aware)
     975           0 :                 ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
     976             :                                                pcxt);
     977           0 :             break;
     978           0 :         case T_AppendState:
     979           0 :             if (planstate->plan->parallel_aware)
     980           0 :                 ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
     981           0 :             break;
     982           0 :         case T_CustomScanState:
     983           0 :             if (planstate->plan->parallel_aware)
     984           0 :                 ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
     985             :                                               pcxt);
     986           0 :             break;
     987          36 :         case T_BitmapHeapScanState:
     988          36 :             if (planstate->plan->parallel_aware)
     989          36 :                 ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
     990             :                                               pcxt);
     991          36 :             break;
     992          64 :         case T_HashJoinState:
     993          64 :             if (planstate->plan->parallel_aware)
     994          32 :                 ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
     995             :                                             pcxt);
     996          64 :             break;
     997          84 :         case T_HashState:
     998             :         case T_SortState:
     999             :         case T_IncrementalSortState:
    1000             :         case T_MemoizeState:
    1001             :             /* these nodes have DSM state, but no reinitialization is required */
    1002          84 :             break;
    1003             : 
    1004          60 :         default:
    1005          60 :             break;
    1006             :     }
    1007             : 
    1008         444 :     return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
    1009             : }
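
/*
 * Editor's sketch (not part of the source): the walker convention used by
 * ExecParallelReInitializeDSM and the other traversals in this file.  Handle
 * the node at hand, then recurse with planstate_tree_walker, which invokes
 * the same function on each child and returns true as soon as any call
 * returns true.  count_plan_nodes is a hypothetical example walker.
 */
#include "postgres.h"

#include "nodes/execnodes.h"
#include "nodes/nodeFuncs.h"

static bool
count_plan_nodes(PlanState *planstate, int *count)
{
    if (planstate == NULL)
        return false;

    (*count)++;                 /* per-node work would go here */

    /* Recurse into children; a true result aborts the traversal early. */
    return planstate_tree_walker(planstate, count_plan_nodes, count);
}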
    1010             : 
    1011             : /*
    1012             :  * Copy instrumentation information about this node and its descendants from
    1013             :  * dynamic shared memory.
    1014             :  */
    1015             : static bool
    1016         676 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
    1017             :                                     SharedExecutorInstrumentation *instrumentation)
    1018             : {
    1019             :     Instrumentation *instrument;
    1020             :     int         i;
    1021             :     int         n;
    1022             :     int         ibytes;
    1023         676 :     int         plan_node_id = planstate->plan->plan_node_id;
    1024             :     MemoryContext oldcontext;
    1025             : 
    1026             :     /* Find the instrumentation for this node. */
    1027        2980 :     for (i = 0; i < instrumentation->num_plan_nodes; ++i)
    1028        2980 :         if (instrumentation->plan_node_id[i] == plan_node_id)
    1029         676 :             break;
    1030         676 :     if (i >= instrumentation->num_plan_nodes)
    1031           0 :         elog(ERROR, "plan node %d not found", plan_node_id);
    1032             : 
    1033             :     /* Accumulate the statistics from all workers. */
    1034         676 :     instrument = GetInstrumentationArray(instrumentation);
    1035         676 :     instrument += i * instrumentation->num_workers;
    1036        1776 :     for (n = 0; n < instrumentation->num_workers; ++n)
    1037        1100 :         InstrAggNode(planstate->instrument, &instrument[n]);
    1038             : 
    1039             :     /*
    1040             :      * Also store the per-worker detail.
    1041             :      *
    1042             :      * Worker instrumentation should be allocated in the same context as the
    1043             :      * regular instrumentation information, which is the per-query context.
    1044             :      * Switch into per-query memory context.
    1045             :      */
    1046         676 :     oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
    1047         676 :     ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
    1048         676 :     planstate->worker_instrument =
    1049         676 :         palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
    1050         676 :     MemoryContextSwitchTo(oldcontext);
    1051             : 
    1052         676 :     planstate->worker_instrument->num_workers = instrumentation->num_workers;
    1053         676 :     memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
    1054             : 
    1055             :     /* Perform any node-type-specific work that needs to be done. */
    1056         676 :     switch (nodeTag(planstate))
    1057             :     {
    1058           8 :         case T_SortState:
    1059           8 :             ExecSortRetrieveInstrumentation((SortState *) planstate);
    1060           8 :             break;
    1061           0 :         case T_IncrementalSortState:
    1062           0 :             ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
    1063           0 :             break;
    1064          56 :         case T_HashState:
    1065          56 :             ExecHashRetrieveInstrumentation((HashState *) planstate);
    1066          56 :             break;
    1067          72 :         case T_AggState:
    1068          72 :             ExecAggRetrieveInstrumentation((AggState *) planstate);
    1069          72 :             break;
    1070           0 :         case T_MemoizeState:
    1071           0 :             ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
    1072           0 :             break;
    1073         540 :         default:
    1074         540 :             break;
    1075             :     }
    1076             : 
    1077         676 :     return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
    1078             :                                  instrumentation);
    1079             : }
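
/*
 * Editor's note (not part of the source): a worked example of the indexing
 * above.  The shared Instrumentation entries form one flat array laid out
 * per plan node, then per worker, so plan node i owns the slots
 * [i * num_workers, (i + 1) * num_workers).  With 3 instrumented plan nodes
 * and 2 workers:
 *
 *     array index   0   1   2   3   4   5
 *     plan node     0   0   1   1   2   2
 *     worker        0   1   0   1   0   1
 *
 * so the node found at i = 1 reads slots 2 and 3, which is exactly what
 * "instrument += i * instrumentation->num_workers" selects.
 */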
    1080             : 
    1081             : /*
    1082             :  * Add up the workers' JIT instrumentation from dynamic shared memory.
    1083             :  */
    1084             : static void
    1085          16 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
    1086             :                                        SharedJitInstrumentation *shared_jit)
    1087             : {
    1088             :     JitInstrumentation *combined;
    1089             :     int         ibytes;
    1090             : 
    1091             :     int         n;
    1092             : 
    1093             :     /*
    1094             :      * Accumulate worker JIT instrumentation into the combined JIT
    1095             :      * instrumentation, allocating it if required.
    1096             :      */
    1097          16 :     if (!planstate->state->es_jit_worker_instr)
    1098          16 :         planstate->state->es_jit_worker_instr =
    1099          16 :             MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
    1100          16 :     combined = planstate->state->es_jit_worker_instr;
    1101             : 
    1102             :     /* Accumulate all the workers' instrumentations. */
    1103          48 :     for (n = 0; n < shared_jit->num_workers; ++n)
    1104          32 :         InstrJitAgg(combined, &shared_jit->jit_instr[n]);
    1105             : 
    1106             :     /*
    1107             :      * Store the per-worker detail.
    1108             :      *
    1109             :      * Similar to ExecParallelRetrieveInstrumentation(), allocate the
    1110             :      * instrumentation in per-query context.
    1111             :      */
    1112          16 :     ibytes = offsetof(SharedJitInstrumentation, jit_instr)
    1113          16 :         + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
    1114          16 :     planstate->worker_jit_instrument =
    1115          16 :         MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
    1116             : 
    1117          16 :     memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
    1118          16 : }
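
/*
 * Editor's sketch (not part of the source): the sizing idiom used above for
 * structs that end in a flexible array member (SharedJitInstrumentation,
 * WorkerInstrumentation): allocate offsetof(type, array) bytes for the fixed
 * header plus n * sizeof(element) for the array.  PerWorkerStats and
 * alloc_per_worker_stats are hypothetical, standalone-C stand-ins.
 */
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

typedef struct PerWorkerStats
{
    int         num_workers;
    double      elapsed[];      /* flexible array member, one slot per worker */
} PerWorkerStats;

static PerWorkerStats *
alloc_per_worker_stats(int num_workers)
{
    size_t      nbytes = offsetof(PerWorkerStats, elapsed)
        + (size_t) num_workers * sizeof(double);
    PerWorkerStats *stats = malloc(nbytes);

    if (stats != NULL)
    {
        memset(stats, 0, nbytes);
        stats->num_workers = num_workers;
    }
    return stats;
}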
    1119             : 
    1120             : /*
    1121             :  * Finish parallel execution.  We wait for parallel workers to finish, and
    1122             :  * accumulate their buffer/WAL usage.
    1123             :  */
    1124             : void
    1125        1056 : ExecParallelFinish(ParallelExecutorInfo *pei)
    1126             : {
    1127        1056 :     int         nworkers = pei->pcxt->nworkers_launched;
    1128             :     int         i;
    1129             : 
    1130             :     /* Make this be a no-op if called twice in a row. */
    1131        1056 :     if (pei->finished)
    1132         478 :         return;
    1133             : 
    1134             :     /*
    1135             :      * Detach from tuple queues ASAP, so that any still-active workers will
    1136             :      * notice that no further results are wanted.
    1137             :      */
    1138         578 :     if (pei->tqueue != NULL)
    1139             :     {
    1140        2146 :         for (i = 0; i < nworkers; i++)
    1141        1568 :             shm_mq_detach(pei->tqueue[i]);
    1142         578 :         pfree(pei->tqueue);
    1143         578 :         pei->tqueue = NULL;
    1144             :     }
    1145             : 
    1146             :     /*
    1147             :      * While we're waiting for the workers to finish, let's get rid of the
    1148             :      * tuple queue readers.  (Any other local cleanup could be done here too.)
    1149             :      */
    1150         578 :     if (pei->reader != NULL)
    1151             :     {
    1152        2134 :         for (i = 0; i < nworkers; i++)
    1153        1568 :             DestroyTupleQueueReader(pei->reader[i]);
    1154         566 :         pfree(pei->reader);
    1155         566 :         pei->reader = NULL;
    1156             :     }
    1157             : 
    1158             :     /* Now wait for the workers to finish. */
    1159         578 :     WaitForParallelWorkersToFinish(pei->pcxt);
    1160             : 
    1161             :     /*
    1162             :      * Next, accumulate buffer/WAL usage.  (This must wait for the workers to
    1163             :      * finish, or we might get incomplete data.)
    1164             :      */
    1165        2146 :     for (i = 0; i < nworkers; i++)
    1166        1568 :         InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
    1167             : 
    1168         578 :     pei->finished = true;
    1169             : }
    1170             : 
    1171             : /*
    1172             :  * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
    1173             :  * resources still exist after ExecParallelFinish.  We separate these
    1174             :  * routines because someone might want to examine the contents of the DSM
    1175             :  * after ExecParallelFinish and before calling this routine.
    1176             :  */
    1177             : void
    1178         406 : ExecParallelCleanup(ParallelExecutorInfo *pei)
    1179             : {
    1180             :     /* Accumulate instrumentation, if any. */
    1181         406 :     if (pei->instrumentation)
    1182         124 :         ExecParallelRetrieveInstrumentation(pei->planstate,
    1183             :                                             pei->instrumentation);
    1184             : 
    1185             :     /* Accumulate JIT instrumentation, if any. */
    1186         406 :     if (pei->jit_instrumentation)
    1187          16 :         ExecParallelRetrieveJitInstrumentation(pei->planstate,
    1188          16 :                                                pei->jit_instrumentation);
    1189             : 
    1190             :     /* Free any serialized parameters. */
    1191         406 :     if (DsaPointerIsValid(pei->param_exec))
    1192             :     {
    1193          20 :         dsa_free(pei->area, pei->param_exec);
    1194          20 :         pei->param_exec = InvalidDsaPointer;
    1195             :     }
    1196         406 :     if (pei->area != NULL)
    1197             :     {
    1198         406 :         dsa_detach(pei->area);
    1199         406 :         pei->area = NULL;
    1200             :     }
    1201         406 :     if (pei->pcxt != NULL)
    1202             :     {
    1203         406 :         DestroyParallelContext(pei->pcxt);
    1204         406 :         pei->pcxt = NULL;
    1205             :     }
    1206         406 :     pfree(pei);
    1207         406 : }
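
/*
 * Editor's sketch (not part of execParallel.c): the shutdown ordering these
 * two routines expect from a leader-side caller, loosely following
 * ExecShutdownGather in nodeGather.c.  shutdown_parallel_plan is a
 * hypothetical wrapper name.
 */
#include "postgres.h"

#include "executor/execParallel.h"

static void
shutdown_parallel_plan(ParallelExecutorInfo *pei)
{
    if (pei == NULL)
        return;

    /* Wait for the workers and fold their buffer/WAL usage into the leader. */
    ExecParallelFinish(pei);

    /*
     * Anything that still wants to read node-specific state out of the DSM
     * would have to run here, before the segment is torn down.
     */

    /* Retrieve instrumentation, free the DSA area, destroy the context. */
    ExecParallelCleanup(pei);

    /* pei itself was pfree'd by ExecParallelCleanup; don't touch it again. */
}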
    1208             : 
    1209             : /*
    1210             :  * Create a DestReceiver to write tuples we produce to the shm_mq designated
    1211             :  * for that purpose.
    1212             :  */
    1213             : static DestReceiver *
    1214        1572 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
    1215             : {
    1216             :     char       *mqspace;
    1217             :     shm_mq     *mq;
    1218             : 
    1219        1572 :     mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
    1220        1572 :     mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
    1221        1572 :     mq = (shm_mq *) mqspace;
    1222        1572 :     shm_mq_set_sender(mq, MyProc);
    1223        1572 :     return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
    1224             : }
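
/*
 * Editor's note (not part of the source): a worked example of the offset
 * arithmetic above.  PARALLEL_KEY_TUPLE_QUEUE points at one contiguous chunk
 * holding nworkers fixed-size queues, so worker k's queue begins at
 * base + k * PARALLEL_TUPLE_QUEUE_SIZE.  With PARALLEL_TUPLE_QUEUE_SIZE at
 * its usual 65536 bytes, worker 0 attaches at offset 0, worker 1 at 65536,
 * worker 2 at 131072, and so on; the leader created each queue and
 * registered itself as its receiver in ExecParallelSetupTupleQueues.
 */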
    1225             : 
    1226             : /*
    1227             :  * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
    1228             :  */
    1229             : static QueryDesc *
    1230        1572 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
    1231             :                          int instrument_options)
    1232             : {
    1233             :     char       *pstmtspace;
    1234             :     char       *paramspace;
    1235             :     PlannedStmt *pstmt;
    1236             :     ParamListInfo paramLI;
    1237             :     char       *queryString;
    1238             : 
    1239             :     /* Get the query string from shared memory */
    1240        1572 :     queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
    1241             : 
    1242             :     /* Reconstruct leader-supplied PlannedStmt. */
    1243        1572 :     pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
    1244        1572 :     pstmt = (PlannedStmt *) stringToNode(pstmtspace);
    1245             : 
    1246             :     /* Reconstruct ParamListInfo. */
    1247        1572 :     paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
    1248        1572 :     paramLI = RestoreParamList(&paramspace);
    1249             : 
    1250             :     /* Create a QueryDesc for the query. */
    1251        1572 :     return CreateQueryDesc(pstmt,
    1252             :                            queryString,
    1253             :                            GetActiveSnapshot(), InvalidSnapshot,
    1254             :                            receiver, paramLI, NULL, instrument_options);
    1255             : }
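
/*
 * Editor's sketch (not part of the source): the serialize/restore round trip
 * behind the stringToNode() call above.  The leader ships the PlannedStmt as
 * the text produced by nodeToString(), and the worker rebuilds an equivalent
 * node tree from it.  roundtrip_node_tree is a hypothetical helper shown only
 * to illustrate that pairing.
 */
#include "postgres.h"

#include "nodes/nodes.h"

static Node *
roundtrip_node_tree(Node *tree)
{
    char       *serialized = nodeToString(tree);    /* leader side */
    Node       *restored = (Node *) stringToNode(serialized);  /* worker side */

    pfree(serialized);
    return restored;
}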
    1256             : 
    1257             : /*
    1258             :  * Copy instrumentation information from this node and its descendants into
    1259             :  * dynamic shared memory, so that the parallel leader can retrieve it.
    1260             :  */
    1261             : static bool
    1262        1564 : ExecParallelReportInstrumentation(PlanState *planstate,
    1263             :                                   SharedExecutorInstrumentation *instrumentation)
    1264             : {
    1265             :     int         i;
    1266        1564 :     int         plan_node_id = planstate->plan->plan_node_id;
    1267             :     Instrumentation *instrument;
    1268             : 
    1269        1564 :     InstrEndLoop(planstate->instrument);
    1270             : 
    1271             :     /*
    1272             :      * If we shuffled the plan_node_id values in ps_instrument into sorted
    1273             :      * order, we could use binary search here.  This might matter someday if
    1274             :      * we're pushing down sufficiently large plan trees.  For now, do it the
    1275             :      * slow, dumb way.
    1276             :      */
    1277        4964 :     for (i = 0; i < instrumentation->num_plan_nodes; ++i)
    1278        4964 :         if (instrumentation->plan_node_id[i] == plan_node_id)
    1279        1564 :             break;
    1280        1564 :     if (i >= instrumentation->num_plan_nodes)
    1281           0 :         elog(ERROR, "plan node %d not found", plan_node_id);
    1282             : 
    1283             :     /*
    1284             :      * Add our statistics to the per-node, per-worker totals.  It's possible
    1285             :      * that this could happen more than once if we relaunched workers.
    1286             :      */
    1287        1564 :     instrument = GetInstrumentationArray(instrumentation);
    1288        1564 :     instrument += i * instrumentation->num_workers;
    1289             :     Assert(IsParallelWorker());
    1290             :     Assert(ParallelWorkerNumber < instrumentation->num_workers);
    1291        1564 :     InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
    1292             : 
    1293        1564 :     return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
    1294             :                                  instrumentation);
    1295             : }
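
/*
 * Editor's sketch (not part of the source): the binary-search alternative the
 * comment above alludes to.  It assumes the plan_node_id array had been
 * stored in ascending order, which the current code does not do;
 * find_plan_node_index is a hypothetical helper.
 */
static int
find_plan_node_index(const int *plan_node_ids, int num_plan_nodes,
                     int plan_node_id)
{
    int         lo = 0;
    int         hi = num_plan_nodes - 1;

    while (lo <= hi)
    {
        int         mid = lo + (hi - lo) / 2;

        if (plan_node_ids[mid] == plan_node_id)
            return mid;
        if (plan_node_ids[mid] < plan_node_id)
            lo = mid + 1;
        else
            hi = mid - 1;
    }
    return -1;                  /* caller would elog(ERROR), as above */
}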
    1296             : 
    1297             : /*
    1298             :  * Initialize the PlanState and its descendants with the information
    1299             :  * is allocated and initialized by the executor; that is, after ExecutorStart().
    1300             :  * is allocated and initialized by executor; that is, after ExecutorStart().
    1301             :  */
    1302             : static bool
    1303        5134 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
    1304             : {
    1305        5134 :     if (planstate == NULL)
    1306           0 :         return false;
    1307             : 
    1308        5134 :     switch (nodeTag(planstate))
    1309             :     {
    1310        2110 :         case T_SeqScanState:
    1311        2110 :             if (planstate->plan->parallel_aware)
    1312        1690 :                 ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
    1313        2110 :             break;
    1314         248 :         case T_IndexScanState:
    1315         248 :             if (planstate->plan->parallel_aware)
    1316          64 :                 ExecIndexScanInitializeWorker((IndexScanState *) planstate,
    1317             :                                               pwcxt);
    1318         248 :             break;
    1319         152 :         case T_IndexOnlyScanState:
    1320         152 :             if (planstate->plan->parallel_aware)
    1321         128 :                 ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
    1322             :                                                   pwcxt);
    1323         152 :             break;
    1324           0 :         case T_ForeignScanState:
    1325           0 :             if (planstate->plan->parallel_aware)
    1326           0 :                 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
    1327             :                                                 pwcxt);
    1328           0 :             break;
    1329         244 :         case T_AppendState:
    1330         244 :             if (planstate->plan->parallel_aware)
    1331         204 :                 ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
    1332         244 :             break;
    1333           0 :         case T_CustomScanState:
    1334           0 :             if (planstate->plan->parallel_aware)
    1335           0 :                 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
    1336             :                                                pwcxt);
    1337           0 :             break;
    1338         186 :         case T_BitmapHeapScanState:
    1339         186 :             if (planstate->plan->parallel_aware)
    1340         184 :                 ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
    1341             :                                                pwcxt);
    1342         186 :             break;
    1343         332 :         case T_HashJoinState:
    1344         332 :             if (planstate->plan->parallel_aware)
    1345         172 :                 ExecHashJoinInitializeWorker((HashJoinState *) planstate,
    1346             :                                              pwcxt);
    1347         332 :             break;
    1348         332 :         case T_HashState:
    1349             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1350         332 :             ExecHashInitializeWorker((HashState *) planstate, pwcxt);
    1351         332 :             break;
    1352         286 :         case T_SortState:
    1353             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1354         286 :             ExecSortInitializeWorker((SortState *) planstate, pwcxt);
    1355         286 :             break;
    1356           0 :         case T_IncrementalSortState:
    1357             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1358           0 :             ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
    1359             :                                                 pwcxt);
    1360           0 :             break;
    1361         982 :         case T_AggState:
    1362             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1363         982 :             ExecAggInitializeWorker((AggState *) planstate, pwcxt);
    1364         982 :             break;
    1365           8 :         case T_MemoizeState:
    1366             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1367           8 :             ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
    1368           8 :             break;
    1369         254 :         default:
    1370         254 :             break;
    1371             :     }
    1372             : 
    1373        5134 :     return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
    1374             :                                  pwcxt);
    1375             : }
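
/*
 * Editor's sketch (not part of the source): the typical shape of the per-node
 * hooks dispatched above.  Node-level shared state lives in the toc under the
 * node's plan_node_id, so a hook usually just looks that entry up and
 * attaches it to its PlanState.  FooScanState, FooScanSharedState, and
 * ExecFooScanInitializeWorker are hypothetical names, not existing PostgreSQL
 * symbols.
 */
#include "postgres.h"

#include "access/parallel.h"
#include "nodes/execnodes.h"
#include "storage/shm_toc.h"

typedef struct FooScanSharedState FooScanSharedState;

typedef struct FooScanState
{
    PlanState   ps;
    FooScanSharedState *shared;
} FooScanState;

static void
ExecFooScanInitializeWorker(FooScanState *node, ParallelWorkerContext *pwcxt)
{
    /* The leader stored this entry under our plan_node_id in its DSM setup. */
    node->shared = shm_toc_lookup(pwcxt->toc,
                                  node->ps.plan->plan_node_id,
                                  false);
}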
    1376             : 
    1377             : /*
    1378             :  * Main entrypoint for parallel query worker processes.
    1379             :  *
    1380             :  * We reach this function from ParallelWorkerMain, so the setup necessary to
    1381             :  * create a sensible parallel environment has already been done;
    1382             :  * ParallelWorkerMain worries about stuff like the transaction state, combo
    1383             :  * CID mappings, and GUC values, so we don't need to deal with any of that
    1384             :  * here.
    1385             :  *
    1386             :  * Our job is to deal with concerns specific to the executor.  The parallel
    1387             :  * group leader will have stored a serialized PlannedStmt, and it's our job
    1388             :  * to execute that plan and write the resulting tuples to the appropriate
    1389             :  * tuple queue.  Various bits of supporting information that we need in order
    1390             :  * to do this are also stored in the dsm_segment and can be accessed through
    1391             :  * the shm_toc.
    1392             :  */
    1393             : void
    1394        1572 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
    1395             : {
    1396             :     FixedParallelExecutorState *fpes;
    1397             :     BufferUsage *buffer_usage;
    1398             :     WalUsage   *wal_usage;
    1399             :     DestReceiver *receiver;
    1400             :     QueryDesc  *queryDesc;
    1401             :     SharedExecutorInstrumentation *instrumentation;
    1402             :     SharedJitInstrumentation *jit_instrumentation;
    1403        1572 :     int         instrument_options = 0;
    1404             :     void       *area_space;
    1405             :     dsa_area   *area;
    1406             :     ParallelWorkerContext pwcxt;
    1407             : 
    1408             :     /* Get fixed-size state. */
    1409        1572 :     fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
    1410             : 
    1411             :     /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
    1412        1572 :     receiver = ExecParallelGetReceiver(seg, toc);
    1413        1572 :     instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
    1414        1572 :     if (instrumentation != NULL)
    1415         492 :         instrument_options = instrumentation->instrument_options;
    1416        1572 :     jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
    1417             :                                          true);
    1418        1572 :     queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
    1419             : 
    1420             :     /* Setting debug_query_string for individual workers */
    1421        1572 :     debug_query_string = queryDesc->sourceText;
    1422             : 
    1423             :     /* Report workers' query and queryId for monitoring purposes */
    1424        1572 :     pgstat_report_activity(STATE_RUNNING, debug_query_string);
    1425             : 
    1426             :     /* Attach to the dynamic shared memory area. */
    1427        1572 :     area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
    1428        1572 :     area = dsa_attach_in_place(area_space, seg);
    1429             : 
    1430             :     /* Start up the executor */
    1431        1572 :     queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
    1432        1572 :     ExecutorStart(queryDesc, fpes->eflags);
    1433             : 
    1434             :     /* Special executor initialization steps for parallel workers */
    1435        1572 :     queryDesc->planstate->state->es_query_dsa = area;
    1436        1572 :     if (DsaPointerIsValid(fpes->param_exec))
    1437             :     {
    1438             :         char       *paramexec_space;
    1439             : 
    1440          48 :         paramexec_space = dsa_get_address(area, fpes->param_exec);
    1441          48 :         RestoreParamExecParams(paramexec_space, queryDesc->estate);
    1442             : 
    1443             :     }
    1444        1572 :     pwcxt.toc = toc;
    1445        1572 :     pwcxt.seg = seg;
    1446        1572 :     ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
    1447             : 
    1448             :     /* Pass down any tuple bound */
    1449        1572 :     ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
    1450             : 
    1451             :     /*
    1452             :      * Prepare to track buffer/WAL usage during query execution.
    1453             :      *
    1454             :      * We do this after starting up the executor to match what happens in the
    1455             :      * leader, which also doesn't count buffer accesses and WAL activity that
    1456             :      * occur during executor startup.
    1457             :      */
    1458        1572 :     InstrStartParallelQuery();
    1459             : 
    1460             :     /*
    1461             :      * Run the plan.  If we specified a tuple bound, be careful not to demand
    1462             :      * more tuples than that.
    1463             :      */
    1464        1572 :     ExecutorRun(queryDesc,
    1465             :                 ForwardScanDirection,
    1466        1572 :                 fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
    1467             :                 true);
    1468             : 
    1469             :     /* Shut down the executor */
    1470        1568 :     ExecutorFinish(queryDesc);
    1471             : 
    1472             :     /* Report buffer/WAL usage during parallel execution. */
    1473        1568 :     buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
    1474        1568 :     wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
    1475        1568 :     InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
    1476        1568 :                           &wal_usage[ParallelWorkerNumber]);
    1477             : 
    1478             :     /* Report instrumentation data if any instrumentation options are set. */
    1479        1568 :     if (instrumentation != NULL)
    1480         492 :         ExecParallelReportInstrumentation(queryDesc->planstate,
    1481             :                                           instrumentation);
    1482             : 
    1483             :     /* Report JIT instrumentation data if any */
    1484        1568 :     if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
    1485             :     {
    1486             :         Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
    1487          96 :         jit_instrumentation->jit_instr[ParallelWorkerNumber] =
    1488          96 :             queryDesc->estate->es_jit->instr;
    1489             :     }
    1490             : 
    1491             :     /* Must do this after capturing instrumentation. */
    1492        1568 :     ExecutorEnd(queryDesc);
    1493             : 
    1494             :     /* Cleanup. */
    1495        1568 :     dsa_detach(area);
    1496        1568 :     FreeQueryDesc(queryDesc);
    1497        1568 :     receiver->rDestroy(receiver);
    1498        1568 : }

Generated by: LCOV version 1.14