LCOV - code coverage report
Current view:  top level - src/backend/executor - execParallel.c (source / functions)
                                               Hit      Total    Coverage
Test:  PostgreSQL 12beta2      Lines:          457        500      91.4 %
Date:  2019-06-18 07:06:57     Functions:       20         20     100.0 %
Legend:  Lines:  hit | not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execParallel.c
       4             :  *    Support routines for parallel execution.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * This file contains routines that are intended to support setting up,
      10             :  * using, and tearing down a ParallelContext from within the PostgreSQL
      11             :  * executor.  The ParallelContext machinery will handle starting the
      12             :  * workers and ensuring that their state generally matches that of the
      13             :  * leader; see src/backend/access/transam/README.parallel for details.
      14             :  * However, we must save and restore relevant executor state, such as
      15             :  * any ParamListInfo associated with the query, buffer usage info, and
      16             :  * the actual plan to be passed down to the worker.
      17             :  *
      18             :  * IDENTIFICATION
      19             :  *    src/backend/executor/execParallel.c
      20             :  *
      21             :  *-------------------------------------------------------------------------
      22             :  */
      23             : 
      24             : #include "postgres.h"
      25             : 
      26             : #include "executor/execParallel.h"
      27             : #include "executor/executor.h"
      28             : #include "executor/nodeAppend.h"
      29             : #include "executor/nodeBitmapHeapscan.h"
      30             : #include "executor/nodeCustom.h"
      31             : #include "executor/nodeForeignscan.h"
      32             : #include "executor/nodeHash.h"
      33             : #include "executor/nodeHashjoin.h"
      34             : #include "executor/nodeIndexscan.h"
      35             : #include "executor/nodeIndexonlyscan.h"
      36             : #include "executor/nodeSeqscan.h"
      37             : #include "executor/nodeSort.h"
      38             : #include "executor/nodeSubplan.h"
      39             : #include "executor/tqueue.h"
      40             : #include "jit/jit.h"
      41             : #include "nodes/nodeFuncs.h"
      42             : #include "storage/spin.h"
      43             : #include "tcop/tcopprot.h"
      44             : #include "utils/datum.h"
      45             : #include "utils/dsa.h"
      46             : #include "utils/lsyscache.h"
      47             : #include "utils/memutils.h"
      48             : #include "utils/snapmgr.h"
      49             : #include "pgstat.h"
      50             : 
      51             : /*
      52             :  * Magic numbers for parallel executor communication.  We use constants
      53             :  * greater than any 32-bit integer here so that values < 2^32 can be used
      54             :  * by individual parallel nodes to store their own state.
      55             :  */
      56             : #define PARALLEL_KEY_EXECUTOR_FIXED     UINT64CONST(0xE000000000000001)
      57             : #define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000002)
      58             : #define PARALLEL_KEY_PARAMLISTINFO      UINT64CONST(0xE000000000000003)
      59             : #define PARALLEL_KEY_BUFFER_USAGE       UINT64CONST(0xE000000000000004)
      60             : #define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000005)
      61             : #define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000006)
      62             : #define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000007)
       63             : #define PARALLEL_KEY_QUERY_TEXT         UINT64CONST(0xE000000000000008)
      64             : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
      65             : 
      66             : #define PARALLEL_TUPLE_QUEUE_SIZE       65536
      67             : 
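
In practice, the per-node values mentioned above are typically just the node's plan_node_id, which always fits in an int and therefore can never collide with the PARALLEL_KEY_* constants.  A minimal sketch of a node-level DSM initializer using such a key (ExampleNodeShared and ExampleNodeInitializeDSM are illustrative names, not code from this file):

    typedef struct ExampleNodeShared
    {
        slock_t     mutex;          /* protects ntuples */
        int64       ntuples;        /* example shared counter */
    } ExampleNodeShared;

    static void
    ExampleNodeInitializeDSM(PlanState *planstate, ParallelContext *pcxt)
    {
        ExampleNodeShared *shared;

        /* Space and key must have been reserved during the estimate phase. */
        shared = shm_toc_allocate(pcxt->toc, sizeof(ExampleNodeShared));
        SpinLockInit(&shared->mutex);
        shared->ntuples = 0;

        /* plan_node_id is < 2^32, so it cannot clash with PARALLEL_KEY_*. */
        shm_toc_insert(pcxt->toc, planstate->plan->plan_node_id, shared);
    }
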
      68             : /*
      69             :  * Fixed-size random stuff that we need to pass to parallel workers.
      70             :  */
      71             : typedef struct FixedParallelExecutorState
      72             : {
      73             :     int64       tuples_needed;  /* tuple bound, see ExecSetTupleBound */
      74             :     dsa_pointer param_exec;
      75             :     int         eflags;
      76             :     int         jit_flags;
      77             : } FixedParallelExecutorState;
      78             : 
      79             : /*
      80             :  * DSM structure for accumulating per-PlanState instrumentation.
      81             :  *
      82             :  * instrument_options: Same meaning here as in instrument.c.
      83             :  *
      84             :  * instrument_offset: Offset, relative to the start of this structure,
      85             :  * of the first Instrumentation object.  This will depend on the length of
      86             :  * the plan_node_id array.
      87             :  *
      88             :  * num_workers: Number of workers.
      89             :  *
      90             :  * num_plan_nodes: Number of plan nodes.
      91             :  *
      92             :  * plan_node_id: Array of plan nodes for which we are gathering instrumentation
      93             :  * from parallel workers.  The length of this array is given by num_plan_nodes.
      94             :  */
      95             : struct SharedExecutorInstrumentation
      96             : {
      97             :     int         instrument_options;
      98             :     int         instrument_offset;
      99             :     int         num_workers;
     100             :     int         num_plan_nodes;
     101             :     int         plan_node_id[FLEXIBLE_ARRAY_MEMBER];
     102             :     /* array of num_plan_nodes * num_workers Instrumentation objects follows */
     103             : };
     104             : #define GetInstrumentationArray(sei) \
     105             :     (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
     106             :      (Instrumentation *) (((char *) sei) + sei->instrument_offset))
     107             : 
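
The instrumentation data that follows this struct forms a num_plan_nodes x num_workers matrix, stored plan-node-major.  A sketch of how one slot would be addressed (GetWorkerInstrumentation is a hypothetical helper; ExecParallelRetrieveInstrumentation below performs the same arithmetic inline):

    static Instrumentation *
    GetWorkerInstrumentation(SharedExecutorInstrumentation *sei,
                             int plan_index, int worker_index)
    {
        Instrumentation *array = GetInstrumentationArray(sei);

        Assert(plan_index >= 0 && plan_index < sei->num_plan_nodes);
        Assert(worker_index >= 0 && worker_index < sei->num_workers);

        /* Row = plan node (in plan_node_id[] order), column = worker. */
        return &array[plan_index * sei->num_workers + worker_index];
    }
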
     108             : /* Context object for ExecParallelEstimate. */
     109             : typedef struct ExecParallelEstimateContext
     110             : {
     111             :     ParallelContext *pcxt;
     112             :     int         nnodes;
     113             : } ExecParallelEstimateContext;
     114             : 
     115             : /* Context object for ExecParallelInitializeDSM. */
     116             : typedef struct ExecParallelInitializeDSMContext
     117             : {
     118             :     ParallelContext *pcxt;
     119             :     SharedExecutorInstrumentation *instrumentation;
     120             :     int         nnodes;
     121             : } ExecParallelInitializeDSMContext;
     122             : 
     123             : /* Helper functions that run in the parallel leader. */
     124             : static char *ExecSerializePlan(Plan *plan, EState *estate);
     125             : static bool ExecParallelEstimate(PlanState *node,
     126             :                                  ExecParallelEstimateContext *e);
     127             : static bool ExecParallelInitializeDSM(PlanState *node,
     128             :                                       ExecParallelInitializeDSMContext *d);
     129             : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
     130             :                                                     bool reinitialize);
     131             : static bool ExecParallelReInitializeDSM(PlanState *planstate,
     132             :                                         ParallelContext *pcxt);
     133             : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
     134             :                                                 SharedExecutorInstrumentation *instrumentation);
     135             : 
     136             : /* Helper function that runs in the parallel worker. */
     137             : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
     138             : 
     139             : /*
     140             :  * Create a serialized representation of the plan to be sent to each worker.
     141             :  */
     142             : static char *
     143         406 : ExecSerializePlan(Plan *plan, EState *estate)
     144             : {
     145             :     PlannedStmt *pstmt;
     146             :     ListCell   *lc;
     147             : 
     148             :     /* We can't scribble on the original plan, so make a copy. */
     149         406 :     plan = copyObject(plan);
     150             : 
     151             :     /*
     152             :      * The worker will start its own copy of the executor, and that copy will
     153             :      * insert a junk filter if the toplevel node has any resjunk entries. We
     154             :      * don't want that to happen, because while resjunk columns shouldn't be
     155             :      * sent back to the user, here the tuples are coming back to another
     156             :      * backend which may very well need them.  So mutate the target list
     157             :      * accordingly.  This is sort of a hack; there might be better ways to do
     158             :      * this...
     159             :      */
     160        1058 :     foreach(lc, plan->targetlist)
     161             :     {
     162         652 :         TargetEntry *tle = lfirst_node(TargetEntry, lc);
     163             : 
     164         652 :         tle->resjunk = false;
     165             :     }
     166             : 
     167             :     /*
     168             :      * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
     169             :      * for our purposes, but the worker will need at least a minimal
     170             :      * PlannedStmt to start the executor.
     171             :      */
     172         406 :     pstmt = makeNode(PlannedStmt);
     173         406 :     pstmt->commandType = CMD_SELECT;
     174         406 :     pstmt->queryId = UINT64CONST(0);
     175         406 :     pstmt->hasReturning = false;
     176         406 :     pstmt->hasModifyingCTE = false;
     177         406 :     pstmt->canSetTag = true;
     178         406 :     pstmt->transientPlan = false;
     179         406 :     pstmt->dependsOnRole = false;
     180         406 :     pstmt->parallelModeNeeded = false;
     181         406 :     pstmt->planTree = plan;
     182         406 :     pstmt->rtable = estate->es_range_table;
     183         406 :     pstmt->resultRelations = NIL;
     184             : 
     185             :     /*
     186             :      * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
     187             :      * for unsafe ones (so that the list indexes of the safe ones are
     188             :      * preserved).  This positively ensures that the worker won't try to run,
     189             :      * or even do ExecInitNode on, an unsafe subplan.  That's important to
     190             :      * protect, eg, non-parallel-aware FDWs from getting into trouble.
     191             :      */
     192         406 :     pstmt->subplans = NIL;
     193         426 :     foreach(lc, estate->es_plannedstmt->subplans)
     194             :     {
     195          20 :         Plan       *subplan = (Plan *) lfirst(lc);
     196             : 
     197          20 :         if (subplan && !subplan->parallel_safe)
     198          16 :             subplan = NULL;
     199          20 :         pstmt->subplans = lappend(pstmt->subplans, subplan);
     200             :     }
     201             : 
     202         406 :     pstmt->rewindPlanIDs = NULL;
     203         406 :     pstmt->rowMarks = NIL;
     204         406 :     pstmt->relationOids = NIL;
     205         406 :     pstmt->invalItems = NIL; /* workers can't replan anyway... */
     206         406 :     pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
     207         406 :     pstmt->utilityStmt = NULL;
     208         406 :     pstmt->stmt_location = -1;
     209         406 :     pstmt->stmt_len = -1;
     210             : 
     211             :     /* Return serialized copy of our dummy PlannedStmt. */
     212         406 :     return nodeToString(pstmt);
     213             : }
     214             : 
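
The consuming side of this string lives further down in execParallel.c (past the portion shown here): a worker looks the text up in the toc and rebuilds the node tree with stringToNode().  Roughly, with a hypothetical wrapper name:

    static PlannedStmt *
    ExampleRestorePlannedStmt(shm_toc *toc)
    {
        char       *pstmtspace;

        /* Stored under this key by ExecInitParallelPlan below. */
        pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
        return (PlannedStmt *) stringToNode(pstmtspace);
    }
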
     215             : /*
     216             :  * Parallel-aware plan nodes (and occasionally others) may need some state
     217             :  * which is shared across all parallel workers.  Before we size the DSM, give
     218             :  * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
     219             :  * &pcxt->estimator.
     220             :  *
     221             :  * While we're at it, count the number of PlanState nodes in the tree, so
     222             :  * we know how many Instrumentation structures we need.
     223             :  */
     224             : static bool
     225        2098 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
     226             : {
     227        2098 :     if (planstate == NULL)
     228           0 :         return false;
     229             : 
     230             :     /* Count this node. */
     231        2098 :     e->nnodes++;
     232             : 
     233        2098 :     switch (nodeTag(planstate))
     234             :     {
     235             :         case T_SeqScanState:
     236        1004 :             if (planstate->plan->parallel_aware)
     237         860 :                 ExecSeqScanEstimate((SeqScanState *) planstate,
     238             :                                     e->pcxt);
     239        1004 :             break;
     240             :         case T_IndexScanState:
     241         192 :             if (planstate->plan->parallel_aware)
     242           8 :                 ExecIndexScanEstimate((IndexScanState *) planstate,
     243             :                                       e->pcxt);
     244         192 :             break;
     245             :         case T_IndexOnlyScanState:
     246          28 :             if (planstate->plan->parallel_aware)
     247          24 :                 ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
     248             :                                           e->pcxt);
     249          28 :             break;
     250             :         case T_ForeignScanState:
     251           0 :             if (planstate->plan->parallel_aware)
     252           0 :                 ExecForeignScanEstimate((ForeignScanState *) planstate,
     253             :                                         e->pcxt);
     254           0 :             break;
     255             :         case T_AppendState:
     256         140 :             if (planstate->plan->parallel_aware)
     257         108 :                 ExecAppendEstimate((AppendState *) planstate,
     258             :                                    e->pcxt);
     259         140 :             break;
     260             :         case T_CustomScanState:
     261           0 :             if (planstate->plan->parallel_aware)
     262           0 :                 ExecCustomScanEstimate((CustomScanState *) planstate,
     263             :                                        e->pcxt);
     264           0 :             break;
     265             :         case T_BitmapHeapScanState:
     266          10 :             if (planstate->plan->parallel_aware)
     267           8 :                 ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
     268             :                                        e->pcxt);
     269          10 :             break;
     270             :         case T_HashJoinState:
     271         112 :             if (planstate->plan->parallel_aware)
     272          64 :                 ExecHashJoinEstimate((HashJoinState *) planstate,
     273             :                                      e->pcxt);
     274         112 :             break;
     275             :         case T_HashState:
     276             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     277         112 :             ExecHashEstimate((HashState *) planstate, e->pcxt);
     278         112 :             break;
     279             :         case T_SortState:
     280             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     281          86 :             ExecSortEstimate((SortState *) planstate, e->pcxt);
     282          86 :             break;
     283             : 
     284             :         default:
     285         414 :             break;
     286             :     }
     287             : 
     288        2098 :     return planstate_tree_walker(planstate, ExecParallelEstimate, e);
     289             : }
     290             : 
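
The node-specific estimate hooks invoked above typically reserve one chunk and one key for the shared state they will create later in ExecParallelInitializeDSM.  A sketch of such a hook, continuing the illustrative ExampleNodeShared example from earlier:

    static void
    ExampleNodeEstimate(PlanState *planstate, ParallelContext *pcxt)
    {
        /* One chunk for our shared struct, one toc key to find it again. */
        shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ExampleNodeShared));
        shm_toc_estimate_keys(&pcxt->estimator, 1);
    }
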
     291             : /*
     292             :  * Estimate the amount of space required to serialize the indicated parameters.
     293             :  */
     294             : static Size
     295           8 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
     296             : {
     297             :     int         paramid;
     298           8 :     Size        sz = sizeof(int);
     299             : 
     300           8 :     paramid = -1;
     301          28 :     while ((paramid = bms_next_member(params, paramid)) >= 0)
     302             :     {
     303             :         Oid         typeOid;
     304             :         int16       typLen;
     305             :         bool        typByVal;
     306             :         ParamExecData *prm;
     307             : 
     308          12 :         prm = &(estate->es_param_exec_vals[paramid]);
     309          12 :         typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
     310             :                                paramid);
     311             : 
     312          12 :         sz = add_size(sz, sizeof(int)); /* space for paramid */
     313             : 
     314             :         /* space for datum/isnull */
     315          12 :         if (OidIsValid(typeOid))
     316          12 :             get_typlenbyval(typeOid, &typLen, &typByVal);
     317             :         else
     318             :         {
     319             :             /* If no type OID, assume by-value, like copyParamList does. */
     320           0 :             typLen = sizeof(Datum);
     321           0 :             typByVal = true;
     322             :         }
     323          24 :         sz = add_size(sz,
     324          12 :                       datumEstimateSpace(prm->value, prm->isnull,
     325             :                                          typByVal, typLen));
     326             :     }
     327           8 :     return sz;
     328             : }
     329             : 
     330             : /*
     331             :  * Serialize specified PARAM_EXEC parameters.
     332             :  *
     333             :  * We write the number of parameters first, as a 4-byte integer, and then
     334             :  * write details for each parameter in turn.  The details for each parameter
     335             :  * consist of a 4-byte paramid (location of param in execution time internal
     336             :  * parameter array) and then the datum as serialized by datumSerialize().
     337             :  */
     338             : static dsa_pointer
     339           8 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
     340             : {
     341             :     Size        size;
     342             :     int         nparams;
     343             :     int         paramid;
     344             :     ParamExecData *prm;
     345             :     dsa_pointer handle;
     346             :     char       *start_address;
     347             : 
     348             :     /* Allocate enough space for the current parameter values. */
     349           8 :     size = EstimateParamExecSpace(estate, params);
     350           8 :     handle = dsa_allocate(area, size);
     351           8 :     start_address = dsa_get_address(area, handle);
     352             : 
     353             :     /* First write the number of parameters as a 4-byte integer. */
     354           8 :     nparams = bms_num_members(params);
     355           8 :     memcpy(start_address, &nparams, sizeof(int));
     356           8 :     start_address += sizeof(int);
     357             : 
     358             :     /* Write details for each parameter in turn. */
     359           8 :     paramid = -1;
     360          28 :     while ((paramid = bms_next_member(params, paramid)) >= 0)
     361             :     {
     362             :         Oid         typeOid;
     363             :         int16       typLen;
     364             :         bool        typByVal;
     365             : 
     366          12 :         prm = &(estate->es_param_exec_vals[paramid]);
     367          12 :         typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
     368             :                                paramid);
     369             : 
     370             :         /* Write paramid. */
     371          12 :         memcpy(start_address, &paramid, sizeof(int));
     372          12 :         start_address += sizeof(int);
     373             : 
     374             :         /* Write datum/isnull */
     375          12 :         if (OidIsValid(typeOid))
     376          12 :             get_typlenbyval(typeOid, &typLen, &typByVal);
     377             :         else
     378             :         {
     379             :             /* If no type OID, assume by-value, like copyParamList does. */
     380           0 :             typLen = sizeof(Datum);
     381           0 :             typByVal = true;
     382             :         }
     383          12 :         datumSerialize(prm->value, prm->isnull, typByVal, typLen,
     384             :                        &start_address);
     385             :     }
     386             : 
     387           8 :     return handle;
     388             : }
     389             : 
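
Concretely, for two parameters the DSA chunk built above is laid out as sketched here (the datum portion is whatever datumSerialize() emits for that value, so its width varies; this is a schematic, not code from the file):

    +--------------+--------------+------------------------+--------------+------------------------+
    | nparams (4B) | paramid (4B) | datumSerialize() bytes | paramid (4B) | datumSerialize() bytes |
    +--------------+--------------+------------------------+--------------+------------------------+

    RestoreParamExecParams() below reads these fields back in the same order.
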
     390             : /*
     391             :  * Restore specified PARAM_EXEC parameters.
     392             :  */
     393             : static void
     394          24 : RestoreParamExecParams(char *start_address, EState *estate)
     395             : {
     396             :     int         nparams;
     397             :     int         i;
     398             :     int         paramid;
     399             : 
     400          24 :     memcpy(&nparams, start_address, sizeof(int));
     401          24 :     start_address += sizeof(int);
     402             : 
     403          56 :     for (i = 0; i < nparams; i++)
     404             :     {
     405             :         ParamExecData *prm;
     406             : 
     407             :         /* Read paramid */
     408          32 :         memcpy(&paramid, start_address, sizeof(int));
     409          32 :         start_address += sizeof(int);
     410          32 :         prm = &(estate->es_param_exec_vals[paramid]);
     411             : 
     412             :         /* Read datum/isnull. */
     413          32 :         prm->value = datumRestore(&start_address, &prm->isnull);
     414          32 :         prm->execPlan = NULL;
     415             :     }
     416          24 : }
     417             : 
     418             : /*
     419             :  * Initialize the dynamic shared memory segment that will be used to control
     420             :  * parallel execution.
     421             :  */
     422             : static bool
     423        2098 : ExecParallelInitializeDSM(PlanState *planstate,
     424             :                           ExecParallelInitializeDSMContext *d)
     425             : {
     426        2098 :     if (planstate == NULL)
     427           0 :         return false;
     428             : 
     429             :     /* If instrumentation is enabled, initialize slot for this node. */
     430        2098 :     if (d->instrumentation != NULL)
     431        1272 :         d->instrumentation->plan_node_id[d->nnodes] =
     432         636 :             planstate->plan->plan_node_id;
     433             : 
     434             :     /* Count this node. */
     435        2098 :     d->nnodes++;
     436             : 
     437             :     /*
     438             :      * Call initializers for DSM-using plan nodes.
     439             :      *
     440             :      * Most plan nodes won't do anything here, but plan nodes that allocated
     441             :      * DSM may need to initialize shared state in the DSM before parallel
     442             :      * workers are launched.  They can allocate the space they previously
     443             :      * estimated using shm_toc_allocate, and add the keys they previously
     444             :      * estimated using shm_toc_insert, in each case targeting pcxt->toc.
     445             :      */
     446        2098 :     switch (nodeTag(planstate))
     447             :     {
     448             :         case T_SeqScanState:
     449        1004 :             if (planstate->plan->parallel_aware)
     450         860 :                 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
     451             :                                          d->pcxt);
     452        1004 :             break;
     453             :         case T_IndexScanState:
     454         192 :             if (planstate->plan->parallel_aware)
     455           8 :                 ExecIndexScanInitializeDSM((IndexScanState *) planstate,
     456             :                                            d->pcxt);
     457         192 :             break;
     458             :         case T_IndexOnlyScanState:
     459          28 :             if (planstate->plan->parallel_aware)
     460          24 :                 ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
     461             :                                                d->pcxt);
     462          28 :             break;
     463             :         case T_ForeignScanState:
     464           0 :             if (planstate->plan->parallel_aware)
     465           0 :                 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
     466             :                                              d->pcxt);
     467           0 :             break;
     468             :         case T_AppendState:
     469         140 :             if (planstate->plan->parallel_aware)
     470         108 :                 ExecAppendInitializeDSM((AppendState *) planstate,
     471             :                                         d->pcxt);
     472         140 :             break;
     473             :         case T_CustomScanState:
     474           0 :             if (planstate->plan->parallel_aware)
     475           0 :                 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
     476             :                                             d->pcxt);
     477           0 :             break;
     478             :         case T_BitmapHeapScanState:
     479          10 :             if (planstate->plan->parallel_aware)
     480           8 :                 ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
     481             :                                             d->pcxt);
     482          10 :             break;
     483             :         case T_HashJoinState:
     484         112 :             if (planstate->plan->parallel_aware)
     485          64 :                 ExecHashJoinInitializeDSM((HashJoinState *) planstate,
     486             :                                           d->pcxt);
     487         112 :             break;
     488             :         case T_HashState:
     489             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     490         112 :             ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
     491         112 :             break;
     492             :         case T_SortState:
     493             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
     494          86 :             ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
     495          86 :             break;
     496             : 
     497             :         default:
     498         414 :             break;
     499             :     }
     500             : 
     501        2098 :     return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
     502             : }
     503             : 
     504             : /*
      505             :  * Set up the response queues through which parallel workers will return
      506             :  * tuples to the main backend.
     507             :  */
     508             : static shm_mq_handle **
     509         594 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
     510             : {
     511             :     shm_mq_handle **responseq;
     512             :     char       *tqueuespace;
     513             :     int         i;
     514             : 
     515             :     /* Skip this if no workers. */
     516         594 :     if (pcxt->nworkers == 0)
     517           0 :         return NULL;
     518             : 
     519             :     /* Allocate memory for shared memory queue handles. */
     520         594 :     responseq = (shm_mq_handle **)
     521         594 :         palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
     522             : 
     523             :     /*
     524             :      * If not reinitializing, allocate space from the DSM for the queues;
     525             :      * otherwise, find the already allocated space.
     526             :      */
     527         594 :     if (!reinitialize)
     528         406 :         tqueuespace =
     529         406 :             shm_toc_allocate(pcxt->toc,
     530             :                              mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
     531         406 :                                       pcxt->nworkers));
     532             :     else
     533         188 :         tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
     534             : 
     535             :     /* Create the queues, and become the receiver for each. */
     536        2200 :     for (i = 0; i < pcxt->nworkers; ++i)
     537             :     {
     538             :         shm_mq     *mq;
     539             : 
     540        1606 :         mq = shm_mq_create(tqueuespace +
     541        1606 :                            ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
     542             :                            (Size) PARALLEL_TUPLE_QUEUE_SIZE);
     543             : 
     544        1606 :         shm_mq_set_receiver(mq, MyProc);
     545        1606 :         responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
     546             :     }
     547             : 
     548             :     /* Add array of queues to shm_toc, so others can find it. */
     549         594 :     if (!reinitialize)
     550         406 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
     551             : 
     552             :     /* Return array of handles. */
     553         594 :     return responseq;
     554             : }
     555             : 
     556             : /*
     557             :  * Sets up the required infrastructure for backend workers to perform
     558             :  * execution and return results to the main backend.
     559             :  */
     560             : ParallelExecutorInfo *
     561         406 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
     562             :                      Bitmapset *sendParams, int nworkers,
     563             :                      int64 tuples_needed)
     564             : {
     565             :     ParallelExecutorInfo *pei;
     566             :     ParallelContext *pcxt;
     567             :     ExecParallelEstimateContext e;
     568             :     ExecParallelInitializeDSMContext d;
     569             :     FixedParallelExecutorState *fpes;
     570             :     char       *pstmt_data;
     571             :     char       *pstmt_space;
     572             :     char       *paramlistinfo_space;
     573             :     BufferUsage *bufusage_space;
     574         406 :     SharedExecutorInstrumentation *instrumentation = NULL;
     575         406 :     SharedJitInstrumentation *jit_instrumentation = NULL;
     576             :     int         pstmt_len;
     577             :     int         paramlistinfo_len;
     578         406 :     int         instrumentation_len = 0;
     579         406 :     int         jit_instrumentation_len = 0;
     580         406 :     int         instrument_offset = 0;
     581         406 :     Size        dsa_minsize = dsa_minimum_size();
     582             :     char       *query_string;
     583             :     int         query_len;
     584             : 
     585             :     /*
     586             :      * Force any initplan outputs that we're going to pass to workers to be
     587             :      * evaluated, if they weren't already.
     588             :      *
     589             :      * For simplicity, we use the EState's per-output-tuple ExprContext here.
     590             :      * That risks intra-query memory leakage, since we might pass through here
     591             :      * many times before that ExprContext gets reset; but ExecSetParamPlan
     592             :      * doesn't normally leak any memory in the context (see its comments), so
     593             :      * it doesn't seem worth complicating this function's API to pass it a
     594             :      * shorter-lived ExprContext.  This might need to change someday.
     595             :      */
     596         406 :     ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
     597             : 
     598             :     /* Allocate object for return value. */
     599         406 :     pei = palloc0(sizeof(ParallelExecutorInfo));
     600         406 :     pei->finished = false;
     601         406 :     pei->planstate = planstate;
     602             : 
     603             :     /* Fix up and serialize plan to be sent to workers. */
     604         406 :     pstmt_data = ExecSerializePlan(planstate->plan, estate);
     605             : 
     606             :     /* Create a parallel context. */
     607         406 :     pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
     608         406 :     pei->pcxt = pcxt;
     609             : 
     610             :     /*
     611             :      * Before telling the parallel context to create a dynamic shared memory
     612             :      * segment, we need to figure out how big it should be.  Estimate space
     613             :      * for the various things we need to store.
     614             :      */
     615             : 
     616             :     /* Estimate space for fixed-size state. */
     617         406 :     shm_toc_estimate_chunk(&pcxt->estimator,
     618             :                            sizeof(FixedParallelExecutorState));
     619         406 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     620             : 
     621             :     /* Estimate space for query text. */
     622         406 :     query_len = strlen(estate->es_sourceText);
     623         406 :     shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
     624         406 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     625             : 
     626             :     /* Estimate space for serialized PlannedStmt. */
     627         406 :     pstmt_len = strlen(pstmt_data) + 1;
     628         406 :     shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
     629         406 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     630             : 
     631             :     /* Estimate space for serialized ParamListInfo. */
     632         406 :     paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
     633         406 :     shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
     634         406 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     635             : 
     636             :     /*
     637             :      * Estimate space for BufferUsage.
     638             :      *
     639             :      * If EXPLAIN is not in use and there are no extensions loaded that care,
     640             :      * we could skip this.  But we have no way of knowing whether anyone's
     641             :      * looking at pgBufferUsage, so do it unconditionally.
     642             :      */
     643         406 :     shm_toc_estimate_chunk(&pcxt->estimator,
     644             :                            mul_size(sizeof(BufferUsage), pcxt->nworkers));
     645         406 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     646             : 
     647             :     /* Estimate space for tuple queues. */
     648         406 :     shm_toc_estimate_chunk(&pcxt->estimator,
     649             :                            mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
     650         406 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     651             : 
     652             :     /*
     653             :      * Give parallel-aware nodes a chance to add to the estimates, and get a
     654             :      * count of how many PlanState nodes there are.
     655             :      */
     656         406 :     e.pcxt = pcxt;
     657         406 :     e.nnodes = 0;
     658         406 :     ExecParallelEstimate(planstate, &e);
     659             : 
     660             :     /* Estimate space for instrumentation, if required. */
     661         406 :     if (estate->es_instrument)
     662             :     {
     663         108 :         instrumentation_len =
     664             :             offsetof(SharedExecutorInstrumentation, plan_node_id) +
     665         108 :             sizeof(int) * e.nnodes;
     666         108 :         instrumentation_len = MAXALIGN(instrumentation_len);
     667         108 :         instrument_offset = instrumentation_len;
     668         108 :         instrumentation_len +=
     669         216 :             mul_size(sizeof(Instrumentation),
     670         108 :                      mul_size(e.nnodes, nworkers));
     671         108 :         shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
     672         108 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     673             : 
     674             :         /* Estimate space for JIT instrumentation, if required. */
     675         108 :         if (estate->es_jit_flags != PGJIT_NONE)
     676             :         {
     677          16 :             jit_instrumentation_len =
     678          16 :                 offsetof(SharedJitInstrumentation, jit_instr) +
     679             :                 sizeof(JitInstrumentation) * nworkers;
     680          16 :             shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
     681          16 :             shm_toc_estimate_keys(&pcxt->estimator, 1);
     682             :         }
     683             :     }
     684             : 
     685             :     /* Estimate space for DSA area. */
     686         406 :     shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
     687         406 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     688             : 
     689             :     /* Everyone's had a chance to ask for space, so now create the DSM. */
     690         406 :     InitializeParallelDSM(pcxt);
     691             : 
     692             :     /*
     693             :      * OK, now we have a dynamic shared memory segment, and it should be big
     694             :      * enough to store all of the data we estimated we would want to put into
     695             :      * it, plus whatever general stuff (not specifically executor-related) the
     696             :      * ParallelContext itself needs to store there.  None of the space we
     697             :      * asked for has been allocated or initialized yet, though, so do that.
     698             :      */
     699             : 
     700             :     /* Store fixed-size state. */
     701         406 :     fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
     702         406 :     fpes->tuples_needed = tuples_needed;
     703         406 :     fpes->param_exec = InvalidDsaPointer;
     704         406 :     fpes->eflags = estate->es_top_eflags;
     705         406 :     fpes->jit_flags = estate->es_jit_flags;
     706         406 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
     707             : 
     708             :     /* Store query string */
     709         406 :     query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
     710         406 :     memcpy(query_string, estate->es_sourceText, query_len + 1);
     711         406 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
     712             : 
     713             :     /* Store serialized PlannedStmt. */
     714         406 :     pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
     715         406 :     memcpy(pstmt_space, pstmt_data, pstmt_len);
     716         406 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
     717             : 
     718             :     /* Store serialized ParamListInfo. */
     719         406 :     paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
     720         406 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
     721         406 :     SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
     722             : 
     723             :     /* Allocate space for each worker's BufferUsage; no need to initialize. */
     724         406 :     bufusage_space = shm_toc_allocate(pcxt->toc,
     725         406 :                                       mul_size(sizeof(BufferUsage), pcxt->nworkers));
     726         406 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
     727         406 :     pei->buffer_usage = bufusage_space;
     728             : 
     729             :     /* Set up the tuple queues that the workers will write into. */
     730         406 :     pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
     731             : 
     732             :     /* We don't need the TupleQueueReaders yet, though. */
     733         406 :     pei->reader = NULL;
     734             : 
     735             :     /*
     736             :      * If instrumentation options were supplied, allocate space for the data.
     737             :      * It only gets partially initialized here; the rest happens during
     738             :      * ExecParallelInitializeDSM.
     739             :      */
     740         406 :     if (estate->es_instrument)
     741             :     {
     742             :         Instrumentation *instrument;
     743             :         int         i;
     744             : 
     745         108 :         instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
     746         108 :         instrumentation->instrument_options = estate->es_instrument;
     747         108 :         instrumentation->instrument_offset = instrument_offset;
     748         108 :         instrumentation->num_workers = nworkers;
     749         108 :         instrumentation->num_plan_nodes = e.nnodes;
     750         108 :         instrument = GetInstrumentationArray(instrumentation);
     751        1112 :         for (i = 0; i < nworkers * e.nnodes; ++i)
     752        1004 :             InstrInit(&instrument[i], estate->es_instrument);
     753         108 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
     754             :                        instrumentation);
     755         108 :         pei->instrumentation = instrumentation;
     756             : 
     757         108 :         if (estate->es_jit_flags != PGJIT_NONE)
     758             :         {
     759          16 :             jit_instrumentation = shm_toc_allocate(pcxt->toc,
     760             :                                                    jit_instrumentation_len);
     761          16 :             jit_instrumentation->num_workers = nworkers;
     762          16 :             memset(jit_instrumentation->jit_instr, 0,
     763             :                    sizeof(JitInstrumentation) * nworkers);
     764          16 :             shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
     765             :                            jit_instrumentation);
     766          16 :             pei->jit_instrumentation = jit_instrumentation;
     767             :         }
     768             :     }
     769             : 
     770             :     /*
     771             :      * Create a DSA area that can be used by the leader and all workers.
     772             :      * (However, if we failed to create a DSM and are using private memory
     773             :      * instead, then skip this.)
     774             :      */
     775         406 :     if (pcxt->seg != NULL)
     776             :     {
     777             :         char       *area_space;
     778             : 
     779         406 :         area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
     780         406 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
     781         406 :         pei->area = dsa_create_in_place(area_space, dsa_minsize,
     782             :                                         LWTRANCHE_PARALLEL_QUERY_DSA,
     783             :                                         pcxt->seg);
     784             : 
     785             :         /*
     786             :          * Serialize parameters, if any, using DSA storage.  We don't dare use
     787             :          * the main parallel query DSM for this because we might relaunch
     788             :          * workers after the values have changed (and thus the amount of
     789             :          * storage required has changed).
     790             :          */
     791         406 :         if (!bms_is_empty(sendParams))
     792             :         {
     793           8 :             pei->param_exec = SerializeParamExecParams(estate, sendParams,
     794             :                                                        pei->area);
     795           8 :             fpes->param_exec = pei->param_exec;
     796             :         }
     797             :     }
     798             : 
     799             :     /*
     800             :      * Give parallel-aware nodes a chance to initialize their shared data.
     801             :      * This also initializes the elements of instrumentation->ps_instrument,
     802             :      * if it exists.
     803             :      */
     804         406 :     d.pcxt = pcxt;
     805         406 :     d.instrumentation = instrumentation;
     806         406 :     d.nnodes = 0;
     807             : 
     808             :     /* Install our DSA area while initializing the plan. */
     809         406 :     estate->es_query_dsa = pei->area;
     810         406 :     ExecParallelInitializeDSM(planstate, &d);
     811         406 :     estate->es_query_dsa = NULL;
     812             : 
     813             :     /*
     814             :      * Make sure that the world hasn't shifted under our feet.  This could
     815             :      * probably just be an Assert(), but let's be conservative for now.
     816             :      */
     817         406 :     if (e.nnodes != d.nnodes)
     818           0 :         elog(ERROR, "inconsistent count of PlanState nodes");
     819             : 
     820             :     /* OK, we're ready to rock and roll. */
     821         406 :     return pei;
     822             : }
     823             : 
     824             : /*
     825             :  * Set up tuple queue readers to read the results of a parallel subplan.
     826             :  *
     827             :  * This is separate from ExecInitParallelPlan() because we can launch the
     828             :  * worker processes and let them start doing something before we do this.
     829             :  */
     830             : void
     831         582 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
     832             : {
     833         582 :     int         nworkers = pei->pcxt->nworkers_launched;
     834             :     int         i;
     835             : 
     836             :     Assert(pei->reader == NULL);
     837             : 
     838         582 :     if (nworkers > 0)
     839             :     {
     840         582 :         pei->reader = (TupleQueueReader **)
     841         582 :             palloc(nworkers * sizeof(TupleQueueReader *));
     842             : 
     843        2140 :         for (i = 0; i < nworkers; i++)
     844             :         {
     845        1558 :             shm_mq_set_handle(pei->tqueue[i],
     846        1558 :                               pei->pcxt->worker[i].bgwhandle);
     847        1558 :             pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
     848             :         }
     849             :     }
     850         582 : }
     851             : 
     852             : /*
     853             :  * Re-initialize the parallel executor shared memory state before launching
     854             :  * a fresh batch of workers.
     855             :  */
     856             : void
     857         188 : ExecParallelReinitialize(PlanState *planstate,
     858             :                          ParallelExecutorInfo *pei,
     859             :                          Bitmapset *sendParams)
     860             : {
     861         188 :     EState     *estate = planstate->state;
     862             :     FixedParallelExecutorState *fpes;
     863             : 
     864             :     /* Old workers must already be shut down */
     865             :     Assert(pei->finished);
     866             : 
     867             :     /*
     868             :      * Force any initplan outputs that we're going to pass to workers to be
     869             :      * evaluated, if they weren't already (see comments in
     870             :      * ExecInitParallelPlan).
     871             :      */
     872         188 :     ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
     873             : 
     874         188 :     ReinitializeParallelDSM(pei->pcxt);
     875         188 :     pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
     876         188 :     pei->reader = NULL;
     877         188 :     pei->finished = false;
     878             : 
     879         188 :     fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
     880             : 
     881             :     /* Free any serialized parameters from the last round. */
     882         188 :     if (DsaPointerIsValid(fpes->param_exec))
     883             :     {
     884           0 :         dsa_free(pei->area, fpes->param_exec);
     885           0 :         fpes->param_exec = InvalidDsaPointer;
     886             :     }
     887             : 
     888             :     /* Serialize current parameter values if required. */
     889         188 :     if (!bms_is_empty(sendParams))
     890             :     {
     891           0 :         pei->param_exec = SerializeParamExecParams(estate, sendParams,
     892             :                                                    pei->area);
     893           0 :         fpes->param_exec = pei->param_exec;
     894             :     }
     895             : 
     896             :     /* Traverse plan tree and let each child node reset associated state. */
     897         188 :     estate->es_query_dsa = pei->area;
     898         188 :     ExecParallelReInitializeDSM(planstate, pei->pcxt);
     899         188 :     estate->es_query_dsa = NULL;
     900         188 : }
     901             : 
     902             : /*
     903             :  * Traverse plan tree to reinitialize per-node dynamic shared memory state
     904             :  */
     905             : static bool
     906         456 : ExecParallelReInitializeDSM(PlanState *planstate,
     907             :                             ParallelContext *pcxt)
     908             : {
     909         456 :     if (planstate == NULL)
     910           0 :         return false;
     911             : 
     912             :     /*
     913             :      * Call reinitializers for DSM-using plan nodes.
     914             :      */
     915         456 :     switch (nodeTag(planstate))
     916             :     {
     917             :         case T_SeqScanState:
     918         200 :             if (planstate->plan->parallel_aware)
     919         168 :                 ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
     920             :                                            pcxt);
     921         200 :             break;
     922             :         case T_IndexScanState:
     923           8 :             if (planstate->plan->parallel_aware)
     924           8 :                 ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
     925             :                                              pcxt);
     926           8 :             break;
     927             :         case T_IndexOnlyScanState:
     928           8 :             if (planstate->plan->parallel_aware)
     929           8 :                 ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
     930             :                                                  pcxt);
     931           8 :             break;
     932             :         case T_ForeignScanState:
     933           0 :             if (planstate->plan->parallel_aware)
     934           0 :                 ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
     935             :                                                pcxt);
     936           0 :             break;
     937             :         case T_AppendState:
     938           0 :             if (planstate->plan->parallel_aware)
     939           0 :                 ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
     940           0 :             break;
     941             :         case T_CustomScanState:
     942           0 :             if (planstate->plan->parallel_aware)
     943           0 :                 ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
     944             :                                               pcxt);
     945           0 :             break;
     946             :         case T_BitmapHeapScanState:
     947          36 :             if (planstate->plan->parallel_aware)
     948          36 :                 ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
     949             :                                               pcxt);
     950          36 :             break;
     951             :         case T_HashJoinState:
     952          64 :             if (planstate->plan->parallel_aware)
     953          32 :                 ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
     954             :                                             pcxt);
     955          64 :             break;
     956             :         case T_HashState:
     957             :         case T_SortState:
     958             :             /* these nodes have DSM state, but no reinitialization is required */
     959          80 :             break;
     960             : 
     961             :         default:
     962          60 :             break;
     963             :     }
     964             : 
     965         456 :     return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
     966             : }
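
/*
 * A minimal, hypothetical sketch of the per-node callback pattern dispatched
 * above.  "FooScan", FooScanSharedState and ExecFooScanReInitializeDSM are
 * illustrative names, not PostgreSQL APIs: a parallel-aware node resets the
 * shared state it placed in dynamic shared memory so that relaunched workers
 * start the scan over from the beginning.
 */
#include "postgres.h"

#include "access/parallel.h"
#include "nodes/execnodes.h"
#include "port/atomics.h"

typedef struct FooScanSharedState
{
    pg_atomic_uint64 next_item;     /* next unit of work to hand out */
} FooScanSharedState;

typedef struct FooScanState
{
    PlanState   ps;                 /* node's executor state */
    FooScanSharedState *shared;     /* points into the DSM segment */
} FooScanState;

static void
ExecFooScanReInitializeDSM(FooScanState *node, ParallelContext *pcxt)
{
    /* pcxt is unused here; the shared chunk was sized and keyed earlier */
    pg_atomic_write_u64(&node->shared->next_item, 0);
}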
     967             : 
     968             : /*
     969             :  * Copy instrumentation information about this node and its descendants from
     970             :  * dynamic shared memory.
     971             :  */
     972             : static bool
     973         636 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
     974             :                                     SharedExecutorInstrumentation *instrumentation)
     975             : {
     976             :     Instrumentation *instrument;
     977             :     int         i;
     978             :     int         n;
     979             :     int         ibytes;
     980         636 :     int         plan_node_id = planstate->plan->plan_node_id;
     981             :     MemoryContext oldcontext;
     982             : 
     983             :     /* Find the instrumentation for this node. */
     984        2908 :     for (i = 0; i < instrumentation->num_plan_nodes; ++i)
     985        2908 :         if (instrumentation->plan_node_id[i] == plan_node_id)
     986         636 :             break;
     987         636 :     if (i >= instrumentation->num_plan_nodes)
     988           0 :         elog(ERROR, "plan node %d not found", plan_node_id);
     989             : 
     990             :     /* Accumulate the statistics from all workers. */
     991         636 :     instrument = GetInstrumentationArray(instrumentation);
     992         636 :     instrument += i * instrumentation->num_workers;
     993        1640 :     for (n = 0; n < instrumentation->num_workers; ++n)
     994        1004 :         InstrAggNode(planstate->instrument, &instrument[n]);
     995             : 
     996             :     /*
     997             :      * Also store the per-worker detail.
     998             :      *
     999             :      * Worker instrumentation should be allocated in the same context as the
    1000             :      * regular instrumentation information, which is the per-query context.
     1001             :      * Switch into the per-query memory context for this allocation.
    1002             :      */
    1003         636 :     oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
    1004         636 :     ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
    1005         636 :     planstate->worker_instrument =
    1006         636 :         palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
    1007         636 :     MemoryContextSwitchTo(oldcontext);
    1008             : 
    1009         636 :     planstate->worker_instrument->num_workers = instrumentation->num_workers;
    1010         636 :     memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
    1011             : 
    1012             :     /* Perform any node-type-specific work that needs to be done. */
    1013         636 :     switch (nodeTag(planstate))
    1014             :     {
    1015             :         case T_SortState:
    1016           4 :             ExecSortRetrieveInstrumentation((SortState *) planstate);
    1017           4 :             break;
    1018             :         case T_HashState:
    1019          56 :             ExecHashRetrieveInstrumentation((HashState *) planstate);
    1020          56 :             break;
    1021             :         default:
    1022         576 :             break;
    1023             :     }
    1024             : 
    1025         636 :     return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
    1026             :                                  instrumentation);
    1027             : }
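
/*
 * A minimal sketch of the indexing convention used above, with illustrative
 * names: the shared Instrumentation entries form one flattened
 * [num_plan_nodes][num_workers] array, so the slot for plan node i and
 * worker n sits at index i * num_workers + n.
 */
#include <stddef.h>

static size_t
instrumentation_slot(size_t node_index, size_t worker_index, size_t num_workers)
{
    /* row-major layout: one contiguous row of per-worker slots per plan node */
    return node_index * num_workers + worker_index;
}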
    1028             : 
    1029             : /*
    1030             :  * Add up the workers' JIT instrumentation from dynamic shared memory.
    1031             :  */
    1032             : static void
    1033          16 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
    1034             :                                        SharedJitInstrumentation *shared_jit)
    1035             : {
    1036             :     JitInstrumentation *combined;
    1037             :     int         ibytes;
    1038             : 
    1039             :     int         n;
    1040             : 
    1041             :     /*
    1042             :      * Accumulate worker JIT instrumentation into the combined JIT
    1043             :      * instrumentation, allocating it if required.
    1044             :      */
    1045          16 :     if (!planstate->state->es_jit_worker_instr)
    1046          32 :         planstate->state->es_jit_worker_instr =
    1047          16 :             MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
    1048          16 :     combined = planstate->state->es_jit_worker_instr;
    1049             : 
    1050             :     /* Accumulate all the workers' instrumentations. */
    1051          48 :     for (n = 0; n < shared_jit->num_workers; ++n)
    1052          32 :         InstrJitAgg(combined, &shared_jit->jit_instr[n]);
    1053             : 
    1054             :     /*
    1055             :      * Store the per-worker detail.
    1056             :      *
    1057             :      * Similar to ExecParallelRetrieveInstrumentation(), allocate the
    1058             :      * instrumentation in per-query context.
    1059             :      */
    1060          16 :     ibytes = offsetof(SharedJitInstrumentation, jit_instr)
    1061          16 :         + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
    1062          16 :     planstate->worker_jit_instrument =
    1063          16 :         MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
    1064             : 
    1065          16 :     memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
    1066          16 : }
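
/*
 * A minimal sketch, in plain C with illustrative names, of the
 * "header plus flexible array" sizing pattern used above for the per-worker
 * detail: allocate offsetof(struct, array) plus one array element per
 * worker, then copy header and elements with a single memcpy.
 */
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

typedef struct PerWorkerDetail
{
    int     num_workers;
    double  stats[];                /* one entry per worker */
} PerWorkerDetail;

static PerWorkerDetail *
copy_per_worker_detail(const PerWorkerDetail *src)
{
    size_t  nbytes = offsetof(PerWorkerDetail, stats) +
        src->num_workers * sizeof(double);
    PerWorkerDetail *dst = malloc(nbytes);

    if (dst != NULL)
        memcpy(dst, src, nbytes);
    return dst;
}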
    1067             : 
    1068             : /*
    1069             :  * Finish parallel execution.  We wait for parallel workers to finish, and
    1070             :  * accumulate their buffer usage.
    1071             :  */
    1072             : void
    1073        1068 : ExecParallelFinish(ParallelExecutorInfo *pei)
    1074             : {
    1075        1068 :     int         nworkers = pei->pcxt->nworkers_launched;
    1076             :     int         i;
    1077             : 
    1078             :     /* Make this be a no-op if called twice in a row. */
    1079        1068 :     if (pei->finished)
    1080         478 :         return;
    1081             : 
    1082             :     /*
    1083             :      * Detach from tuple queues ASAP, so that any still-active workers will
    1084             :      * notice that no further results are wanted.
    1085             :      */
    1086         590 :     if (pei->tqueue != NULL)
    1087             :     {
    1088        2144 :         for (i = 0; i < nworkers; i++)
    1089        1554 :             shm_mq_detach(pei->tqueue[i]);
    1090         590 :         pfree(pei->tqueue);
    1091         590 :         pei->tqueue = NULL;
    1092             :     }
    1093             : 
    1094             :     /*
    1095             :      * While we're waiting for the workers to finish, let's get rid of the
    1096             :      * tuple queue readers.  (Any other local cleanup could be done here too.)
    1097             :      */
    1098         590 :     if (pei->reader != NULL)
    1099             :     {
    1100        2132 :         for (i = 0; i < nworkers; i++)
    1101        1554 :             DestroyTupleQueueReader(pei->reader[i]);
    1102         578 :         pfree(pei->reader);
    1103         578 :         pei->reader = NULL;
    1104             :     }
    1105             : 
    1106             :     /* Now wait for the workers to finish. */
    1107         590 :     WaitForParallelWorkersToFinish(pei->pcxt);
    1108             : 
    1109             :     /*
    1110             :      * Next, accumulate buffer usage.  (This must wait for the workers to
    1111             :      * finish, or we might get incomplete data.)
    1112             :      */
    1113        2144 :     for (i = 0; i < nworkers; i++)
    1114        1554 :         InstrAccumParallelQuery(&pei->buffer_usage[i]);
    1115             : 
    1116         590 :     pei->finished = true;
    1117             : }
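
/*
 * A minimal sketch of why the pei->finished guard above matters.
 * exec_done_with_node is an illustrative caller, not a PostgreSQL function:
 * a Gather-like node may shut its workers down as soon as the tuple queues
 * are exhausted and then again during executor shutdown, so a repeated call
 * must be a cheap no-op.
 */
#include "postgres.h"

#include "executor/execParallel.h"

static void
exec_done_with_node(ParallelExecutorInfo *pei)
{
    ExecParallelFinish(pei);    /* waits for workers, accumulates buffer usage */
    ExecParallelFinish(pei);    /* second call returns immediately */
}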
    1118             : 
    1119             : /*
    1120             :  * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
    1121             :  * resources still exist after ExecParallelFinish.  We separate these
    1122             :  * routines because someone might want to examine the contents of the DSM
    1123             :  * after ExecParallelFinish and before calling this routine.
    1124             :  */
    1125             : void
    1126         402 : ExecParallelCleanup(ParallelExecutorInfo *pei)
    1127             : {
    1128             :     /* Accumulate instrumentation, if any. */
    1129         402 :     if (pei->instrumentation)
    1130         108 :         ExecParallelRetrieveInstrumentation(pei->planstate,
    1131             :                                             pei->instrumentation);
    1132             : 
    1133             :     /* Accumulate JIT instrumentation, if any. */
    1134         402 :     if (pei->jit_instrumentation)
    1135          16 :         ExecParallelRetrieveJitInstrumentation(pei->planstate,
    1136          16 :                                                pei->jit_instrumentation);
    1137             : 
    1138             :     /* Free any serialized parameters. */
    1139         402 :     if (DsaPointerIsValid(pei->param_exec))
    1140             :     {
    1141           8 :         dsa_free(pei->area, pei->param_exec);
    1142           8 :         pei->param_exec = InvalidDsaPointer;
    1143             :     }
    1144         402 :     if (pei->area != NULL)
    1145             :     {
    1146         402 :         dsa_detach(pei->area);
    1147         402 :         pei->area = NULL;
    1148             :     }
    1149         402 :     if (pei->pcxt != NULL)
    1150             :     {
    1151         402 :         DestroyParallelContext(pei->pcxt);
    1152         402 :         pei->pcxt = NULL;
    1153             :     }
    1154         402 :     pfree(pei);
    1155         402 : }
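
/*
 * A minimal sketch of the leader-side ordering implied by the comment above.
 * finish_then_cleanup and inspect_dsm_state are illustrative names, not
 * PostgreSQL APIs: finish first, optionally examine state still sitting in
 * the DSM via pei->area, and only then clean up, which detaches the DSM,
 * destroys the ParallelContext and frees pei itself.
 */
#include "postgres.h"

#include "executor/execParallel.h"
#include "utils/dsa.h"

static void
finish_then_cleanup(ParallelExecutorInfo *pei,
                    void (*inspect_dsm_state) (dsa_area *area))
{
    ExecParallelFinish(pei);

    if (inspect_dsm_state != NULL)
        inspect_dsm_state(pei->area);   /* DSM is still attached here */

    ExecParallelCleanup(pei);           /* pei must not be used afterwards */
}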
    1156             : 
    1157             : /*
    1158             :  * Create a DestReceiver to write tuples we produce to the shm_mq designated
    1159             :  * for that purpose.
    1160             :  */
    1161             : static DestReceiver *
    1162        1558 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
    1163             : {
    1164             :     char       *mqspace;
    1165             :     shm_mq     *mq;
    1166             : 
    1167        1558 :     mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
    1168        1558 :     mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
    1169        1558 :     mq = (shm_mq *) mqspace;
    1170        1558 :     shm_mq_set_sender(mq, MyProc);
    1171        1558 :     return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
    1172             : }
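
/*
 * A minimal sketch, with illustrative names, of the address arithmetic used
 * above: all workers' tuple queues live in a single TOC entry as fixed-size
 * slots, and each worker takes the slot matching its ParallelWorkerNumber.
 */
#include <stddef.h>

static char *
nth_fixed_size_slot(char *base, int slot_number, size_t slot_size)
{
    /* slot 0 starts at base, slot 1 at base + slot_size, and so on */
    return base + (size_t) slot_number * slot_size;
}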
    1173             : 
    1174             : /*
    1175             :  * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
    1176             :  */
    1177             : static QueryDesc *
    1178        1558 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
    1179             :                          int instrument_options)
    1180             : {
    1181             :     char       *pstmtspace;
    1182             :     char       *paramspace;
    1183             :     PlannedStmt *pstmt;
    1184             :     ParamListInfo paramLI;
    1185             :     char       *queryString;
    1186             : 
    1187             :     /* Get the query string from shared memory */
    1188        1558 :     queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
    1189             : 
    1190             :     /* Reconstruct leader-supplied PlannedStmt. */
    1191        1558 :     pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
    1192        1558 :     pstmt = (PlannedStmt *) stringToNode(pstmtspace);
    1193             : 
    1194             :     /* Reconstruct ParamListInfo. */
    1195        1558 :     paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
    1196        1558 :     paramLI = RestoreParamList(&paramspace);
    1197             : 
    1198             :     /* Create a QueryDesc for the query. */
    1199        1558 :     return CreateQueryDesc(pstmt,
    1200             :                            queryString,
    1201             :                            GetActiveSnapshot(), InvalidSnapshot,
    1202             :                            receiver, paramLI, NULL, instrument_options);
    1203             : }
    1204             : 
    1205             : /*
    1206             :  * Copy instrumentation information from this node and its descendants into
    1207             :  * dynamic shared memory, so that the parallel leader can retrieve it.
    1208             :  */
    1209             : static bool
    1210        1468 : ExecParallelReportInstrumentation(PlanState *planstate,
    1211             :                                   SharedExecutorInstrumentation *instrumentation)
    1212             : {
    1213             :     int         i;
    1214        1468 :     int         plan_node_id = planstate->plan->plan_node_id;
    1215             :     Instrumentation *instrument;
    1216             : 
    1217        1468 :     InstrEndLoop(planstate->instrument);
    1218             : 
    1219             :     /*
    1220             :      * If we shuffled the plan_node_id values in ps_instrument into sorted
    1221             :      * order, we could use binary search here.  This might matter someday if
    1222             :      * we're pushing down sufficiently large plan trees.  For now, do it the
    1223             :      * slow, dumb way.
    1224             :      */
    1225        4796 :     for (i = 0; i < instrumentation->num_plan_nodes; ++i)
    1226        4796 :         if (instrumentation->plan_node_id[i] == plan_node_id)
    1227        1468 :             break;
    1228        1468 :     if (i >= instrumentation->num_plan_nodes)
    1229           0 :         elog(ERROR, "plan node %d not found", plan_node_id);
    1230             : 
    1231             :     /*
    1232             :      * Add our statistics to the per-node, per-worker totals.  It's possible
    1233             :      * that this could happen more than once if we relaunched workers.
    1234             :      */
    1235        1468 :     instrument = GetInstrumentationArray(instrumentation);
    1236        1468 :     instrument += i * instrumentation->num_workers;
    1237             :     Assert(IsParallelWorker());
    1238             :     Assert(ParallelWorkerNumber < instrumentation->num_workers);
    1239        1468 :     InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
    1240             : 
    1241        1468 :     return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
    1242             :                                  instrumentation);
    1243             : }
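
/*
 * A minimal sketch of the binary-search alternative mentioned in the comment
 * above, assuming the plan_node_id array were kept in ascending order.
 * find_plan_node_slot is an illustrative helper, not a PostgreSQL function;
 * it returns the array index of plan_node_id, or -1 if it is absent.
 */
static int
find_plan_node_slot(const int *plan_node_ids, int num_plan_nodes,
                    int plan_node_id)
{
    int     lo = 0;
    int     hi = num_plan_nodes - 1;

    while (lo <= hi)
    {
        int     mid = lo + (hi - lo) / 2;

        if (plan_node_ids[mid] == plan_node_id)
            return mid;
        else if (plan_node_ids[mid] < plan_node_id)
            lo = mid + 1;
        else
            hi = mid - 1;
    }
    return -1;
}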
    1244             : 
    1245             : /*
    1246             :  * Initialize the PlanState and its descendants with the information
    1247             :  * retrieved from shared memory.  This has to be done once the PlanState
     1248             :  * is allocated and initialized by the executor; that is, after ExecutorStart().
    1249             :  */
    1250             : static bool
    1251        5626 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
    1252             : {
    1253        5626 :     if (planstate == NULL)
    1254           0 :         return false;
    1255             : 
    1256        5626 :     switch (nodeTag(planstate))
    1257             :     {
    1258             :         case T_SeqScanState:
    1259        2640 :             if (planstate->plan->parallel_aware)
    1260        2248 :                 ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
    1261        2640 :             break;
    1262             :         case T_IndexScanState:
    1263         248 :             if (planstate->plan->parallel_aware)
    1264          64 :                 ExecIndexScanInitializeWorker((IndexScanState *) planstate,
    1265             :                                               pwcxt);
    1266         248 :             break;
    1267             :         case T_IndexOnlyScanState:
    1268         144 :             if (planstate->plan->parallel_aware)
    1269         128 :                 ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
    1270             :                                                   pwcxt);
    1271         144 :             break;
    1272             :         case T_ForeignScanState:
    1273           0 :             if (planstate->plan->parallel_aware)
    1274           0 :                 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
    1275             :                                                 pwcxt);
    1276           0 :             break;
    1277             :         case T_AppendState:
    1278         268 :             if (planstate->plan->parallel_aware)
    1279         228 :                 ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
    1280         268 :             break;
    1281             :         case T_CustomScanState:
    1282           0 :             if (planstate->plan->parallel_aware)
    1283           0 :                 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
    1284             :                                                pwcxt);
    1285           0 :             break;
    1286             :         case T_BitmapHeapScanState:
    1287         178 :             if (planstate->plan->parallel_aware)
    1288         176 :                 ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
    1289             :                                                pwcxt);
    1290         178 :             break;
    1291             :         case T_HashJoinState:
    1292         340 :             if (planstate->plan->parallel_aware)
    1293         180 :                 ExecHashJoinInitializeWorker((HashJoinState *) planstate,
    1294             :                                              pwcxt);
    1295         340 :             break;
    1296             :         case T_HashState:
    1297             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1298         340 :             ExecHashInitializeWorker((HashState *) planstate, pwcxt);
    1299         340 :             break;
    1300             :         case T_SortState:
    1301             :             /* even when not parallel-aware, for EXPLAIN ANALYZE */
    1302         262 :             ExecSortInitializeWorker((SortState *) planstate, pwcxt);
    1303         262 :             break;
    1304             : 
    1305             :         default:
    1306        1206 :             break;
    1307             :     }
    1308             : 
    1309        5626 :     return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
    1310             :                                  pwcxt);
    1311             : }
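
/*
 * A minimal, hypothetical sketch of the per-node callback pattern dispatched
 * above.  "FooScan" and ExecFooScanInitializeWorker are illustrative names,
 * not PostgreSQL APIs: in the worker, a parallel-aware node looks up the
 * shared state the leader stored in the TOC under its plan_node_id and
 * attaches to it.
 */
#include "postgres.h"

#include "access/parallel.h"
#include "nodes/execnodes.h"
#include "storage/shm_toc.h"

typedef struct FooScanSharedState FooScanSharedState;   /* as in the earlier sketch */

typedef struct FooScanState
{
    PlanState   ps;
    FooScanSharedState *shared;
} FooScanState;

static void
ExecFooScanInitializeWorker(FooScanState *node, ParallelWorkerContext *pwcxt)
{
    /* The leader used shm_toc_insert() with the same plan_node_id as key. */
    node->shared = shm_toc_lookup(pwcxt->toc,
                                  node->ps.plan->plan_node_id,
                                  false);
}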
    1312             : 
    1313             : /*
    1314             :  * Main entrypoint for parallel query worker processes.
    1315             :  *
    1316             :  * We reach this function from ParallelWorkerMain, so the setup necessary to
    1317             :  * create a sensible parallel environment has already been done;
    1318             :  * ParallelWorkerMain worries about stuff like the transaction state, combo
    1319             :  * CID mappings, and GUC values, so we don't need to deal with any of that
    1320             :  * here.
    1321             :  *
    1322             :  * Our job is to deal with concerns specific to the executor.  The parallel
    1323             :  * group leader will have stored a serialized PlannedStmt, and it's our job
    1324             :  * to execute that plan and write the resulting tuples to the appropriate
    1325             :  * tuple queue.  Various bits of supporting information that we need in order
    1326             :  * to do this are also stored in the dsm_segment and can be accessed through
    1327             :  * the shm_toc.
    1328             :  */
    1329             : void
    1330        1558 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
    1331             : {
    1332             :     FixedParallelExecutorState *fpes;
    1333             :     BufferUsage *buffer_usage;
    1334             :     DestReceiver *receiver;
    1335             :     QueryDesc  *queryDesc;
    1336             :     SharedExecutorInstrumentation *instrumentation;
    1337             :     SharedJitInstrumentation *jit_instrumentation;
    1338        1558 :     int         instrument_options = 0;
    1339             :     void       *area_space;
    1340             :     dsa_area   *area;
    1341             :     ParallelWorkerContext pwcxt;
    1342             : 
    1343             :     /* Get fixed-size state. */
    1344        1558 :     fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
    1345             : 
    1346             :     /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
    1347        1558 :     receiver = ExecParallelGetReceiver(seg, toc);
    1348        1558 :     instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
    1349        1558 :     if (instrumentation != NULL)
    1350         452 :         instrument_options = instrumentation->instrument_options;
    1351        1558 :     jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
    1352             :                                          true);
    1353        1558 :     queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
    1354             : 
     1355             :     /* Set debug_query_string for this individual worker */
    1356        1558 :     debug_query_string = queryDesc->sourceText;
    1357             : 
     1358             :     /* Report this worker's query for monitoring purposes */
    1359        1558 :     pgstat_report_activity(STATE_RUNNING, debug_query_string);
    1360             : 
    1361             :     /* Attach to the dynamic shared memory area. */
    1362        1558 :     area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
    1363        1558 :     area = dsa_attach_in_place(area_space, seg);
    1364             : 
    1365             :     /* Start up the executor */
    1366        1558 :     queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
    1367        1558 :     ExecutorStart(queryDesc, fpes->eflags);
    1368             : 
    1369             :     /* Special executor initialization steps for parallel workers */
    1370        1558 :     queryDesc->planstate->state->es_query_dsa = area;
    1371        1558 :     if (DsaPointerIsValid(fpes->param_exec))
    1372             :     {
    1373             :         char       *paramexec_space;
    1374             : 
    1375          24 :         paramexec_space = dsa_get_address(area, fpes->param_exec);
    1376          24 :         RestoreParamExecParams(paramexec_space, queryDesc->estate);
    1377             : 
    1378             :     }
    1379        1558 :     pwcxt.toc = toc;
    1380        1558 :     pwcxt.seg = seg;
    1381        1558 :     ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
    1382             : 
    1383             :     /* Pass down any tuple bound */
    1384        1558 :     ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
    1385             : 
    1386             :     /*
    1387             :      * Prepare to track buffer usage during query execution.
    1388             :      *
    1389             :      * We do this after starting up the executor to match what happens in the
    1390             :      * leader, which also doesn't count buffer accesses that occur during
    1391             :      * executor startup.
    1392             :      */
    1393        1558 :     InstrStartParallelQuery();
    1394             : 
    1395             :     /*
    1396             :      * Run the plan.  If we specified a tuple bound, be careful not to demand
    1397             :      * more tuples than that.
    1398             :      */
    1399        1558 :     ExecutorRun(queryDesc,
    1400             :                 ForwardScanDirection,
    1401        1558 :                 fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
    1402             :                 true);
    1403             : 
    1404             :     /* Shut down the executor */
    1405        1554 :     ExecutorFinish(queryDesc);
    1406             : 
    1407             :     /* Report buffer usage during parallel execution. */
    1408        1554 :     buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
    1409        1554 :     InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
    1410             : 
    1411             :     /* Report instrumentation data if any instrumentation options are set. */
    1412        1554 :     if (instrumentation != NULL)
    1413         452 :         ExecParallelReportInstrumentation(queryDesc->planstate,
    1414             :                                           instrumentation);
    1415             : 
    1416             :     /* Report JIT instrumentation data if any */
    1417        1554 :     if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
    1418             :     {
    1419             :         Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
    1420         192 :         jit_instrumentation->jit_instr[ParallelWorkerNumber] =
    1421          96 :             queryDesc->estate->es_jit->instr;
    1422             :     }
    1423             : 
    1424             :     /* Must do this after capturing instrumentation. */
    1425        1554 :     ExecutorEnd(queryDesc);
    1426             : 
    1427             :     /* Cleanup. */
    1428        1554 :     dsa_detach(area);
    1429        1554 :     FreeQueryDesc(queryDesc);
    1430        1554 :     receiver->rDestroy(receiver);
    1431        1554 : }
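
/*
 * A minimal sketch of the tuple-bound mapping used in the ExecutorRun() call
 * above.  executor_run_count is an illustrative helper, not a PostgreSQL
 * function: a negative tuples_needed means "no limit", and ExecutorRun()
 * treats a count of zero as "run to completion", so negative values are
 * clamped to zero while non-negative bounds are passed through.
 */
#include "postgres.h"

static int64
executor_run_count(int64 tuples_needed)
{
    return tuples_needed < 0 ? (int64) 0 : tuples_needed;
}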

Generated by: LCOV version 1.13