LCOV - code coverage report
Current view: top level - src/backend/executor - execParallel.c (source / functions)
Test: PostgreSQL 13beta1
Date: 2020-06-05 19:06:29
Lines: 502 of 566 hit (88.7 %)
Functions: 20 of 20 hit (100.0 %)

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execParallel.c
       4             :  *    Support routines for parallel execution.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * This file contains routines that are intended to support setting up,
      10             :  * using, and tearing down a ParallelContext from within the PostgreSQL
      11             :  * executor.  The ParallelContext machinery will handle starting the
      12             :  * workers and ensuring that their state generally matches that of the
      13             :  * leader; see src/backend/access/transam/README.parallel for details.
      14             :  * However, we must save and restore relevant executor state, such as
      15             :  * any ParamListInfo associated with the query, buffer/WAL usage info, and
      16             :  * the actual plan to be passed down to the worker.
      17             :  *
      18             :  * IDENTIFICATION
      19             :  *    src/backend/executor/execParallel.c
      20             :  *
      21             :  *-------------------------------------------------------------------------
      22             :  */
      23             : 
      24             : #include "postgres.h"
      25             : 
      26             : #include "executor/execParallel.h"
      27             : #include "executor/executor.h"
      28             : #include "executor/nodeAppend.h"
      29             : #include "executor/nodeBitmapHeapscan.h"
      30             : #include "executor/nodeCustom.h"
      31             : #include "executor/nodeForeignscan.h"
      32             : #include "executor/nodeHash.h"
      33             : #include "executor/nodeHashjoin.h"
      34             : #include "executor/nodeIncrementalSort.h"
      35             : #include "executor/nodeIndexonlyscan.h"
      36             : #include "executor/nodeIndexscan.h"
      37             : #include "executor/nodeSeqscan.h"
      38             : #include "executor/nodeSort.h"
      39             : #include "executor/nodeSubplan.h"
      40             : #include "executor/tqueue.h"
      41             : #include "jit/jit.h"
      42             : #include "nodes/nodeFuncs.h"
      43             : #include "pgstat.h"
      44             : #include "storage/spin.h"
      45             : #include "tcop/tcopprot.h"
      46             : #include "utils/datum.h"
      47             : #include "utils/dsa.h"
      48             : #include "utils/lsyscache.h"
      49             : #include "utils/memutils.h"
      50             : #include "utils/snapmgr.h"
      51             : 
      52             : /*
      53             :  * Magic numbers for parallel executor communication.  We use constants
      54             :  * greater than any 32-bit integer here so that values < 2^32 can be used
      55             :  * by individual parallel nodes to store their own state.
      56             :  */
      57             : #define PARALLEL_KEY_EXECUTOR_FIXED     UINT64CONST(0xE000000000000001)
      58             : #define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000002)
      59             : #define PARALLEL_KEY_PARAMLISTINFO      UINT64CONST(0xE000000000000003)
      60             : #define PARALLEL_KEY_BUFFER_USAGE       UINT64CONST(0xE000000000000004)
      61             : #define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000005)
      62             : #define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000006)
      63             : #define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000007)
      64             : #define PARALLEL_KEY_QUERY_TEXT         UINT64CONST(0xE000000000000008)
      65             : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
      66             : #define PARALLEL_KEY_WAL_USAGE          UINT64CONST(0xE00000000000000A)
      67             : 
      68             : #define PARALLEL_TUPLE_QUEUE_SIZE       65536
      69             : 
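In practice, the key space left below 2^32 is used by the plan nodes themselves: a parallel-aware node typically keys its per-node DSM chunk by its plan_node_id, which cannot collide with the executor-level keys above. A minimal sketch of that convention (the struct, field, and function names here are illustrative, not taken from this file):

/* Hypothetical parallel-aware node publishing its own shared state. */
typedef struct HypotheticalSharedState
{
    slock_t     mutex;          /* protects next_block */
    int64       next_block;     /* next block to hand out to a worker */
} HypotheticalSharedState;

static void
HypotheticalInitializeDSM(PlanState *node, ParallelContext *pcxt)
{
    HypotheticalSharedState *shared;

    shared = shm_toc_allocate(pcxt->toc, sizeof(HypotheticalSharedState));
    SpinLockInit(&shared->mutex);
    shared->next_block = 0;

    /*
     * plan_node_id is a small integer, so it stays below 2^32 and cannot
     * collide with the PARALLEL_KEY_* constants defined above.
     */
    shm_toc_insert(pcxt->toc, node->plan->plan_node_id, shared);
}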
      70             : /*
       71             :  * Fixed-size state that we need to pass to parallel workers.
      72             :  */
      73             : typedef struct FixedParallelExecutorState
      74             : {
      75             :     int64       tuples_needed;  /* tuple bound, see ExecSetTupleBound */
      76             :     dsa_pointer param_exec;
      77             :     int         eflags;
      78             :     int         jit_flags;
      79             : } FixedParallelExecutorState;
      80             : 
      81             : /*
      82             :  * DSM structure for accumulating per-PlanState instrumentation.
      83             :  *
      84             :  * instrument_options: Same meaning here as in instrument.c.
      85             :  *
      86             :  * instrument_offset: Offset, relative to the start of this structure,
      87             :  * of the first Instrumentation object.  This will depend on the length of
      88             :  * the plan_node_id array.
      89             :  *
      90             :  * num_workers: Number of workers.
      91             :  *
      92             :  * num_plan_nodes: Number of plan nodes.
      93             :  *
       94             :  * plan_node_id: Array of plan node IDs for which we are gathering
       95             :  * instrumentation from parallel workers.  Its length is num_plan_nodes.
      96             :  */
      97             : struct SharedExecutorInstrumentation
      98             : {
      99             :     int         instrument_options;
     100             :     int         instrument_offset;
     101             :     int         num_workers;
     102             :     int         num_plan_nodes;
     103             :     int         plan_node_id[FLEXIBLE_ARRAY_MEMBER];
     104             :     /* array of num_plan_nodes * num_workers Instrumentation objects follows */
     105             : };
     106             : #define GetInstrumentationArray(sei) \
     107             :     (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
     108             :      (Instrumentation *) (((char *) sei) + sei->instrument_offset))
     109             : 
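The Instrumentation array that follows the struct is assumed here to be laid out node-major: all num_workers entries for the plan node at index 0 of plan_node_id[], then all entries for index 1, and so on. A hedged accessor sketch (not part of the file) that makes that assumed layout explicit:

/*
 * Illustrative helper, assuming node-major layout: return the slot for the
 * plan node stored at plan_node_id[node_idx], as filled in by worker
 * worker_num.
 */
static inline Instrumentation *
GetWorkerInstrumentationSlot(SharedExecutorInstrumentation *sei,
                             int node_idx, int worker_num)
{
    Instrumentation *array = GetInstrumentationArray(sei);

    Assert(node_idx < sei->num_plan_nodes);
    Assert(worker_num < sei->num_workers);

    return &array[node_idx * sei->num_workers + worker_num];
}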
     110             : /* Context object for ExecParallelEstimate. */
     111             : typedef struct ExecParallelEstimateContext
     112             : {
     113             :     ParallelContext *pcxt;
     114             :     int         nnodes;
     115             : } ExecParallelEstimateContext;
     116             : 
     117             : /* Context object for ExecParallelInitializeDSM. */
     118             : typedef struct ExecParallelInitializeDSMContext
     119             : {
     120             :     ParallelContext *pcxt;
     121             :     SharedExecutorInstrumentation *instrumentation;
     122             :     int         nnodes;
     123             : } ExecParallelInitializeDSMContext;
     124             : 
     125             : /* Helper functions that run in the parallel leader. */
     126             : static char *ExecSerializePlan(Plan *plan, EState *estate);
     127             : static bool ExecParallelEstimate(PlanState *node,
     128             :                                  ExecParallelEstimateContext *e);
     129             : static bool ExecParallelInitializeDSM(PlanState *node,
     130             :                                       ExecParallelInitializeDSMContext *d);
     131             : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
     132             :                                                     bool reinitialize);
     133             : static bool ExecParallelReInitializeDSM(PlanState *planstate,
     134             :                                         ParallelContext *pcxt);
     135             : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
     136             :                                                 SharedExecutorInstrumentation *instrumentation);
     137             : 
     138             : /* Helper function that runs in the parallel worker. */
     139             : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
     140             : 
     141             : /*
     142             :  * Create a serialized representation of the plan to be sent to each worker.
     143             :  */
     144             : static char *
     145         370 : ExecSerializePlan(Plan *plan, EState *estate)
     146             : {
     147             :     PlannedStmt *pstmt;
     148             :     ListCell   *lc;
     149             : 
     150             :     /* We can't scribble on the original plan, so make a copy. */
     151         370 :     plan = copyObject(plan);
     152             : 
     153             :     /*
     154             :      * The worker will start its own copy of the executor, and that copy will
     155             :      * insert a junk filter if the toplevel node has any resjunk entries. We
     156             :      * don't want that to happen, because while resjunk columns shouldn't be
     157             :      * sent back to the user, here the tuples are coming back to another
     158             :      * backend which may very well need them.  So mutate the target list
     159             :      * accordingly.  This is sort of a hack; there might be better ways to do
     160             :      * this...
     161             :      */
     162        1026 :     foreach(lc, plan->targetlist)
     163             :     {
     164         656 :         TargetEntry *tle = lfirst_node(TargetEntry, lc);
     165             : 
     166         656 :         tle->resjunk = false;
     167             :     }
     168             : 
     169             :     /*
     170             :      * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
     171             :      * for our purposes, but the worker will need at least a minimal
     172             :      * PlannedStmt to start the executor.
     173             :      */
     174         370 :     pstmt = makeNode(PlannedStmt);
     175         370 :     pstmt->commandType = CMD_SELECT;
     176         370 :     pstmt->queryId = UINT64CONST(0);
     177         370 :     pstmt->hasReturning = false;
     178         370 :     pstmt->hasModifyingCTE = false;
     179         370 :     pstmt->canSetTag = true;
     180         370 :     pstmt->transientPlan = false;
     181         370 :     pstmt->dependsOnRole = false;
     182         370 :     pstmt->parallelModeNeeded = false;
     183         370 :     pstmt->planTree = plan;
     184         370 :     pstmt->rtable = estate->es_range_table;
     185         370 :     pstmt->resultRelations = NIL;
     186         370 :     pstmt->rootResultRelations = NIL;
     187         370 :     pstmt->appendRelations = NIL;
     188             : 
     189             :     /*
     190             :      * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
     191             :      * for unsafe ones (so that the list indexes of the safe ones are
     192             :      * preserved).  This positively ensures that the worker won't try to run,
      193             :      * or even do ExecInitNode on, an unsafe subplan.  That's important,
      194             :      * e.g., to protect non-parallel-aware FDWs from getting into trouble.
     195             :      */
     196         370 :     pstmt->subplans = NIL;
     197         390 :     foreach(lc, estate->es_plannedstmt->subplans)
     198             :     {
     199          20 :         Plan       *subplan = (Plan *) lfirst(lc);
     200             : 
     201          20 :         if (subplan && !subplan->parallel_safe)
     202          16 :             subplan = NULL;
     203          20 :         pstmt->subplans = lappend(pstmt->subplans, subplan);
     204             :     }
     205             : 
     206         370 :     pstmt->rewindPlanIDs = NULL;
     207         370 :     pstmt->rowMarks = NIL;
     208         370 :     pstmt->relationOids = NIL;
     209         370 :     pstmt->invalItems = NIL; /* workers can't replan anyway... */
     210         370 :     pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
     211         370 :     pstmt->utilityStmt = NULL;
     212         370 :     pstmt->stmt_location = -1;
     213         370 :     pstmt->stmt_len = -1;
     214             : 
     215             :     /* Return serialized copy of our dummy PlannedStmt. */
     216         370 :     return nodeToString(pstmt);
     217             : }
     218             : 
     219             : /*
     220             :  * Parallel-aware plan nodes (and occasionally others) may need some state
     221             :  * which is shared across all parallel workers.  Before we size the DSM, give
     222             :  * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
     223             :  * &pcxt->estimator.
     224             :  *
     225             :  * While we're at it, count the number of PlanState nodes in the tree, so
     226             :  * we know how many Instrumentation structures we need.
     227             :  */
     228             : static bool
     229        1674 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
     230             : {
     231        1674 :     if (planstate == NULL)
     232           0 :         return false;
     233             : 
     234             :     /* Count this node. */
     235        1674 :     e->nnodes++;
     236             : 
     237        1674 :     switch (nodeTag(planstate))
     238             :     {
     239         644 :         case T_SeqScanState:
     240         644 :             if (planstate->plan->parallel_aware)
     241         496 :                 ExecSeqScanEstimate((SeqScanState *) planstate,
     242             :                                     e->pcxt);
     243         644 :             break;
     244         192 :         case T_IndexScanState:
     245         192 :             if (planstate->plan->parallel_aware)
     246           8 :                 ExecIndexScanEstimate((IndexScanState *) planstate,
     247             :                                       e->pcxt);
     248         192 :             break;
     249          28 :         case T_IndexOnlyScanState:
     250          28 :             if (planstate->plan->parallel_aware)
     251          24 :                 ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
     252             :                                           e->pcxt);
     253          28 :             break;
     254           0 :         case T_ForeignScanState:
     255           0 :             if (planstate->plan->parallel_aware)
     256           0 :                 ExecForeignScanEstimate((ForeignScanState *) planstate,
     257             :                                         e->pcxt);
     258           0 :             break;
     259         100 :         case T_AppendState:
     260         100 :             if (planstate->plan->parallel_aware)
     261          68 :                 ExecAppendEstimate((AppendState *) planstate,
     262             :                                    e->pcxt);
     263         100 :             break;
     264           0 :         case T_CustomScanState:
     265           0 :             if (planstate->plan->parallel_aware)
     266           0 :                 ExecCustomScanEstimate((CustomScanState *) planstate,
     267             :                                        e->pcxt);
     268           0 :             break;
     269          10 :         case T_BitmapHeapScanState:
     270          10 :             if (planstate->plan->parallel_aware)
     271           8 :                 ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
     272             :                                        e->pcxt);
     273          10 :             break;
     274         112 :         case T_HashJoinState:
     275         112 :             if (planstate->plan->parallel_aware)
     276          64 :                 ExecHashJoinEstimate((HashJoinState *) planstate,
     277             :                                      e->pcxt);
     278         112 :             break;
     279         112 :         case T_HashState:
     280             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     281         112 :             ExecHashEstimate((HashState *) planstate, e->pcxt);
     282         112 :             break;
     283          94 :         case T_SortState:
     284             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     285          94 :             ExecSortEstimate((SortState *) planstate, e->pcxt);
     286          94 :             break;
     287           0 :         case T_IncrementalSortState:
     288             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     289           0 :             ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
     290           0 :             break;
     291             : 
     292         382 :         default:
     293         382 :             break;
     294             :     }
     295             : 
     296        1674 :     return planstate_tree_walker(planstate, ExecParallelEstimate, e);
     297             : }
     298             : 
     299             : /*
     300             :  * Estimate the amount of space required to serialize the indicated parameters.
     301             :  */
     302             : static Size
     303           8 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
     304             : {
     305             :     int         paramid;
     306           8 :     Size        sz = sizeof(int);
     307             : 
     308           8 :     paramid = -1;
     309          20 :     while ((paramid = bms_next_member(params, paramid)) >= 0)
     310             :     {
     311             :         Oid         typeOid;
     312             :         int16       typLen;
     313             :         bool        typByVal;
     314             :         ParamExecData *prm;
     315             : 
     316          12 :         prm = &(estate->es_param_exec_vals[paramid]);
     317          12 :         typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
     318             :                                paramid);
     319             : 
     320          12 :         sz = add_size(sz, sizeof(int)); /* space for paramid */
     321             : 
     322             :         /* space for datum/isnull */
     323          12 :         if (OidIsValid(typeOid))
     324          12 :             get_typlenbyval(typeOid, &typLen, &typByVal);
     325             :         else
     326             :         {
     327             :             /* If no type OID, assume by-value, like copyParamList does. */
     328           0 :             typLen = sizeof(Datum);
     329           0 :             typByVal = true;
     330             :         }
     331          12 :         sz = add_size(sz,
     332          12 :                       datumEstimateSpace(prm->value, prm->isnull,
     333             :                                          typByVal, typLen));
     334             :     }
     335           8 :     return sz;
     336             : }
     337             : 
     338             : /*
     339             :  * Serialize specified PARAM_EXEC parameters.
     340             :  *
     341             :  * We write the number of parameters first, as a 4-byte integer, and then
     342             :  * write details for each parameter in turn.  The details for each parameter
     343             :  * consist of a 4-byte paramid (location of param in execution time internal
     344             :  * parameter array) and then the datum as serialized by datumSerialize().
     345             :  */
     346             : static dsa_pointer
     347           8 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
     348             : {
     349             :     Size        size;
     350             :     int         nparams;
     351             :     int         paramid;
     352             :     ParamExecData *prm;
     353             :     dsa_pointer handle;
     354             :     char       *start_address;
     355             : 
     356             :     /* Allocate enough space for the current parameter values. */
     357           8 :     size = EstimateParamExecSpace(estate, params);
     358           8 :     handle = dsa_allocate(area, size);
     359           8 :     start_address = dsa_get_address(area, handle);
     360             : 
     361             :     /* First write the number of parameters as a 4-byte integer. */
     362           8 :     nparams = bms_num_members(params);
     363           8 :     memcpy(start_address, &nparams, sizeof(int));
     364           8 :     start_address += sizeof(int);
     365             : 
     366             :     /* Write details for each parameter in turn. */
     367           8 :     paramid = -1;
     368          20 :     while ((paramid = bms_next_member(params, paramid)) >= 0)
     369             :     {
     370             :         Oid         typeOid;
     371             :         int16       typLen;
     372             :         bool        typByVal;
     373             : 
     374          12 :         prm = &(estate->es_param_exec_vals[paramid]);
     375          12 :         typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
     376             :                                paramid);
     377             : 
     378             :         /* Write paramid. */
     379          12 :         memcpy(start_address, &paramid, sizeof(int));
     380          12 :         start_address += sizeof(int);
     381             : 
     382             :         /* Write datum/isnull */
     383          12 :         if (OidIsValid(typeOid))
     384          12 :             get_typlenbyval(typeOid, &typLen, &typByVal);
     385             :         else
     386             :         {
     387             :             /* If no type OID, assume by-value, like copyParamList does. */
     388           0 :             typLen = sizeof(Datum);
     389           0 :             typByVal = true;
     390             :         }
     391          12 :         datumSerialize(prm->value, prm->isnull, typByVal, typLen,
     392             :                        &start_address);
     393             :     }
     394             : 
     395           8 :     return handle;
     396             : }
     397             : 
     398             : /*
     399             :  * Restore specified PARAM_EXEC parameters.
     400             :  */
     401             : static void
     402          24 : RestoreParamExecParams(char *start_address, EState *estate)
     403             : {
     404             :     int         nparams;
     405             :     int         i;
     406             :     int         paramid;
     407             : 
     408          24 :     memcpy(&nparams, start_address, sizeof(int));
     409          24 :     start_address += sizeof(int);
     410             : 
     411          56 :     for (i = 0; i < nparams; i++)
     412             :     {
     413             :         ParamExecData *prm;
     414             : 
     415             :         /* Read paramid */
     416          32 :         memcpy(&paramid, start_address, sizeof(int));
     417          32 :         start_address += sizeof(int);
     418          32 :         prm = &(estate->es_param_exec_vals[paramid]);
     419             : 
     420             :         /* Read datum/isnull. */
     421          32 :         prm->value = datumRestore(&start_address, &prm->isnull);
     422          32 :         prm->execPlan = NULL;
     423             :     }
     424          24 : }
     425             : 
     426             : /*
     427             :  * Initialize the dynamic shared memory segment that will be used to control
     428             :  * parallel execution.
     429             :  */
     430             : static bool
     431        1674 : ExecParallelInitializeDSM(PlanState *planstate,
     432             :                           ExecParallelInitializeDSMContext *d)
     433             : {
     434        1674 :     if (planstate == NULL)
     435           0 :         return false;
     436             : 
     437             :     /* If instrumentation is enabled, initialize slot for this node. */
     438        1674 :     if (d->instrumentation != NULL)
     439        1280 :         d->instrumentation->plan_node_id[d->nnodes] =
     440         640 :             planstate->plan->plan_node_id;
     441             : 
     442             :     /* Count this node. */
     443        1674 :     d->nnodes++;
     444             : 
     445             :     /*
     446             :      * Call initializers for DSM-using plan nodes.
     447             :      *
     448             :      * Most plan nodes won't do anything here, but plan nodes that allocated
     449             :      * DSM may need to initialize shared state in the DSM before parallel
     450             :      * workers are launched.  They can allocate the space they previously
     451             :      * estimated using shm_toc_allocate, and add the keys they previously
     452             :      * estimated using shm_toc_insert, in each case targeting pcxt->toc.
     453             :      */
     454        1674 :     switch (nodeTag(planstate))
     455             :     {
     456         644 :         case T_SeqScanState:
     457         644 :             if (planstate->plan->parallel_aware)
     458         496 :                 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
     459             :                                          d->pcxt);
     460         644 :             break;
     461         192 :         case T_IndexScanState:
     462         192 :             if (planstate->plan->parallel_aware)
     463           8 :                 ExecIndexScanInitializeDSM((IndexScanState *) planstate,
     464             :                                            d->pcxt);
     465         192 :             break;
     466          28 :         case T_IndexOnlyScanState:
     467          28 :             if (planstate->plan->parallel_aware)
     468          24 :                 ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
     469             :                                                d->pcxt);
     470          28 :             break;
     471           0 :         case T_ForeignScanState:
     472           0 :             if (planstate->plan->parallel_aware)
     473           0 :                 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
     474             :                                              d->pcxt);
     475           0 :             break;
     476         100 :         case T_AppendState:
     477         100 :             if (planstate->plan->parallel_aware)
     478          68 :                 ExecAppendInitializeDSM((AppendState *) planstate,
     479             :                                         d->pcxt);
     480         100 :             break;
     481           0 :         case T_CustomScanState:
     482           0 :             if (planstate->plan->parallel_aware)
     483           0 :                 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
     484             :                                             d->pcxt);
     485           0 :             break;
     486          10 :         case T_BitmapHeapScanState:
     487          10 :             if (planstate->plan->parallel_aware)
     488           8 :                 ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
     489             :                                             d->pcxt);
     490          10 :             break;
     491         112 :         case T_HashJoinState:
     492         112 :             if (planstate->plan->parallel_aware)
     493          64 :                 ExecHashJoinInitializeDSM((HashJoinState *) planstate,
     494             :                                           d->pcxt);
     495         112 :             break;
     496         112 :         case T_HashState:
     497             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     498         112 :             ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
     499         112 :             break;
     500          94 :         case T_SortState:
     501             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     502          94 :             ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
     503          94 :             break;
     504           0 :         case T_IncrementalSortState:
     505             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     506           0 :             ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
     507           0 :             break;
     508             : 
     509         382 :         default:
     510         382 :             break;
     511             :     }
     512             : 
     513        1674 :     return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
     514             : }
     515             : 
     516             : /*
      517             :  * Set up the response queues that parallel workers will use to return
      518             :  * tuples to the leader backend.
     519             :  */
     520             : static shm_mq_handle **
     521         542 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
     522             : {
     523             :     shm_mq_handle **responseq;
     524             :     char       *tqueuespace;
     525             :     int         i;
     526             : 
     527             :     /* Skip this if no workers. */
     528         542 :     if (pcxt->nworkers == 0)
     529           0 :         return NULL;
     530             : 
     531             :     /* Allocate memory for shared memory queue handles. */
     532             :     responseq = (shm_mq_handle **)
     533         542 :         palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
     534             : 
     535             :     /*
     536             :      * If not reinitializing, allocate space from the DSM for the queues;
     537             :      * otherwise, find the already allocated space.
     538             :      */
     539         542 :     if (!reinitialize)
     540             :         tqueuespace =
     541         370 :             shm_toc_allocate(pcxt->toc,
     542             :                              mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
     543         370 :                                       pcxt->nworkers));
     544             :     else
     545         172 :         tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
     546             : 
     547             :     /* Create the queues, and become the receiver for each. */
     548        2052 :     for (i = 0; i < pcxt->nworkers; ++i)
     549             :     {
     550             :         shm_mq     *mq;
     551             : 
     552        1510 :         mq = shm_mq_create(tqueuespace +
     553        1510 :                            ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
     554             :                            (Size) PARALLEL_TUPLE_QUEUE_SIZE);
     555             : 
     556        1510 :         shm_mq_set_receiver(mq, MyProc);
     557        1510 :         responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
     558             :     }
     559             : 
     560             :     /* Add array of queues to shm_toc, so others can find it. */
     561         542 :     if (!reinitialize)
     562         370 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
     563             : 
     564             :     /* Return array of handles. */
     565         542 :     return responseq;
     566             : }
     567             : 
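On the worker side, ExecParallelGetReceiver (declared earlier, defined later in the file) attaches to its own slot in this queue space. A hedged sketch of what that attachment amounts to, with an illustrative function name rather than the file's actual implementation:

/*
 * Sketch: index into the shared queue space by ParallelWorkerNumber, become
 * the sender on that queue, and wrap it in a DestReceiver for the executor.
 */
static DestReceiver *
AttachToMyTupleQueue(dsm_segment *seg, shm_toc *toc)
{
    char       *tqueuespace;
    shm_mq     *mq;

    tqueuespace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
    mq = (shm_mq *) (tqueuespace +
                     ((Size) ParallelWorkerNumber) * PARALLEL_TUPLE_QUEUE_SIZE);
    shm_mq_set_sender(mq, MyProc);
    return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
}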
     568             : /*
     569             :  * Sets up the required infrastructure for backend workers to perform
     570             :  * execution and return results to the main backend.
     571             :  */
     572             : ParallelExecutorInfo *
     573         370 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
     574             :                      Bitmapset *sendParams, int nworkers,
     575             :                      int64 tuples_needed)
     576             : {
     577             :     ParallelExecutorInfo *pei;
     578             :     ParallelContext *pcxt;
     579             :     ExecParallelEstimateContext e;
     580             :     ExecParallelInitializeDSMContext d;
     581             :     FixedParallelExecutorState *fpes;
     582             :     char       *pstmt_data;
     583             :     char       *pstmt_space;
     584             :     char       *paramlistinfo_space;
     585             :     BufferUsage *bufusage_space;
     586             :     WalUsage   *walusage_space;
     587         370 :     SharedExecutorInstrumentation *instrumentation = NULL;
     588         370 :     SharedJitInstrumentation *jit_instrumentation = NULL;
     589             :     int         pstmt_len;
     590             :     int         paramlistinfo_len;
     591         370 :     int         instrumentation_len = 0;
     592         370 :     int         jit_instrumentation_len = 0;
     593         370 :     int         instrument_offset = 0;
     594         370 :     Size        dsa_minsize = dsa_minimum_size();
     595             :     char       *query_string;
     596             :     int         query_len;
     597             : 
     598             :     /*
     599             :      * Force any initplan outputs that we're going to pass to workers to be
     600             :      * evaluated, if they weren't already.
     601             :      *
     602             :      * For simplicity, we use the EState's per-output-tuple ExprContext here.
     603             :      * That risks intra-query memory leakage, since we might pass through here
     604             :      * many times before that ExprContext gets reset; but ExecSetParamPlan
     605             :      * doesn't normally leak any memory in the context (see its comments), so
     606             :      * it doesn't seem worth complicating this function's API to pass it a
     607             :      * shorter-lived ExprContext.  This might need to change someday.
     608             :      */
     609         370 :     ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
     610             : 
     611             :     /* Allocate object for return value. */
     612         370 :     pei = palloc0(sizeof(ParallelExecutorInfo));
     613         370 :     pei->finished = false;
     614         370 :     pei->planstate = planstate;
     615             : 
     616             :     /* Fix up and serialize plan to be sent to workers. */
     617         370 :     pstmt_data = ExecSerializePlan(planstate->plan, estate);
     618             : 
     619             :     /* Create a parallel context. */
     620         370 :     pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
     621         370 :     pei->pcxt = pcxt;
     622             : 
     623             :     /*
     624             :      * Before telling the parallel context to create a dynamic shared memory
     625             :      * segment, we need to figure out how big it should be.  Estimate space
     626             :      * for the various things we need to store.
     627             :      */
     628             : 
     629             :     /* Estimate space for fixed-size state. */
     630         370 :     shm_toc_estimate_chunk(&pcxt->estimator,
     631             :                            sizeof(FixedParallelExecutorState));
     632         370 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     633             : 
     634             :     /* Estimate space for query text. */
     635         370 :     query_len = strlen(estate->es_sourceText);
     636         370 :     shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
     637         370 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     638             : 
     639             :     /* Estimate space for serialized PlannedStmt. */
     640         370 :     pstmt_len = strlen(pstmt_data) + 1;
     641         370 :     shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
     642         370 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     643             : 
     644             :     /* Estimate space for serialized ParamListInfo. */
     645         370 :     paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
     646         370 :     shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
     647         370 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     648             : 
     649             :     /*
     650             :      * Estimate space for BufferUsage.
     651             :      *
     652             :      * If EXPLAIN is not in use and there are no extensions loaded that care,
     653             :      * we could skip this.  But we have no way of knowing whether anyone's
     654             :      * looking at pgBufferUsage, so do it unconditionally.
     655             :      */
     656         370 :     shm_toc_estimate_chunk(&pcxt->estimator,
     657             :                            mul_size(sizeof(BufferUsage), pcxt->nworkers));
     658         370 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     659             : 
     660             :     /*
     661             :      * Same thing for WalUsage.
     662             :      */
     663         370 :     shm_toc_estimate_chunk(&pcxt->estimator,
     664             :                            mul_size(sizeof(WalUsage), pcxt->nworkers));
     665         370 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     666             : 
     667             :     /* Estimate space for tuple queues. */
     668         370 :     shm_toc_estimate_chunk(&pcxt->estimator,
     669             :                            mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
     670         370 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     671             : 
     672             :     /*
     673             :      * Give parallel-aware nodes a chance to add to the estimates, and get a
     674             :      * count of how many PlanState nodes there are.
     675             :      */
     676         370 :     e.pcxt = pcxt;
     677         370 :     e.nnodes = 0;
     678         370 :     ExecParallelEstimate(planstate, &e);
     679             : 
     680             :     /* Estimate space for instrumentation, if required. */
     681         370 :     if (estate->es_instrument)
     682             :     {
     683         112 :         instrumentation_len =
     684             :             offsetof(SharedExecutorInstrumentation, plan_node_id) +
     685         112 :             sizeof(int) * e.nnodes;
     686         112 :         instrumentation_len = MAXALIGN(instrumentation_len);
     687         112 :         instrument_offset = instrumentation_len;
     688         112 :         instrumentation_len +=
     689         112 :             mul_size(sizeof(Instrumentation),
     690         112 :                      mul_size(e.nnodes, nworkers));
     691         112 :         shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
     692         112 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     693             : 
     694             :         /* Estimate space for JIT instrumentation, if required. */
     695         112 :         if (estate->es_jit_flags != PGJIT_NONE)
     696             :         {
     697          16 :             jit_instrumentation_len =
     698          16 :                 offsetof(SharedJitInstrumentation, jit_instr) +
     699             :                 sizeof(JitInstrumentation) * nworkers;
     700          16 :             shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
     701          16 :             shm_toc_estimate_keys(&pcxt->estimator, 1);
     702             :         }
     703             :     }
     704             : 
     705             :     /* Estimate space for DSA area. */
     706         370 :     shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
     707         370 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     708             : 
     709             :     /* Everyone's had a chance to ask for space, so now create the DSM. */
     710         370 :     InitializeParallelDSM(pcxt);
     711             : 
     712             :     /*
     713             :      * OK, now we have a dynamic shared memory segment, and it should be big
     714             :      * enough to store all of the data we estimated we would want to put into
     715             :      * it, plus whatever general stuff (not specifically executor-related) the
     716             :      * ParallelContext itself needs to store there.  None of the space we
     717             :      * asked for has been allocated or initialized yet, though, so do that.
     718             :      */
     719             : 
     720             :     /* Store fixed-size state. */
     721         370 :     fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
     722         370 :     fpes->tuples_needed = tuples_needed;
     723         370 :     fpes->param_exec = InvalidDsaPointer;
     724         370 :     fpes->eflags = estate->es_top_eflags;
     725         370 :     fpes->jit_flags = estate->es_jit_flags;
     726         370 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
     727             : 
     728             :     /* Store query string */
     729         370 :     query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
     730         370 :     memcpy(query_string, estate->es_sourceText, query_len + 1);
     731         370 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
     732             : 
     733             :     /* Store serialized PlannedStmt. */
     734         370 :     pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
     735         370 :     memcpy(pstmt_space, pstmt_data, pstmt_len);
     736         370 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
     737             : 
     738             :     /* Store serialized ParamListInfo. */
     739         370 :     paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
     740         370 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
     741         370 :     SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
     742             : 
     743             :     /* Allocate space for each worker's BufferUsage; no need to initialize. */
     744         370 :     bufusage_space = shm_toc_allocate(pcxt->toc,
     745         370 :                                       mul_size(sizeof(BufferUsage), pcxt->nworkers));
     746         370 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
     747         370 :     pei->buffer_usage = bufusage_space;
     748             : 
     749             :     /* Same for WalUsage. */
     750         370 :     walusage_space = shm_toc_allocate(pcxt->toc,
     751         370 :                                       mul_size(sizeof(WalUsage), pcxt->nworkers));
     752         370 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
     753         370 :     pei->wal_usage = walusage_space;
     754             : 
     755             :     /* Set up the tuple queues that the workers will write into. */
     756         370 :     pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
     757             : 
     758             :     /* We don't need the TupleQueueReaders yet, though. */
     759         370 :     pei->reader = NULL;
     760             : 
     761             :     /*
     762             :      * If instrumentation options were supplied, allocate space for the data.
     763             :      * It only gets partially initialized here; the rest happens during
     764             :      * ExecParallelInitializeDSM.
     765             :      */
     766         370 :     if (estate->es_instrument)
     767             :     {
     768             :         Instrumentation *instrument;
     769             :         int         i;
     770             : 
     771         112 :         instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
     772         112 :         instrumentation->instrument_options = estate->es_instrument;
     773         112 :         instrumentation->instrument_offset = instrument_offset;
     774         112 :         instrumentation->num_workers = nworkers;
     775         112 :         instrumentation->num_plan_nodes = e.nnodes;
     776         112 :         instrument = GetInstrumentationArray(instrumentation);
     777        1140 :         for (i = 0; i < nworkers * e.nnodes; ++i)
     778        1028 :             InstrInit(&instrument[i], estate->es_instrument);
     779         112 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
     780             :                        instrumentation);
     781         112 :         pei->instrumentation = instrumentation;
     782             : 
     783         112 :         if (estate->es_jit_flags != PGJIT_NONE)
     784             :         {
     785          16 :             jit_instrumentation = shm_toc_allocate(pcxt->toc,
     786             :                                                    jit_instrumentation_len);
     787          16 :             jit_instrumentation->num_workers = nworkers;
     788          16 :             memset(jit_instrumentation->jit_instr, 0,
     789             :                    sizeof(JitInstrumentation) * nworkers);
     790          16 :             shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
     791             :                            jit_instrumentation);
     792          16 :             pei->jit_instrumentation = jit_instrumentation;
     793             :         }
     794             :     }
     795             : 
     796             :     /*
     797             :      * Create a DSA area that can be used by the leader and all workers.
     798             :      * (However, if we failed to create a DSM and are using private memory
     799             :      * instead, then skip this.)
     800             :      */
     801         370 :     if (pcxt->seg != NULL)
     802             :     {
     803             :         char       *area_space;
     804             : 
     805         370 :         area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
     806         370 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
     807         370 :         pei->area = dsa_create_in_place(area_space, dsa_minsize,
     808             :                                         LWTRANCHE_PARALLEL_QUERY_DSA,
     809             :                                         pcxt->seg);
     810             : 
     811             :         /*
     812             :          * Serialize parameters, if any, using DSA storage.  We don't dare use
     813             :          * the main parallel query DSM for this because we might relaunch
     814             :          * workers after the values have changed (and thus the amount of
     815             :          * storage required has changed).
     816             :          */
     817         370 :         if (!bms_is_empty(sendParams))
     818             :         {
     819           8 :             pei->param_exec = SerializeParamExecParams(estate, sendParams,
     820             :                                                        pei->area);
     821           8 :             fpes->param_exec = pei->param_exec;
     822             :         }
     823             :     }
     824             : 
     825             :     /*
     826             :      * Give parallel-aware nodes a chance to initialize their shared data.
      827             :      * This also fills in the per-node slots of the shared instrumentation
      828             :      * structure, if instrumentation is in use.
     829             :      */
     830         370 :     d.pcxt = pcxt;
     831         370 :     d.instrumentation = instrumentation;
     832         370 :     d.nnodes = 0;
     833             : 
     834             :     /* Install our DSA area while initializing the plan. */
     835         370 :     estate->es_query_dsa = pei->area;
     836         370 :     ExecParallelInitializeDSM(planstate, &d);
     837         370 :     estate->es_query_dsa = NULL;
     838             : 
     839             :     /*
     840             :      * Make sure that the world hasn't shifted under our feet.  This could
     841             :      * probably just be an Assert(), but let's be conservative for now.
     842             :      */
     843         370 :     if (e.nnodes != d.nnodes)
     844           0 :         elog(ERROR, "inconsistent count of PlanState nodes");
     845             : 
     846             :     /* OK, we're ready to rock and roll. */
     847         370 :     return pei;
     848             : }
     849             : 
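ExecInitParallelPlan above follows the standard shm_toc protocol for every piece of shared state: estimate each chunk and key before InitializeParallelDSM, then allocate and insert afterwards, in matching order. A condensed, hypothetical sketch of that contract for a single blob (the key constant is arbitrary and purely illustrative):

/*
 * Hypothetical example of the estimate / create / allocate+insert sequence.
 * Real callers, like ExecInitParallelPlan, batch many estimates before the
 * single InitializeParallelDSM call.
 */
static void
ShareOneBlobExample(ParallelContext *pcxt, const char *data, Size len)
{
    char       *space;

    /* Phase 1: declare how much space and how many keys we will need. */
    shm_toc_estimate_chunk(&pcxt->estimator, len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Phase 2: create the DSM segment once all estimates are in. */
    InitializeParallelDSM(pcxt);

    /* Phase 3: carve out the reserved space and publish it under a key. */
    space = shm_toc_allocate(pcxt->toc, len);
    memcpy(space, data, len);
    shm_toc_insert(pcxt->toc, UINT64CONST(0xE0000000000000FF), space);
}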
     850             : /*
     851             :  * Set up tuple queue readers to read the results of a parallel subplan.
     852             :  *
     853             :  * This is separate from ExecInitParallelPlan() because we can launch the
     854             :  * worker processes and let them start doing something before we do this.
     855             :  */
     856             : void
     857         530 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
     858             : {
     859         530 :     int         nworkers = pei->pcxt->nworkers_launched;
     860             :     int         i;
     861             : 
     862             :     Assert(pei->reader == NULL);
     863             : 
     864         530 :     if (nworkers > 0)
     865             :     {
     866         530 :         pei->reader = (TupleQueueReader **)
     867         530 :             palloc(nworkers * sizeof(TupleQueueReader *));
     868             : 
     869        1990 :         for (i = 0; i < nworkers; i++)
     870             :         {
     871        1460 :             shm_mq_set_handle(pei->tqueue[i],
     872        1460 :                               pei->pcxt->worker[i].bgwhandle);
     873        1460 :             pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
     874             :         }
     875             :     }
     876         530 : }
     877             : 
     878             : /*
     879             :  * Re-initialize the parallel executor shared memory state before launching
     880             :  * a fresh batch of workers.
     881             :  */
     882             : void
     883         172 : ExecParallelReinitialize(PlanState *planstate,
     884             :                          ParallelExecutorInfo *pei,
     885             :                          Bitmapset *sendParams)
     886             : {
     887         172 :     EState     *estate = planstate->state;
     888             :     FixedParallelExecutorState *fpes;
     889             : 
     890             :     /* Old workers must already be shut down */
     891             :     Assert(pei->finished);
     892             : 
     893             :     /*
     894             :      * Force any initplan outputs that we're going to pass to workers to be
     895             :      * evaluated, if they weren't already (see comments in
     896             :      * ExecInitParallelPlan).
     897             :      */
     898         172 :     ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
     899             : 
     900         172 :     ReinitializeParallelDSM(pei->pcxt);
     901         172 :     pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
     902         172 :     pei->reader = NULL;
     903         172 :     pei->finished = false;
     904             : 
     905         172 :     fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
     906             : 
     907             :     /* Free any serialized parameters from the last round. */
     908         172 :     if (DsaPointerIsValid(fpes->param_exec))
     909             :     {
     910           0 :         dsa_free(pei->area, fpes->param_exec);
     911           0 :         fpes->param_exec = InvalidDsaPointer;
     912             :     }
     913             : 
     914             :     /* Serialize current parameter values if required. */
     915         172 :     if (!bms_is_empty(sendParams))
     916             :     {
     917           0 :         pei->param_exec = SerializeParamExecParams(estate, sendParams,
     918             :                                                    pei->area);
     919           0 :         fpes->param_exec = pei->param_exec;
     920             :     }
     921             : 
     922             :     /* Traverse plan tree and let each child node reset associated state. */
     923         172 :     estate->es_query_dsa = pei->area;
     924         172 :     ExecParallelReInitializeDSM(planstate, pei->pcxt);
     925         172 :     estate->es_query_dsa = NULL;
     926         172 : }
     927             : 
     928             : /*
     929             :  * Traverse plan tree to reinitialize per-node dynamic shared memory state
     930             :  */
     931             : static bool
     932         444 : ExecParallelReInitializeDSM(PlanState *planstate,
     933             :                             ParallelContext *pcxt)
     934             : {
     935         444 :     if (planstate == NULL)
     936           0 :         return false;
     937             : 
     938             :     /*
     939             :      * Call reinitializers for DSM-using plan nodes.
     940             :      */
     941         444 :     switch (nodeTag(planstate))
     942             :     {
     943         184 :         case T_SeqScanState:
     944         184 :             if (planstate->plan->parallel_aware)
     945         152 :                 ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
     946             :                                            pcxt);
     947         184 :             break;
     948           8 :         case T_IndexScanState:
     949           8 :             if (planstate->plan->parallel_aware)
     950           8 :                 ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
     951             :                                              pcxt);
     952           8 :             break;
     953           8 :         case T_IndexOnlyScanState:
     954           8 :             if (planstate->plan->parallel_aware)
     955           8 :                 ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
     956             :                                                  pcxt);
     957           8 :             break;
     958           0 :         case T_ForeignScanState:
     959           0 :             if (planstate->plan->parallel_aware)
     960           0 :                 ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
     961             :                                                pcxt);
     962           0 :             break;
     963           0 :         case T_AppendState:
     964           0 :             if (planstate->plan->parallel_aware)
     965           0 :                 ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
     966           0 :             break;
     967           0 :         case T_CustomScanState:
     968           0 :             if (planstate->plan->parallel_aware)
     969           0 :                 ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
     970             :                                               pcxt);
     971           0 :             break;
     972          36 :         case T_BitmapHeapScanState:
     973          36 :             if (planstate->plan->parallel_aware)
     974          36 :                 ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
     975             :                                               pcxt);
     976          36 :             break;
     977          64 :         case T_HashJoinState:
     978          64 :             if (planstate->plan->parallel_aware)
     979          32 :                 ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
     980             :                                             pcxt);
     981          64 :             break;
     982          84 :         case T_HashState:
     983             :         case T_SortState:
     984             :         case T_IncrementalSortState:
     985             :             /* these nodes have DSM state, but no reinitialization is required */
     986          84 :             break;
     987             : 
     988          60 :         default:
     989          60 :             break;
     990             :     }
     991             : 
     992         444 :     return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
     993             : }
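
    This routine, like ExecParallelInitializeWorker further down, follows the
    standard planstate_tree_walker recipe: handle the node at hand, then let the
    walker recurse into children and subplans.  A minimal sketch of the same
    recipe with a hypothetical walker (not part of this file):

        /*
         * Hypothetical walker showing the recursion pattern: count the
         * parallel-aware nodes in a PlanState tree.  Returning the walker's
         * result lets a "true" from any child abort the traversal early.
         */
        static bool
        count_parallel_aware(PlanState *planstate, int *count)
        {
            if (planstate == NULL)
                return false;

            if (planstate->plan->parallel_aware)
                (*count)++;

            return planstate_tree_walker(planstate, count_parallel_aware, count);
        }
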
     994             : 
     995             : /*
     996             :  * Copy instrumentation information about this node and its descendants from
     997             :  * dynamic shared memory.
     998             :  */
     999             : static bool
    1000         640 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
    1001             :                                     SharedExecutorInstrumentation *instrumentation)
    1002             : {
    1003             :     Instrumentation *instrument;
    1004             :     int         i;
    1005             :     int         n;
    1006             :     int         ibytes;
    1007         640 :     int         plan_node_id = planstate->plan->plan_node_id;
    1008             :     MemoryContext oldcontext;
    1009             : 
    1010             :     /* Find the instrumentation for this node. */
    1011        2908 :     for (i = 0; i < instrumentation->num_plan_nodes; ++i)
    1012        2908 :         if (instrumentation->plan_node_id[i] == plan_node_id)
    1013         640 :             break;
    1014         640 :     if (i >= instrumentation->num_plan_nodes)
    1015           0 :         elog(ERROR, "plan node %d not found", plan_node_id);
    1016             : 
    1017             :     /* Accumulate the statistics from all workers. */
    1018         640 :     instrument = GetInstrumentationArray(instrumentation);
    1019         640 :     instrument += i * instrumentation->num_workers;
    1020        1668 :     for (n = 0; n < instrumentation->num_workers; ++n)
    1021        1028 :         InstrAggNode(planstate->instrument, &instrument[n]);
    1022             : 
    1023             :     /*
    1024             :      * Also store the per-worker detail.
    1025             :      *
    1026             :      * Worker instrumentation should be allocated in the same context as the
    1027             :      * regular instrumentation information, which is the per-query context.
    1028             :      * Switch into per-query memory context.
    1029             :      */
    1030         640 :     oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
    1031         640 :     ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
    1032         640 :     planstate->worker_instrument =
    1033         640 :         palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
    1034         640 :     MemoryContextSwitchTo(oldcontext);
    1035             : 
    1036         640 :     planstate->worker_instrument->num_workers = instrumentation->num_workers;
    1037         640 :     memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
    1038             : 
    1039             :     /* Perform any node-type-specific work that needs to be done. */
    1040         640 :     switch (nodeTag(planstate))
    1041             :     {
    1042           8 :         case T_SortState:
    1043           8 :             ExecSortRetrieveInstrumentation((SortState *) planstate);
    1044           8 :             break;
    1045           0 :         case T_IncrementalSortState:
    1046           0 :             ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
    1047           0 :             break;
    1048          56 :         case T_HashState:
    1049          56 :             ExecHashRetrieveInstrumentation((HashState *) planstate);
    1050          56 :             break;
    1051         576 :         default:
    1052         576 :             break;
    1053             :     }
    1054             : 
    1055         640 :     return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
    1056             :                                  instrumentation);
    1057             : }
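
    The pointer arithmetic above assumes the shared instrumentation is one flat
    array of num_plan_nodes * num_workers entries, grouped by plan node; with 3
    plan nodes and 2 workers the order is node0/worker0, node0/worker1,
    node1/worker0, and so on.  A hypothetical accessor spelling that layout out:

        /*
         * Hypothetical helper making the shared-array layout explicit: the
         * Instrumentation for plan-node slot i and worker w lives at index
         * i * num_workers + w.
         */
        static Instrumentation *
        worker_instrument_slot(SharedExecutorInstrumentation *instrumentation,
                               int i, int w)
        {
            Instrumentation *array = GetInstrumentationArray(instrumentation);

            return &array[i * instrumentation->num_workers + w];
        }
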
    1058             : 
    1059             : /*
    1060             :  * Add up the workers' JIT instrumentation from dynamic shared memory.
    1061             :  */
    1062             : static void
    1063          16 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
    1064             :                                        SharedJitInstrumentation *shared_jit)
    1065             : {
    1066             :     JitInstrumentation *combined;
    1067             :     int         ibytes;
    1068             : 
    1069             :     int         n;
    1070             : 
    1071             :     /*
    1072             :      * Accumulate worker JIT instrumentation into the combined JIT
    1073             :      * instrumentation, allocating it if required.
    1074             :      */
    1075          16 :     if (!planstate->state->es_jit_worker_instr)
    1076          16 :         planstate->state->es_jit_worker_instr =
    1077          16 :             MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
    1078          16 :     combined = planstate->state->es_jit_worker_instr;
    1079             : 
    1080             :     /* Accumulate all the workers' instrumentations. */
    1081          48 :     for (n = 0; n < shared_jit->num_workers; ++n)
    1082          32 :         InstrJitAgg(combined, &shared_jit->jit_instr[n]);
    1083             : 
    1084             :     /*
    1085             :      * Store the per-worker detail.
    1086             :      *
    1087             :      * Similar to ExecParallelRetrieveInstrumentation(), allocate the
    1088             :      * instrumentation in per-query context.
    1089             :      */
    1090          16 :     ibytes = offsetof(SharedJitInstrumentation, jit_instr)
    1091          16 :         + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
    1092          16 :     planstate->worker_jit_instrument =
    1093          16 :         MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
    1094             : 
    1095          16 :     memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
    1096          16 : }
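
    The ibytes computation above is the usual flexible-array-member sizing
    recipe: offsetof() up to the array member, plus one element per worker.  The
    same arithmetic with a hypothetical struct, to make the rule explicit:

        /*
         * Hypothetical struct mirroring the shape of SharedJitInstrumentation:
         * fixed header fields followed by a per-worker array, sized as
         * offsetof(struct, array) + n * sizeof(element).
         */
        typedef struct PerWorkerJitStats
        {
            int         num_workers;
            JitInstrumentation jit_instr[FLEXIBLE_ARRAY_MEMBER];
        } PerWorkerJitStats;

        #define PERWORKERJITSTATS_SIZE(n) \
            (offsetof(PerWorkerJitStats, jit_instr) + \
             mul_size((n), sizeof(JitInstrumentation)))
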
    1097             : 
    1098             : /*
    1099             :  * Finish parallel execution.  We wait for parallel workers to finish, and
    1100             :  * accumulate their buffer/WAL usage.
    1101             :  */
    1102             : void
    1103         976 : ExecParallelFinish(ParallelExecutorInfo *pei)
    1104             : {
    1105         976 :     int         nworkers = pei->pcxt->nworkers_launched;
    1106             :     int         i;
    1107             : 
    1108             :     /* Make this be a no-op if called twice in a row. */
    1109         976 :     if (pei->finished)
    1110         438 :         return;
    1111             : 
    1112             :     /*
    1113             :      * Detach from tuple queues ASAP, so that any still-active workers will
    1114             :      * notice that no further results are wanted.
    1115             :      */
    1116         538 :     if (pei->tqueue != NULL)
    1117             :     {
    1118        1994 :         for (i = 0; i < nworkers; i++)
    1119        1456 :             shm_mq_detach(pei->tqueue[i]);
    1120         538 :         pfree(pei->tqueue);
    1121         538 :         pei->tqueue = NULL;
    1122             :     }
    1123             : 
    1124             :     /*
    1125             :      * While we're waiting for the workers to finish, let's get rid of the
    1126             :      * tuple queue readers.  (Any other local cleanup could be done here too.)
    1127             :      */
    1128         538 :     if (pei->reader != NULL)
    1129             :     {
    1130        1982 :         for (i = 0; i < nworkers; i++)
    1131        1456 :             DestroyTupleQueueReader(pei->reader[i]);
    1132         526 :         pfree(pei->reader);
    1133         526 :         pei->reader = NULL;
    1134             :     }
    1135             : 
    1136             :     /* Now wait for the workers to finish. */
    1137         538 :     WaitForParallelWorkersToFinish(pei->pcxt);
    1138             : 
    1139             :     /*
    1140             :      * Next, accumulate buffer/WAL usage.  (This must wait for the workers to
    1141             :      * finish, or we might get incomplete data.)
    1142             :      */
    1143        1994 :     for (i = 0; i < nworkers; i++)
    1144        1456 :         InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
    1145             : 
    1146         538 :     pei->finished = true;
    1147             : }
    1148             : 
    1149             : /*
    1150             :  * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
    1151             :  * resources still exist after ExecParallelFinish.  We separate these
    1152             :  * routines because someone might want to examine the contents of the DSM
    1153             :  * after ExecParallelFinish and before calling this routine.
    1154             :  */
    1155             : void
    1156         366 : ExecParallelCleanup(ParallelExecutorInfo *pei)
    1157             : {
    1158             :     /* Accumulate instrumentation, if any. */
    1159         366 :     if (pei->instrumentation)
    1160         112 :         ExecParallelRetrieveInstrumentation(pei->planstate,
    1161             :                                             pei->instrumentation);
    1162             : 
    1163             :     /* Accumulate JIT instrumentation, if any. */
    1164         366 :     if (pei->jit_instrumentation)
    1165          16 :         ExecParallelRetrieveJitInstrumentation(pei->planstate,
    1166          16 :                                                pei->jit_instrumentation);
    1167             : 
    1168             :     /* Free any serialized parameters. */
    1169         366 :     if (DsaPointerIsValid(pei->param_exec))
    1170             :     {
    1171           8 :         dsa_free(pei->area, pei->param_exec);
    1172           8 :         pei->param_exec = InvalidDsaPointer;
    1173             :     }
    1174         366 :     if (pei->area != NULL)
    1175             :     {
    1176         366 :         dsa_detach(pei->area);
    1177         366 :         pei->area = NULL;
    1178             :     }
    1179         366 :     if (pei->pcxt != NULL)
    1180             :     {
    1181         366 :         DestroyParallelContext(pei->pcxt);
    1182         366 :         pei->pcxt = NULL;
    1183             :     }
    1184         366 :     pfree(pei);
    1185         366 : }
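
    Taken together, ExecParallelFinish and ExecParallelCleanup give the leader a
    two-phase shutdown; the window between them is where the DSM contents can
    still be examined.  A sketch of the intended calling order (the real callers
    are the Gather and Gather Merge shutdown paths, not this file):

        /*
         * Sketch of the leader-side teardown order implied by the comments
         * above; the function itself is hypothetical.
         */
        static void
        teardown_parallel_query(ParallelExecutorInfo *pei)
        {
            /* Wait for workers and fold their buffer/WAL usage into our totals. */
            ExecParallelFinish(pei);

            /*
             * Anything that wants to look at the DSM contents (for example, to
             * read shared instrumentation by hand) must do so here, before
             * cleanup frees the DSA area and destroys the ParallelContext.
             */

            /* Retrieve instrumentation and release all remaining resources. */
            ExecParallelCleanup(pei);
        }
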
    1186             : 
    1187             : /*
    1188             :  * Create a DestReceiver to write tuples we produce to the shm_mq designated
    1189             :  * for that purpose.
    1190             :  */
    1191             : static DestReceiver *
    1192        1460 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
    1193             : {
    1194             :     char       *mqspace;
    1195             :     shm_mq     *mq;
    1196             : 
    1197        1460 :     mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
    1198        1460 :     mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
    1199        1460 :     mq = (shm_mq *) mqspace;
    1200        1460 :     shm_mq_set_sender(mq, MyProc);
    1201        1460 :     return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
    1202             : }
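
    The queue addressing above works because the leader lays all the tuple queues
    out back to back in a single TOC entry, PARALLEL_TUPLE_QUEUE_SIZE bytes each,
    so worker N's queue begins N * PARALLEL_TUPLE_QUEUE_SIZE bytes in.  A
    hypothetical helper spelling that out:

        /*
         * Hypothetical helper: locate the shm_mq belonging to a given worker
         * number within the contiguous PARALLEL_KEY_TUPLE_QUEUE TOC entry.
         */
        static shm_mq *
        tuple_queue_for_worker(shm_toc *toc, int worker_number)
        {
            char       *base = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);

            return (shm_mq *) (base + worker_number * PARALLEL_TUPLE_QUEUE_SIZE);
        }
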
    1203             : 
    1204             : /*
    1205             :  * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
    1206             :  */
    1207             : static QueryDesc *
    1208        1460 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
    1209             :                          int instrument_options)
    1210             : {
    1211             :     char       *pstmtspace;
    1212             :     char       *paramspace;
    1213             :     PlannedStmt *pstmt;
    1214             :     ParamListInfo paramLI;
    1215             :     char       *queryString;
    1216             : 
    1217             :     /* Get the query string from shared memory */
    1218        1460 :     queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
    1219             : 
    1220             :     /* Reconstruct leader-supplied PlannedStmt. */
    1221        1460 :     pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
    1222        1460 :     pstmt = (PlannedStmt *) stringToNode(pstmtspace);
    1223             : 
    1224             :     /* Reconstruct ParamListInfo. */
    1225        1460 :     paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
    1226        1460 :     paramLI = RestoreParamList(&paramspace);
    1227             : 
    1228             :     /* Create a QueryDesc for the query. */
    1229        1460 :     return CreateQueryDesc(pstmt,
    1230             :                            queryString,
    1231             :                            GetActiveSnapshot(), InvalidSnapshot,
    1232             :                            receiver, paramLI, NULL, instrument_options);
    1233             : }
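
    The reconstruction above relies on PostgreSQL's node serialization being a
    faithful round trip: the leader flattens the PlannedStmt to text and the
    worker rebuilds an equivalent node tree from it.  A minimal, hypothetical
    illustration of that symmetry:

        /*
         * Hypothetical round trip: serialize a plan the way the leader does
         * (via nodeToString) and rebuild it the way this worker-side code does
         * (via stringToNode).
         */
        static PlannedStmt *
        roundtrip_plan(PlannedStmt *pstmt)
        {
            char       *serialized = nodeToString(pstmt);

            return (PlannedStmt *) stringToNode(serialized);
        }
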
    1234             : 
    1235             : /*
    1236             :  * Copy instrumentation information from this node and its descendants into
    1237             :  * dynamic shared memory, so that the parallel leader can retrieve it.
    1238             :  */
    1239             : static bool
    1240        1492 : ExecParallelReportInstrumentation(PlanState *planstate,
    1241             :                                   SharedExecutorInstrumentation *instrumentation)
    1242             : {
    1243             :     int         i;
    1244        1492 :     int         plan_node_id = planstate->plan->plan_node_id;
    1245             :     Instrumentation *instrument;
    1246             : 
    1247        1492 :     InstrEndLoop(planstate->instrument);
    1248             : 
    1249             :     /*
    1250             :      * If we shuffled the plan_node_id values in ps_instrument into sorted
    1251             :      * order, we could use binary search here.  This might matter someday if
    1252             :      * we're pushing down sufficiently large plan trees.  For now, do it the
    1253             :      * slow, dumb way.
    1254             :      */
    1255        4820 :     for (i = 0; i < instrumentation->num_plan_nodes; ++i)
    1256        4820 :         if (instrumentation->plan_node_id[i] == plan_node_id)
    1257        1492 :             break;
    1258        1492 :     if (i >= instrumentation->num_plan_nodes)
    1259           0 :         elog(ERROR, "plan node %d not found", plan_node_id);
    1260             : 
    1261             :     /*
    1262             :      * Add our statistics to the per-node, per-worker totals.  It's possible
    1263             :      * that this could happen more than once if we relaunched workers.
    1264             :      */
    1265        1492 :     instrument = GetInstrumentationArray(instrumentation);
    1266        1492 :     instrument += i * instrumentation->num_workers;
    1267             :     Assert(IsParallelWorker());
    1268             :     Assert(ParallelWorkerNumber < instrumentation->num_workers);
    1269        1492 :     InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
    1270             : 
    1271        1492 :     return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
    1272             :                                  instrumentation);
    1273             : }
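
    The comment above about binary search presumes the plan_node_id slots were
    kept in sorted order; for the plan-tree sizes seen in practice, the linear
    scan wins on simplicity.  A sketch of what that lookup could look like
    against a hypothetical sorted copy of the array:

        /*
         * Hypothetical binary search over a sorted copy of the plan_node_id
         * array; returns the slot index, or -1 if the node is not present.
         */
        static int
        find_plan_node_slot(const int *sorted_ids, int nids, int plan_node_id)
        {
            int         low = 0;
            int         high = nids - 1;

            while (low <= high)
            {
                int         mid = low + (high - low) / 2;

                if (sorted_ids[mid] == plan_node_id)
                    return mid;
                if (sorted_ids[mid] < plan_node_id)
                    low = mid + 1;
                else
                    high = mid - 1;
            }
            return -1;
        }
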
    1274             : 
    1275             : /*
    1276             :  * Initialize the PlanState and its descendants with the information
    1277             :  * retrieved from shared memory.  This has to be done once the PlanState
    1278             :  * is allocated and initialized by executor; that is, after ExecutorStart().
    1279             :  */
    1280             : static bool
    1281        4758 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
    1282             : {
    1283        4758 :     if (planstate == NULL)
    1284           0 :         return false;
    1285             : 
    1286        4758 :     switch (nodeTag(planstate))
    1287             :     {
    1288        1894 :         case T_SeqScanState:
    1289        1894 :             if (planstate->plan->parallel_aware)
    1290        1498 :                 ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
    1291        1894 :             break;
    1292         248 :         case T_IndexScanState:
    1293         248 :             if (planstate->plan->parallel_aware)
    1294          64 :                 ExecIndexScanInitializeWorker((IndexScanState *) planstate,
    1295             :                                               pwcxt);
    1296         248 :             break;
    1297         144 :         case T_IndexOnlyScanState:
    1298         144 :             if (planstate->plan->parallel_aware)
    1299         128 :                 ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
    1300             :                                                   pwcxt);
    1301         144 :             break;
    1302           0 :         case T_ForeignScanState:
    1303           0 :             if (planstate->plan->parallel_aware)
    1304           0 :                 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
    1305             :                                                 pwcxt);
    1306           0 :             break;
    1307         188 :         case T_AppendState:
    1308         188 :             if (planstate->plan->parallel_aware)
    1309         148 :                 ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
    1310         188 :             break;
    1311           0 :         case T_CustomScanState:
    1312           0 :             if (planstate->plan->parallel_aware)
    1313           0 :                 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
    1314             :                                                pwcxt);
    1315           0 :             break;
    1316         178 :         case T_BitmapHeapScanState:
    1317         178 :             if (planstate->plan->parallel_aware)
    1318         176 :                 ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
    1319             :                                                pwcxt);
    1320         178 :             break;
    1321         340 :         case T_HashJoinState:
    1322         340 :             if (planstate->plan->parallel_aware)
    1323         180 :                 ExecHashJoinInitializeWorker((HashJoinState *) planstate,
    1324             :                                              pwcxt);
    1325         340 :             break;
    1326         340 :         case T_HashState:
    1327             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1328         340 :             ExecHashInitializeWorker((HashState *) planstate, pwcxt);
    1329         340 :             break;
    1330         286 :         case T_SortState:
    1331             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1332         286 :             ExecSortInitializeWorker((SortState *) planstate, pwcxt);
    1333         286 :             break;
    1334           0 :         case T_IncrementalSortState:
    1335             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1336           0 :             ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
    1337             :                                                 pwcxt);
    1338           0 :             break;
    1339             : 
    1340        1140 :         default:
    1341        1140 :             break;
    1342             :     }
    1343             : 
    1344        4758 :     return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
    1345             :                                  pwcxt);
    1346             : }
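
    For built-in nodes the worker-attach hooks above are hard-coded, but a custom
    scan provider reaches the same dispatch through the T_CustomScanState case.
    A hedged sketch, assuming the usual custom-scan provider API from
    nodes/extensible.h (the provider functions and names here are hypothetical):

        /*
         * Hypothetical parallel-aware custom scan provider: the worker-side
         * callback that ExecCustomScanInitializeWorker() would invoke.
         */
        static void
        myscan_initialize_worker(CustomScanState *node, shm_toc *toc,
                                 void *coordinate)
        {
            /* attach to whatever shared state the leader placed at "coordinate" */
        }

        static CustomExecMethods myscan_exec_methods = {
            .CustomName = "MyParallelScan",
            .InitializeWorkerCustomScan = myscan_initialize_worker,
            /* ... BeginCustomScan, ExecCustomScan, EndCustomScan, and the
             * EstimateDSM/InitializeDSM/ReInitializeDSM callbacks ... */
        };
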
    1347             : 
    1348             : /*
    1349             :  * Main entrypoint for parallel query worker processes.
    1350             :  *
    1351             :  * We reach this function from ParallelWorkerMain, so the setup necessary to
    1352             :  * create a sensible parallel environment has already been done;
    1353             :  * ParallelWorkerMain worries about stuff like the transaction state, combo
    1354             :  * CID mappings, and GUC values, so we don't need to deal with any of that
    1355             :  * here.
    1356             :  *
    1357             :  * Our job is to deal with concerns specific to the executor.  The parallel
    1358             :  * group leader will have stored a serialized PlannedStmt, and it's our job
    1359             :  * to execute that plan and write the resulting tuples to the appropriate
    1360             :  * tuple queue.  Various bits of supporting information that we need in order
    1361             :  * to do this are also stored in the dsm_segment and can be accessed through
    1362             :  * the shm_toc.
    1363             :  */
    1364             : void
    1365        1460 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
    1366             : {
    1367             :     FixedParallelExecutorState *fpes;
    1368             :     BufferUsage *buffer_usage;
    1369             :     WalUsage   *wal_usage;
    1370             :     DestReceiver *receiver;
    1371             :     QueryDesc  *queryDesc;
    1372             :     SharedExecutorInstrumentation *instrumentation;
    1373             :     SharedJitInstrumentation *jit_instrumentation;
    1374        1460 :     int         instrument_options = 0;
    1375             :     void       *area_space;
    1376             :     dsa_area   *area;
    1377             :     ParallelWorkerContext pwcxt;
    1378             : 
    1379             :     /* Get fixed-size state. */
    1380        1460 :     fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
    1381             : 
    1382             :     /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
    1383        1460 :     receiver = ExecParallelGetReceiver(seg, toc);
    1384        1460 :     instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
    1385        1460 :     if (instrumentation != NULL)
    1386         468 :         instrument_options = instrumentation->instrument_options;
    1387        1460 :     jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
    1388             :                                          true);
    1389        1460 :     queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
    1390             : 
     1391             :     /* Set debug_query_string for this worker process */
    1392        1460 :     debug_query_string = queryDesc->sourceText;
    1393             : 
    1394             :     /* Report workers' query for monitoring purposes */
    1395        1460 :     pgstat_report_activity(STATE_RUNNING, debug_query_string);
    1396             : 
    1397             :     /* Attach to the dynamic shared memory area. */
    1398        1460 :     area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
    1399        1460 :     area = dsa_attach_in_place(area_space, seg);
    1400             : 
    1401             :     /* Start up the executor */
    1402        1460 :     queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
    1403        1460 :     ExecutorStart(queryDesc, fpes->eflags);
    1404             : 
    1405             :     /* Special executor initialization steps for parallel workers */
    1406        1460 :     queryDesc->planstate->state->es_query_dsa = area;
    1407        1460 :     if (DsaPointerIsValid(fpes->param_exec))
    1408             :     {
    1409             :         char       *paramexec_space;
    1410             : 
    1411          24 :         paramexec_space = dsa_get_address(area, fpes->param_exec);
    1412          24 :         RestoreParamExecParams(paramexec_space, queryDesc->estate);
    1413             : 
    1414             :     }
    1415        1460 :     pwcxt.toc = toc;
    1416        1460 :     pwcxt.seg = seg;
    1417        1460 :     ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
    1418             : 
    1419             :     /* Pass down any tuple bound */
    1420        1460 :     ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
    1421             : 
    1422             :     /*
    1423             :      * Prepare to track buffer/WAL usage during query execution.
    1424             :      *
    1425             :      * We do this after starting up the executor to match what happens in the
    1426             :      * leader, which also doesn't count buffer accesses and WAL activity that
    1427             :      * occur during executor startup.
    1428             :      */
    1429        1460 :     InstrStartParallelQuery();
    1430             : 
    1431             :     /*
    1432             :      * Run the plan.  If we specified a tuple bound, be careful not to demand
    1433             :      * more tuples than that.
    1434             :      */
    1435        1460 :     ExecutorRun(queryDesc,
    1436             :                 ForwardScanDirection,
    1437        1460 :                 fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
    1438             :                 true);
    1439             : 
    1440             :     /* Shut down the executor */
    1441        1456 :     ExecutorFinish(queryDesc);
    1442             : 
    1443             :     /* Report buffer/WAL usage during parallel execution. */
    1444        1456 :     buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
    1445        1456 :     wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
    1446        1456 :     InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
    1447        1456 :                           &wal_usage[ParallelWorkerNumber]);
    1448             : 
    1449             :     /* Report instrumentation data if any instrumentation options are set. */
    1450        1456 :     if (instrumentation != NULL)
    1451         468 :         ExecParallelReportInstrumentation(queryDesc->planstate,
    1452             :                                           instrumentation);
    1453             : 
    1454             :     /* Report JIT instrumentation data if any */
    1455        1456 :     if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
    1456             :     {
    1457             :         Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
    1458          96 :         jit_instrumentation->jit_instr[ParallelWorkerNumber] =
    1459          96 :             queryDesc->estate->es_jit->instr;
    1460             :     }
    1461             : 
    1462             :     /* Must do this after capturing instrumentation. */
    1463        1456 :     ExecutorEnd(queryDesc);
    1464             : 
    1465             :     /* Cleanup. */
    1466        1456 :     dsa_detach(area);
    1467        1456 :     FreeQueryDesc(queryDesc);
    1468        1456 :     receiver->rDestroy(receiver);
    1469        1456 : }
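
    One subtlety in the ExecutorRun call above: in FixedParallelExecutorState a
    negative tuples_needed means "no limit", while ExecutorRun treats a count of
    zero as "run to completion", hence the ternary.  A hypothetical helper making
    the translation explicit:

        /*
         * Hypothetical helper: map the "-1 means unlimited" convention of
         * fpes->tuples_needed onto ExecutorRun's "0 means unlimited" count.
         */
        static uint64
        tuples_needed_to_count(int64 tuples_needed)
        {
            return (tuples_needed < 0) ? 0 : (uint64) tuples_needed;
        }
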

Generated by: LCOV version 1.13