LCOV - code coverage report
Current view: top level - src/backend/executor - nodeGather.c
Test: PostgreSQL 16beta1                              Hit    Total    Coverage
Date: 2023-06-06 10:12:12              Lines:         134      137      97.8 %
                                       Functions:       8        8     100.0 %
Legend: Lines: hit / not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * nodeGather.c
       4             :  *    Support routines for scanning a plan via multiple workers.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * A Gather executor launches parallel workers to run multiple copies of a
      10             :  * plan.  It can also run the plan itself, if the workers are not available
      11             :  * or have not started up yet.  It then merges all of the results it produces
      12             :  * and the results from the workers into a single output stream.  Therefore,
      13             :  * it will normally be used with a plan where running multiple copies of the
      14             :  * same plan does not produce duplicate output, such as parallel-aware
      15             :  * SeqScan.
      16             :  *
      17             :  * Alternatively, a Gather node can be configured to use just one worker
      18             :  * and the single-copy flag can be set.  In this case, the Gather node will
      19             :  * run the plan in one worker and will not execute the plan itself.  In
      20             :  * this case, it simply returns whatever tuples were returned by the worker.
      21             :  * If a worker cannot be obtained, then it will run the plan itself and
      22             :  * return the results.  Therefore, a plan used with a single-copy Gather
      23             :  * node need not be parallel-aware.
      24             :  *
      25             :  * IDENTIFICATION
      26             :  *    src/backend/executor/nodeGather.c
      27             :  *
      28             :  *-------------------------------------------------------------------------
      29             :  */
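(Illustration, not part of the file: the plan shape the header comment describes.  A Gather
node typically sits on top of a parallel-aware scan, roughly as EXPLAIN would print it; the
table name "t" and the worker count below are placeholders.)

        Gather
          Workers Planned: 2
          ->  Parallel Seq Scan on t

(Each worker, and the leader if it participates, runs its own copy of the Parallel Seq Scan;
together they read the table exactly once, so merging the streams yields no duplicate rows.)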
      30             : 
      31             : #include "postgres.h"
      32             : 
      33             : #include "access/relscan.h"
      34             : #include "access/xact.h"
      35             : #include "executor/execdebug.h"
      36             : #include "executor/execParallel.h"
      37             : #include "executor/nodeGather.h"
      38             : #include "executor/nodeSubplan.h"
      39             : #include "executor/tqueue.h"
      40             : #include "miscadmin.h"
      41             : #include "optimizer/optimizer.h"
      42             : #include "pgstat.h"
      43             : #include "utils/memutils.h"
      44             : #include "utils/rel.h"
      45             : 
      46             : 
      47             : static TupleTableSlot *ExecGather(PlanState *pstate);
      48             : static TupleTableSlot *gather_getnext(GatherState *gatherstate);
      49             : static MinimalTuple gather_readnext(GatherState *gatherstate);
      50             : static void ExecShutdownGatherWorkers(GatherState *node);
      51             : 
      52             : 
      53             : /* ----------------------------------------------------------------
      54             :  *      ExecInitGather
      55             :  * ----------------------------------------------------------------
      56             :  */
      57             : GatherState *
      58         962 : ExecInitGather(Gather *node, EState *estate, int eflags)
      59             : {
      60             :     GatherState *gatherstate;
      61             :     Plan       *outerNode;
      62             :     TupleDesc   tupDesc;
      63             : 
      64             :     /* Gather node doesn't have innerPlan node. */
      65             :     Assert(innerPlan(node) == NULL);
      66             : 
      67             :     /*
      68             :      * create state structure
      69             :      */
      70         962 :     gatherstate = makeNode(GatherState);
      71         962 :     gatherstate->ps.plan = (Plan *) node;
      72         962 :     gatherstate->ps.state = estate;
      73         962 :     gatherstate->ps.ExecProcNode = ExecGather;
      74             : 
      75         962 :     gatherstate->initialized = false;
      76         962 :     gatherstate->need_to_scan_locally =
      77         962 :         !node->single_copy && parallel_leader_participation;
      78         962 :     gatherstate->tuples_needed = -1;
      79             : 
      80             :     /*
      81             :      * Miscellaneous initialization
      82             :      *
      83             :      * create expression context for node
      84             :      */
      85         962 :     ExecAssignExprContext(estate, &gatherstate->ps);
      86             : 
      87             :     /*
      88             :      * now initialize outer plan
      89             :      */
      90         962 :     outerNode = outerPlan(node);
      91         962 :     outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
      92         962 :     tupDesc = ExecGetResultType(outerPlanState(gatherstate));
      93             : 
      94             :     /*
      95             :      * Leader may access ExecProcNode result directly (if
      96             :      * need_to_scan_locally), or from workers via tuple queue.  So we can't
      97             :      * trivially rely on the slot type being fixed for expressions evaluated
      98             :      * within this node.
      99             :      */
     100         962 :     gatherstate->ps.outeropsset = true;
     101         962 :     gatherstate->ps.outeropsfixed = false;
     102             : 
     103             :     /*
     104             :      * Initialize result type and projection.
     105             :      */
     106         962 :     ExecInitResultTypeTL(&gatherstate->ps);
     107         962 :     ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);
     108             : 
     109             :     /*
     110             :      * Without projections result slot type is not trivially known, see
     111             :      * comment above.
     112             :      */
     113         962 :     if (gatherstate->ps.ps_ProjInfo == NULL)
     114             :     {
     115         920 :         gatherstate->ps.resultopsset = true;
     116         920 :         gatherstate->ps.resultopsfixed = false;
     117             :     }
     118             : 
     119             :     /*
     120             :      * Initialize funnel slot to same tuple descriptor as outer plan.
     121             :      */
     122         962 :     gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
     123             :                                                       &TTSOpsMinimalTuple);
     124             : 
     125             :     /*
     126             :      * Gather doesn't support checking a qual (it's always more efficient to
     127             :      * do it in the child node).
     128             :      */
     129             :     Assert(!node->plan.qual);
     130             : 
     131         962 :     return gatherstate;
     132             : }
     133             : 
     134             : /* ----------------------------------------------------------------
     135             :  *      ExecGather(node)
     136             :  *
     137             :  *      Scans the relation via multiple workers and returns
     138             :  *      the next qualifying tuple.
     139             :  * ----------------------------------------------------------------
     140             :  */
     141             : static TupleTableSlot *
     142     2918304 : ExecGather(PlanState *pstate)
     143             : {
     144     2918304 :     GatherState *node = castNode(GatherState, pstate);
     145             :     TupleTableSlot *slot;
     146             :     ExprContext *econtext;
     147             : 
     148     2918304 :     CHECK_FOR_INTERRUPTS();
     149             : 
     150             :     /*
     151             :      * Initialize the parallel context and workers on first execution. We do
     152             :      * this on first execution rather than during node initialization, as it
     153             :      * needs to allocate a large dynamic segment, so it is better to do it
     154             :      * only if it is really needed.
     155             :      */
     156     2918304 :     if (!node->initialized)
     157             :     {
     158         770 :         EState     *estate = node->ps.state;
     159         770 :         Gather     *gather = (Gather *) node->ps.plan;
     160             : 
     161             :         /*
     162             :          * Sometimes we might have to run without parallelism; but if parallel
     163             :          * mode is active then we can try to fire up some workers.
     164             :          */
     165         770 :         if (gather->num_workers > 0 && estate->es_use_parallel_mode)
     166             :         {
     167             :             ParallelContext *pcxt;
     168             : 
     169             :             /* Initialize, or re-initialize, shared state needed by workers. */
     170         764 :             if (!node->pei)
     171         536 :                 node->pei = ExecInitParallelPlan(outerPlanState(node),
     172             :                                                  estate,
     173             :                                                  gather->initParam,
     174             :                                                  gather->num_workers,
     175             :                                                  node->tuples_needed);
     176             :             else
     177         228 :                 ExecParallelReinitialize(outerPlanState(node),
     178         228 :                                          node->pei,
     179             :                                          gather->initParam);
     180             : 
     181             :             /*
     182             :              * Register backend workers. We might not get as many as we
     183             :              * requested, or indeed any at all.
     184             :              */
     185         764 :             pcxt = node->pei->pcxt;
     186         764 :             LaunchParallelWorkers(pcxt);
     187             :             /* We save # workers launched for the benefit of EXPLAIN */
     188         764 :             node->nworkers_launched = pcxt->nworkers_launched;
     189             : 
     190             :             /* Set up tuple queue readers to read the results. */
     191         764 :             if (pcxt->nworkers_launched > 0)
     192             :             {
     193         758 :                 ExecParallelCreateReaders(node->pei);
     194             :                 /* Make a working array showing the active readers */
     195         758 :                 node->nreaders = pcxt->nworkers_launched;
     196         758 :                 node->reader = (TupleQueueReader **)
     197         758 :                     palloc(node->nreaders * sizeof(TupleQueueReader *));
     198         758 :                 memcpy(node->reader, node->pei->reader,
     199         758 :                        node->nreaders * sizeof(TupleQueueReader *));
     200             :             }
     201             :             else
     202             :             {
     203             :                 /* No workers?  Then never mind. */
     204           6 :                 node->nreaders = 0;
     205           6 :                 node->reader = NULL;
     206             :             }
     207         764 :             node->nextreader = 0;
     208             :         }
     209             : 
      210             :         /* Run plan locally if no workers, or leader participation enabled and not single-copy. */
     211        1540 :         node->need_to_scan_locally = (node->nreaders == 0)
     212         770 :             || (!gather->single_copy && parallel_leader_participation);
     213         770 :         node->initialized = true;
     214             :     }
     215             : 
     216             :     /*
     217             :      * Reset per-tuple memory context to free any expression evaluation
     218             :      * storage allocated in the previous tuple cycle.
     219             :      */
     220     2918304 :     econtext = node->ps.ps_ExprContext;
     221     2918304 :     ResetExprContext(econtext);
     222             : 
     223             :     /*
     224             :      * Get next tuple, either from one of our workers, or by running the plan
     225             :      * ourselves.
     226             :      */
     227     2918304 :     slot = gather_getnext(node);
     228     2918298 :     if (TupIsNull(slot))
     229         764 :         return NULL;
     230             : 
     231             :     /* If no projection is required, we're done. */
     232     2917534 :     if (node->ps.ps_ProjInfo == NULL)
     233     2917534 :         return slot;
     234             : 
     235             :     /*
     236             :      * Form the result tuple using ExecProject(), and return it.
     237             :      */
     238           0 :     econtext->ecxt_outertuple = slot;
     239           0 :     return ExecProject(node->ps.ps_ProjInfo);
     240             : }
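(Illustration, not part of the file: the first-call branch above is what responds to the
parallel-query settings.  A rough way to exercise it from SQL, assuming some table "t"
large enough that the planner chooses a parallel plan:)

        SET max_parallel_workers_per_gather = 2;   -- request up to two workers per Gather
        SET parallel_leader_participation = on;    -- leader also scans locally (need_to_scan_locally)
        EXPLAIN (ANALYZE) SELECT count(*) FROM t;  -- "Workers Launched" may be fewer than planned

(If no workers can be launched, nworkers_launched ends up 0 and, as the code above shows,
the leader falls back to running the plan itself via need_to_scan_locally.)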
     241             : 
     242             : /* ----------------------------------------------------------------
     243             :  *      ExecEndGather
     244             :  *
     245             :  *      frees any storage allocated through C routines.
     246             :  * ----------------------------------------------------------------
     247             :  */
     248             : void
     249         956 : ExecEndGather(GatherState *node)
     250             : {
     251         956 :     ExecEndNode(outerPlanState(node));  /* let children clean up first */
     252         956 :     ExecShutdownGather(node);
     253         956 :     ExecFreeExprContext(&node->ps);
     254         956 :     if (node->ps.ps_ResultTupleSlot)
     255          42 :         ExecClearTuple(node->ps.ps_ResultTupleSlot);
     256         956 : }
     257             : 
     258             : /*
     259             :  * Read the next tuple.  We might fetch a tuple from one of the tuple queues
     260             :  * using gather_readnext, or if no tuple queue contains a tuple and the
     261             :  * single_copy flag is not set, we might generate one locally instead.
     262             :  */
     263             : static TupleTableSlot *
     264     2918304 : gather_getnext(GatherState *gatherstate)
     265             : {
     266     2918304 :     PlanState  *outerPlan = outerPlanState(gatherstate);
     267             :     TupleTableSlot *outerTupleSlot;
     268     2918304 :     TupleTableSlot *fslot = gatherstate->funnel_slot;
     269             :     MinimalTuple tup;
     270             : 
     271     2919600 :     while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
     272             :     {
     273     2918836 :         CHECK_FOR_INTERRUPTS();
     274             : 
     275     2918836 :         if (gatherstate->nreaders > 0)
     276             :         {
     277     2918806 :             tup = gather_readnext(gatherstate);
     278             : 
     279     2918800 :             if (HeapTupleIsValid(tup))
     280             :             {
     281     1440982 :                 ExecStoreMinimalTuple(tup,  /* tuple to store */
     282             :                                       fslot,    /* slot to store the tuple */
     283             :                                       false);   /* don't pfree tuple  */
     284     1440982 :                 return fslot;
     285             :             }
     286             :         }
     287             : 
     288     1477848 :         if (gatherstate->need_to_scan_locally)
     289             :         {
     290     1477122 :             EState     *estate = gatherstate->ps.state;
     291             : 
     292             :             /* Install our DSA area while executing the plan. */
     293     1477122 :             estate->es_query_dsa =
     294     1477122 :                 gatherstate->pei ? gatherstate->pei->area : NULL;
     295     1477122 :             outerTupleSlot = ExecProcNode(outerPlan);
     296     1477122 :             estate->es_query_dsa = NULL;
     297             : 
     298     1477122 :             if (!TupIsNull(outerTupleSlot))
     299     1476552 :                 return outerTupleSlot;
     300             : 
     301         570 :             gatherstate->need_to_scan_locally = false;
     302             :         }
     303             :     }
     304             : 
     305         764 :     return ExecClearTuple(fslot);
     306             : }
     307             : 
     308             : /*
     309             :  * Attempt to read a tuple from one of our parallel workers.
     310             :  */
     311             : static MinimalTuple
     312     2918806 : gather_readnext(GatherState *gatherstate)
     313             : {
     314     2918806 :     int         nvisited = 0;
     315             : 
     316             :     for (;;)
     317     3882608 :     {
     318             :         TupleQueueReader *reader;
     319             :         MinimalTuple tup;
     320             :         bool        readerdone;
     321             : 
     322             :         /* Check for async events, particularly messages from workers. */
     323     6801414 :         CHECK_FOR_INTERRUPTS();
     324             : 
     325             :         /*
     326             :          * Attempt to read a tuple, but don't block if none is available.
     327             :          *
     328             :          * Note that TupleQueueReaderNext will just return NULL for a worker
     329             :          * which fails to initialize.  We'll treat that worker as having
     330             :          * produced no tuples; WaitForParallelWorkersToFinish will error out
     331             :          * when we get there.
     332             :          */
     333             :         Assert(gatherstate->nextreader < gatherstate->nreaders);
     334     6801408 :         reader = gatherstate->reader[gatherstate->nextreader];
     335     6801408 :         tup = TupleQueueReaderNext(reader, true, &readerdone);
     336             : 
     337             :         /*
     338             :          * If this reader is done, remove it from our working array of active
     339             :          * readers.  If all readers are done, we're outta here.
     340             :          */
     341     6801408 :         if (readerdone)
     342             :         {
     343             :             Assert(!tup);
     344        2042 :             --gatherstate->nreaders;
     345        2042 :             if (gatherstate->nreaders == 0)
     346             :             {
     347         752 :                 ExecShutdownGatherWorkers(gatherstate);
     348     2918800 :                 return NULL;
     349             :             }
     350        1290 :             memmove(&gatherstate->reader[gatherstate->nextreader],
     351        1290 :                     &gatherstate->reader[gatherstate->nextreader + 1],
     352             :                     sizeof(TupleQueueReader *)
     353        1290 :                     * (gatherstate->nreaders - gatherstate->nextreader));
     354        1290 :             if (gatherstate->nextreader >= gatherstate->nreaders)
     355        1026 :                 gatherstate->nextreader = 0;
     356        1290 :             continue;
     357             :         }
     358             : 
     359             :         /* If we got a tuple, return it. */
     360     6799366 :         if (tup)
     361     1440982 :             return tup;
     362             : 
     363             :         /*
     364             :          * Advance nextreader pointer in round-robin fashion.  Note that we
     365             :          * only reach this code if we weren't able to get a tuple from the
     366             :          * current worker.  We used to advance the nextreader pointer after
     367             :          * every tuple, but it turns out to be much more efficient to keep
     368             :          * reading from the same queue until that would require blocking.
     369             :          */
     370     5358384 :         gatherstate->nextreader++;
     371     5358384 :         if (gatherstate->nextreader >= gatherstate->nreaders)
     372     1493076 :             gatherstate->nextreader = 0;
     373             : 
     374             :         /* Have we visited every (surviving) TupleQueueReader? */
     375     5358384 :         nvisited++;
     376     5358384 :         if (nvisited >= gatherstate->nreaders)
     377             :         {
     378             :             /*
     379             :              * If (still) running plan locally, return NULL so caller can
     380             :              * generate another tuple from the local copy of the plan.
     381             :              */
     382     1493036 :             if (gatherstate->need_to_scan_locally)
     383     1477066 :                 return NULL;
     384             : 
     385             :             /* Nothing to do except wait for developments. */
     386       15970 :             (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
     387             :                              WAIT_EVENT_EXECUTE_GATHER);
     388       15970 :             ResetLatch(MyLatch);
     389       15970 :             nvisited = 0;
     390             :         }
     391             :     }
     392             : }
     393             : 
     394             : /* ----------------------------------------------------------------
     395             :  *      ExecShutdownGatherWorkers
     396             :  *
     397             :  *      Stop all the parallel workers.
     398             :  * ----------------------------------------------------------------
     399             :  */
     400             : static void
     401        2556 : ExecShutdownGatherWorkers(GatherState *node)
     402             : {
     403        2556 :     if (node->pei != NULL)
     404        1510 :         ExecParallelFinish(node->pei);
     405             : 
     406             :     /* Flush local copy of reader array */
     407        2556 :     if (node->reader)
     408         752 :         pfree(node->reader);
     409        2556 :     node->reader = NULL;
     410        2556 : }
     411             : 
     412             : /* ----------------------------------------------------------------
     413             :  *      ExecShutdownGather
     414             :  *
     415             :  *      Destroy the setup for parallel workers including parallel context.
     416             :  * ----------------------------------------------------------------
     417             :  */
     418             : void
     419        1504 : ExecShutdownGather(GatherState *node)
     420             : {
     421        1504 :     ExecShutdownGatherWorkers(node);
     422             : 
     423             :     /* Now destroy the parallel context. */
     424        1504 :     if (node->pei != NULL)
     425             :     {
     426         530 :         ExecParallelCleanup(node->pei);
     427         530 :         node->pei = NULL;
     428             :     }
     429        1504 : }
     430             : 
     431             : /* ----------------------------------------------------------------
     432             :  *                      Join Support
     433             :  * ----------------------------------------------------------------
     434             :  */
     435             : 
     436             : /* ----------------------------------------------------------------
     437             :  *      ExecReScanGather
     438             :  *
     439             :  *      Prepare to re-scan the result of a Gather.
     440             :  * ----------------------------------------------------------------
     441             :  */
     442             : void
     443         300 : ExecReScanGather(GatherState *node)
     444             : {
     445         300 :     Gather     *gather = (Gather *) node->ps.plan;
     446         300 :     PlanState  *outerPlan = outerPlanState(node);
     447             : 
     448             :     /* Make sure any existing workers are gracefully shut down */
     449         300 :     ExecShutdownGatherWorkers(node);
     450             : 
     451             :     /* Mark node so that shared state will be rebuilt at next call */
     452         300 :     node->initialized = false;
     453             : 
     454             :     /*
     455             :      * Set child node's chgParam to tell it that the next scan might deliver a
     456             :      * different set of rows within the leader process.  (The overall rowset
     457             :      * shouldn't change, but the leader process's subset might; hence nodes
     458             :      * between here and the parallel table scan node mustn't optimize on the
     459             :      * assumption of an unchanging rowset.)
     460             :      */
     461         300 :     if (gather->rescan_param >= 0)
     462         300 :         outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
     463             :                                              gather->rescan_param);
     464             : 
     465             :     /*
     466             :      * If chgParam of subnode is not null then plan will be re-scanned by
     467             :      * first ExecProcNode.  Note: because this does nothing if we have a
     468             :      * rescan_param, it's currently guaranteed that parallel-aware child nodes
     469             :      * will not see a ReScan call until after they get a ReInitializeDSM call.
     470             :      * That ordering might not be something to rely on, though.  A good rule
     471             :      * of thumb is that ReInitializeDSM should reset only shared state, ReScan
     472             :      * should reset only local state, and anything that depends on both of
     473             :      * those steps being finished must wait until the first ExecProcNode call.
     474             :      */
     475         300 :     if (outerPlan->chgParam == NULL)
     476           0 :         ExecReScan(outerPlan);
     477         300 : }

Generated by: LCOV version 1.14