LCOV - code coverage report
Current view: top level - src/backend/executor - nodeGather.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 99.3 % 136 135
Test Date: 2026-02-28 23:15:01 Functions: 100.0 % 8 8
Legend: Lines: hit / not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * nodeGather.c
       4              :  *    Support routines for scanning a plan via multiple workers.
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * A Gather executor launches parallel workers to run multiple copies of a
      10              :  * plan.  It can also run the plan itself, if the workers are not available
      11              :  * or have not started up yet.  It then merges all of the results it produces
      12              :  * and the results from the workers into a single output stream.  Therefore,
      13              :  * it will normally be used with a plan where running multiple copies of the
      14              :  * same plan does not produce duplicate output, such as parallel-aware
      15              :  * SeqScan.
      16              :  *
      17              :  * Alternatively, a Gather node can be configured to use just one worker
      18              :  * and the single-copy flag can be set.  In this case, the Gather node will
      19              :  * run the plan in one worker and will not execute the plan itself.  In
      20              :  * this case, it simply returns whatever tuples were returned by the worker.
      21              :  * If a worker cannot be obtained, then it will run the plan itself and
      22              :  * return the results.  Therefore, a plan used with a single-copy Gather
      23              :  * node need not be parallel-aware.
      24              :  *
      25              :  * IDENTIFICATION
      26              :  *    src/backend/executor/nodeGather.c
      27              :  *
      28              :  *-------------------------------------------------------------------------
      29              :  */
      30              : 
      31              : #include "postgres.h"
      32              : 
      33              : #include "executor/execParallel.h"
      34              : #include "executor/executor.h"
      35              : #include "executor/nodeGather.h"
      36              : #include "executor/tqueue.h"
      37              : #include "miscadmin.h"
      38              : #include "optimizer/optimizer.h"
      39              : #include "storage/latch.h"
      40              : #include "utils/wait_event.h"
      41              : 
      42              : 
      43              : static TupleTableSlot *ExecGather(PlanState *pstate);
      44              : static TupleTableSlot *gather_getnext(GatherState *gatherstate);
      45              : static MinimalTuple gather_readnext(GatherState *gatherstate);
      46              : static void ExecShutdownGatherWorkers(GatherState *node);
      47              : 
      48              : 
/* ----------------------------------------------------------------
 *      ExecInitGather
 *
 *      Build and return the GatherState executor node for a Gather
 *      plan node, initializing the outer subplan as well.  No
 *      parallel workers are started here; that is deferred to the
 *      first ExecGather call, which needs to allocate a large
 *      dynamic shared memory segment and so should only happen if
 *      the node is actually executed.
 *
 *      node:   the Gather plan node
 *      estate: executor state for the whole query
 *      eflags: executor flags, passed through to the child plan
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
    GatherState *gatherstate;
    Plan       *outerNode;
    TupleDesc   tupDesc;

    /* Gather node doesn't have innerPlan node. */
    Assert(innerPlan(node) == NULL);

    /*
     * create state structure
     */
    gatherstate = makeNode(GatherState);
    gatherstate->ps.plan = (Plan *) node;
    gatherstate->ps.state = estate;
    gatherstate->ps.ExecProcNode = ExecGather;

    gatherstate->initialized = false;

    /*
     * The leader participates in executing the plan unless this is a
     * single-copy Gather or parallel_leader_participation is off.  (This
     * is recomputed in ExecGather once we know how many workers actually
     * launched.)
     */
    gatherstate->need_to_scan_locally =
        !node->single_copy && parallel_leader_participation;
    gatherstate->tuples_needed = -1;    /* -1 apparently means "no tuple
                                         * bound"; confirm against
                                         * ExecSetTupleBound/execParallel.c */

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &gatherstate->ps);

    /*
     * now initialize outer plan
     */
    outerNode = outerPlan(node);
    outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
    tupDesc = ExecGetResultType(outerPlanState(gatherstate));

    /*
     * Leader may access ExecProcNode result directly (if
     * need_to_scan_locally), or from workers via tuple queue.  So we can't
     * trivially rely on the slot type being fixed for expressions evaluated
     * within this node.
     */
    gatherstate->ps.outeropsset = true;
    gatherstate->ps.outeropsfixed = false;

    /*
     * Initialize result type and projection.
     */
    ExecInitResultTypeTL(&gatherstate->ps);
    ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);

    /*
     * Without projections result slot type is not trivially known, see
     * comment above.
     */
    if (gatherstate->ps.ps_ProjInfo == NULL)
    {
        gatherstate->ps.resultopsset = true;
        gatherstate->ps.resultopsfixed = false;
    }

    /*
     * Initialize funnel slot to same tuple descriptor as outer plan.  It
     * holds minimal tuples received from the workers' tuple queues.
     */
    gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
                                                      &TTSOpsMinimalTuple);

    /*
     * Gather doesn't support checking a qual (it's always more efficient to
     * do it in the child node).
     */
    Assert(!node->plan.qual);

    return gatherstate;
}
     129              : 
/* ----------------------------------------------------------------
 *      ExecGather(node)
 *
 *      Scans the relation via multiple workers and returns
 *      the next qualifying tuple.  Returns NULL once neither the
 *      workers nor the leader's local copy of the plan can produce
 *      any more tuples.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGather(PlanState *pstate)
{
    GatherState *node = castNode(GatherState, pstate);
    TupleTableSlot *slot;
    ExprContext *econtext;

    CHECK_FOR_INTERRUPTS();

    /*
     * Initialize the parallel context and workers on first execution. We do
     * this on first execution rather than during node initialization, as it
     * needs to allocate a large dynamic segment, so it is better to do it
     * only if it is really needed.
     */
    if (!node->initialized)
    {
        EState     *estate = node->ps.state;
        Gather     *gather = (Gather *) node->ps.plan;

        /*
         * Sometimes we might have to run without parallelism; but if parallel
         * mode is active then we can try to fire up some workers.
         */
        if (gather->num_workers > 0 && estate->es_use_parallel_mode)
        {
            ParallelContext *pcxt;

            /*
             * Initialize, or re-initialize, shared state needed by workers.
             * (pei is non-NULL if we already set it up on a previous scan
             * and are now rescanning.)
             */
            if (!node->pei)
                node->pei = ExecInitParallelPlan(outerPlanState(node),
                                                 estate,
                                                 gather->initParam,
                                                 gather->num_workers,
                                                 node->tuples_needed);
            else
                ExecParallelReinitialize(outerPlanState(node),
                                         node->pei,
                                         gather->initParam);

            /*
             * Register backend workers. We might not get as many as we
             * requested, or indeed any at all.
             */
            pcxt = node->pei->pcxt;
            LaunchParallelWorkers(pcxt);
            /* We save # workers launched for the benefit of EXPLAIN */
            node->nworkers_launched = pcxt->nworkers_launched;

            /*
             * Count number of workers originally wanted and actually
             * launched.
             */
            estate->es_parallel_workers_to_launch += pcxt->nworkers_to_launch;
            estate->es_parallel_workers_launched += pcxt->nworkers_launched;

            /* Set up tuple queue readers to read the results. */
            if (pcxt->nworkers_launched > 0)
            {
                ExecParallelCreateReaders(node->pei);
                /*
                 * Make a working array showing the active readers; entries
                 * are removed from it as workers finish (gather_readnext),
                 * while pei->reader keeps the full original set.
                 */
                node->nreaders = pcxt->nworkers_launched;
                node->reader = (TupleQueueReader **)
                    palloc(node->nreaders * sizeof(TupleQueueReader *));
                memcpy(node->reader, node->pei->reader,
                       node->nreaders * sizeof(TupleQueueReader *));
            }
            else
            {
                /* No workers?  Then never mind. */
                node->nreaders = 0;
                node->reader = NULL;
            }
            node->nextreader = 0;
        }

        /* Run plan locally if no workers or enabled and not single-copy. */
        node->need_to_scan_locally = (node->nreaders == 0)
            || (!gather->single_copy && parallel_leader_participation);
        node->initialized = true;
    }

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.
     */
    econtext = node->ps.ps_ExprContext;
    ResetExprContext(econtext);

    /*
     * Get next tuple, either from one of our workers, or by running the plan
     * ourselves.
     */
    slot = gather_getnext(node);
    if (TupIsNull(slot))
        return NULL;

    /* If no projection is required, we're done. */
    if (node->ps.ps_ProjInfo == NULL)
        return slot;

    /*
     * Form the result tuple using ExecProject(), and return it.
     */
    econtext->ecxt_outertuple = slot;
    return ExecProject(node->ps.ps_ProjInfo);
}
     244              : 
     245              : /* ----------------------------------------------------------------
     246              :  *      ExecEndGather
     247              :  *
     248              :  *      frees any storage allocated through C routines.
     249              :  * ----------------------------------------------------------------
     250              :  */
     251              : void
     252          570 : ExecEndGather(GatherState *node)
     253              : {
     254          570 :     ExecEndNode(outerPlanState(node));  /* let children clean up first */
     255          570 :     ExecShutdownGather(node);
     256          570 : }
     257              : 
/*
 * Read the next tuple.  We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 *
 * Returns a slot containing the next tuple, or an empty (cleared) slot
 * when all sources are exhausted.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
    PlanState  *outerPlan = outerPlanState(gatherstate);
    TupleTableSlot *outerTupleSlot;
    TupleTableSlot *fslot = gatherstate->funnel_slot;
    MinimalTuple tup;

    /*
     * Loop until we obtain a tuple or every source (worker queues and, if
     * applicable, the leader's local copy of the plan) is exhausted.
     */
    while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
    {
        CHECK_FOR_INTERRUPTS();

        if (gatherstate->nreaders > 0)
        {
            tup = gather_readnext(gatherstate);

            if (HeapTupleIsValid(tup))
            {
                ExecStoreMinimalTuple(tup,  /* tuple to store */
                                      fslot,    /* slot to store the tuple */
                                      false);   /* don't pfree tuple  */
                return fslot;
            }
        }

        /*
         * No worker tuple available right now; produce one from the local
         * copy of the plan if the leader is participating.
         */
        if (gatherstate->need_to_scan_locally)
        {
            EState     *estate = gatherstate->ps.state;

            /* Install our DSA area while executing the plan. */
            estate->es_query_dsa =
                gatherstate->pei ? gatherstate->pei->area : NULL;
            outerTupleSlot = ExecProcNode(outerPlan);
            estate->es_query_dsa = NULL;

            if (!TupIsNull(outerTupleSlot))
                return outerTupleSlot;

            /* Local copy of the plan is exhausted; don't run it again. */
            gatherstate->need_to_scan_locally = false;
        }
    }

    /* Everything is exhausted: return an empty slot to signal EOF. */
    return ExecClearTuple(fslot);
}
     307              : 
/*
 * Attempt to read a tuple from one of our parallel workers.
 *
 * Returns NULL either when all workers are done (in which case they have
 * been shut down), or when no tuple is currently available and the leader
 * should instead try to produce one from its local copy of the plan.
 */
static MinimalTuple
gather_readnext(GatherState *gatherstate)
{
    int         nvisited = 0;

    for (;;)
    {
        TupleQueueReader *reader;
        MinimalTuple tup;
        bool        readerdone;

        /* Check for async events, particularly messages from workers. */
        CHECK_FOR_INTERRUPTS();

        /*
         * Attempt to read a tuple, but don't block if none is available.
         *
         * Note that TupleQueueReaderNext will just return NULL for a worker
         * which fails to initialize.  We'll treat that worker as having
         * produced no tuples; WaitForParallelWorkersToFinish will error out
         * when we get there.
         */
        Assert(gatherstate->nextreader < gatherstate->nreaders);
        reader = gatherstate->reader[gatherstate->nextreader];
        tup = TupleQueueReaderNext(reader, true, &readerdone);

        /*
         * If this reader is done, remove it from our working array of active
         * readers.  If all readers are done, we're outta here.
         */
        if (readerdone)
        {
            Assert(!tup);
            --gatherstate->nreaders;
            if (gatherstate->nreaders == 0)
            {
                ExecShutdownGatherWorkers(gatherstate);
                return NULL;
            }
            /* Shift surviving readers down over the finished one. */
            memmove(&gatherstate->reader[gatherstate->nextreader],
                    &gatherstate->reader[gatherstate->nextreader + 1],
                    sizeof(TupleQueueReader *)
                    * (gatherstate->nreaders - gatherstate->nextreader));
            if (gatherstate->nextreader >= gatherstate->nreaders)
                gatherstate->nextreader = 0;
            continue;
        }

        /* If we got a tuple, return it. */
        if (tup)
            return tup;

        /*
         * Advance nextreader pointer in round-robin fashion.  Note that we
         * only reach this code if we weren't able to get a tuple from the
         * current worker.  We used to advance the nextreader pointer after
         * every tuple, but it turns out to be much more efficient to keep
         * reading from the same queue until that would require blocking.
         */
        gatherstate->nextreader++;
        if (gatherstate->nextreader >= gatherstate->nreaders)
            gatherstate->nextreader = 0;

        /* Have we visited every (surviving) TupleQueueReader? */
        nvisited++;
        if (nvisited >= gatherstate->nreaders)
        {
            /*
             * If (still) running plan locally, return NULL so caller can
             * generate another tuple from the local copy of the plan.
             */
            if (gatherstate->need_to_scan_locally)
                return NULL;

            /* Nothing to do except wait for developments. */
            (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                             WAIT_EVENT_EXECUTE_GATHER);
            ResetLatch(MyLatch);
            nvisited = 0;
        }
    }
}
     393              : 
     394              : /* ----------------------------------------------------------------
     395              :  *      ExecShutdownGatherWorkers
     396              :  *
     397              :  *      Stop all the parallel workers.
     398              :  * ----------------------------------------------------------------
     399              :  */
     400              : static void
     401         1468 : ExecShutdownGatherWorkers(GatherState *node)
     402              : {
     403         1468 :     if (node->pei != NULL)
     404          835 :         ExecParallelFinish(node->pei);
     405              : 
     406              :     /* Flush local copy of reader array */
     407         1468 :     if (node->reader)
     408          416 :         pfree(node->reader);
     409         1468 :     node->reader = NULL;
     410         1468 : }
     411              : 
     412              : /* ----------------------------------------------------------------
     413              :  *      ExecShutdownGather
     414              :  *
     415              :  *      Destroy the setup for parallel workers including parallel context.
     416              :  * ----------------------------------------------------------------
     417              :  */
     418              : void
     419          902 : ExecShutdownGather(GatherState *node)
     420              : {
     421          902 :     ExecShutdownGatherWorkers(node);
     422              : 
     423              :     /* Now destroy the parallel context. */
     424          902 :     if (node->pei != NULL)
     425              :     {
     426          305 :         ExecParallelCleanup(node->pei);
     427          305 :         node->pei = NULL;
     428              :     }
     429          902 : }
     430              : 
     431              : /* ----------------------------------------------------------------
     432              :  *                      Join Support
     433              :  * ----------------------------------------------------------------
     434              :  */
     435              : 
     436              : /* ----------------------------------------------------------------
     437              :  *      ExecReScanGather
     438              :  *
     439              :  *      Prepare to re-scan the result of a Gather.
     440              :  * ----------------------------------------------------------------
     441              :  */
     442              : void
     443          150 : ExecReScanGather(GatherState *node)
     444              : {
     445          150 :     Gather     *gather = (Gather *) node->ps.plan;
     446          150 :     PlanState  *outerPlan = outerPlanState(node);
     447              : 
     448              :     /* Make sure any existing workers are gracefully shut down */
     449          150 :     ExecShutdownGatherWorkers(node);
     450              : 
     451              :     /* Mark node so that shared state will be rebuilt at next call */
     452          150 :     node->initialized = false;
     453              : 
     454              :     /*
     455              :      * Set child node's chgParam to tell it that the next scan might deliver a
     456              :      * different set of rows within the leader process.  (The overall rowset
     457              :      * shouldn't change, but the leader process's subset might; hence nodes
     458              :      * between here and the parallel table scan node mustn't optimize on the
     459              :      * assumption of an unchanging rowset.)
     460              :      */
     461          150 :     if (gather->rescan_param >= 0)
     462          150 :         outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
     463              :                                              gather->rescan_param);
     464              : 
     465              :     /*
     466              :      * If chgParam of subnode is not null then plan will be re-scanned by
     467              :      * first ExecProcNode.  Note: because this does nothing if we have a
     468              :      * rescan_param, it's currently guaranteed that parallel-aware child nodes
     469              :      * will not see a ReScan call until after they get a ReInitializeDSM call.
     470              :      * That ordering might not be something to rely on, though.  A good rule
     471              :      * of thumb is that ReInitializeDSM should reset only shared state, ReScan
     472              :      * should reset only local state, and anything that depends on both of
     473              :      * those steps being finished must wait until the first ExecProcNode call.
     474              :      */
     475          150 :     if (outerPlan->chgParam == NULL)
     476            0 :         ExecReScan(outerPlan);
     477          150 : }
        

Generated by: LCOV version 2.0-1