LCOV - code coverage report
Current view: top level - src/backend/executor - nodeAppend.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 328 347 94.5 %
Date: 2025-04-01 14:15:22 Functions: 17 18 94.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * nodeAppend.c
       4             :  *    routines to handle append nodes.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/executor/nodeAppend.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : /* INTERFACE ROUTINES
      16             :  *      ExecInitAppend  - initialize the append node
      17             :  *      ExecAppend      - retrieve the next tuple from the node
      18             :  *      ExecEndAppend   - shut down the append node
      19             :  *      ExecReScanAppend - rescan the append node
      20             :  *
      21             :  *   NOTES
      22             :  *      Each append node contains a list of one or more subplans which
      23             :  *      must be iteratively processed (forwards or backwards).
      24             :  *      Tuples are retrieved by executing the 'whichplan'th subplan
      25             :  *      until the subplan stops returning tuples, at which point that
      26             :  *      plan is shut down and the next started up.
      27             :  *
      28             :  *      Append nodes don't make use of their left and right
      29             :  *      subtrees, rather they maintain a list of subplans so
      30             :  *      a typical append node looks like this in the plan tree:
      31             :  *
      32             :  *                 ...
      33             :  *                 /
      34             :  *              Append -------+------+------+--- nil
      35             :  *              /   \         |      |      |
      36             :  *            nil   nil      ...    ...    ...
      37             :  *                               subplans
      38             :  *
      39             :  *      Append nodes are currently used for unions, and to support
      40             :  *      inheritance queries, where several relations need to be scanned.
      41             :  *      For example, in our standard person/student/employee/student-emp
      42             :  *      example, where student and employee inherit from person
      43             :  *      and student-emp inherits from student and employee, the
      44             :  *      query:
      45             :  *
      46             :  *              select name from person
      47             :  *
      48             :  *      generates the plan:
      49             :  *
      50             :  *                |
      51             :  *              Append -------+-------+--------+--------+
      52             :  *              /   \         |       |        |        |
      53             :  *            nil   nil      Scan    Scan     Scan     Scan
      54             :  *                            |       |        |        |
      55             :  *                          person employee student student-emp
      56             :  */
      57             : 
      58             : #include "postgres.h"
      59             : 
      60             : #include "executor/execAsync.h"
      61             : #include "executor/execPartition.h"
      62             : #include "executor/executor.h"
      63             : #include "executor/nodeAppend.h"
      64             : #include "miscadmin.h"
      65             : #include "pgstat.h"
      66             : #include "storage/latch.h"
      67             : 
      68             : /* Shared state for parallel-aware Append. */
      69             : struct ParallelAppendState
      70             : {
      71             :     LWLock      pa_lock;        /* mutual exclusion to choose next subplan */
      72             :     int         pa_next_plan;   /* next plan to choose by any worker */
      73             : 
      74             :     /*
      75             :      * pa_finished[i] should be true if no more workers should select subplan
      76             :      * i.  for a non-partial plan, this should be set to true as soon as a
      77             :      * worker selects the plan; for a partial plan, it remains false until
      78             :      * some worker executes the plan to completion.
      79             :      */
      80             :     bool        pa_finished[FLEXIBLE_ARRAY_MEMBER];
      81             : };
      82             : 
      83             : #define INVALID_SUBPLAN_INDEX       -1
      84             : #define EVENT_BUFFER_SIZE           16
      85             : 
      86             : static TupleTableSlot *ExecAppend(PlanState *pstate);
      87             : static bool choose_next_subplan_locally(AppendState *node);
      88             : static bool choose_next_subplan_for_leader(AppendState *node);
      89             : static bool choose_next_subplan_for_worker(AppendState *node);
      90             : static void mark_invalid_subplans_as_finished(AppendState *node);
      91             : static void ExecAppendAsyncBegin(AppendState *node);
      92             : static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
      93             : static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
      94             : static void ExecAppendAsyncEventWait(AppendState *node);
      95             : static void classify_matching_subplans(AppendState *node);
      96             : 
      97             : /* ----------------------------------------------------------------
      98             :  *      ExecInitAppend
      99             :  *
     100             :  *      Begin all of the subscans of the append node.
     101             :  *
     102             :  *     (This is potentially wasteful, since the entire result of the
     103             :  *      append node may not be scanned, but this way all of the
     104             :  *      structures get allocated in the executor's top level memory
     105             :  *      block instead of that of the call to ExecAppend.)
     106             :  * ----------------------------------------------------------------
     107             :  */
     108             : AppendState *
     109       14368 : ExecInitAppend(Append *node, EState *estate, int eflags)
     110             : {
     111       14368 :     AppendState *appendstate = makeNode(AppendState);
     112             :     PlanState **appendplanstates;
     113             :     const TupleTableSlotOps *appendops;
     114             :     Bitmapset  *validsubplans;
     115             :     Bitmapset  *asyncplans;
     116             :     int         nplans;
     117             :     int         nasyncplans;
     118             :     int         firstvalid;
     119             :     int         i,
     120             :                 j;
     121             : 
     122             :     /* check for unsupported flags */
     123             :     Assert(!(eflags & EXEC_FLAG_MARK));
     124             : 
     125             :     /*
     126             :      * create new AppendState for our append node
     127             :      */
     128       14368 :     appendstate->ps.plan = (Plan *) node;
     129       14368 :     appendstate->ps.state = estate;
     130       14368 :     appendstate->ps.ExecProcNode = ExecAppend;
     131             : 
     132             :     /* Let choose_next_subplan_* function handle setting the first subplan */
     133       14368 :     appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
     134       14368 :     appendstate->as_syncdone = false;
     135       14368 :     appendstate->as_begun = false;
     136             : 
     137             :     /* If run-time partition pruning is enabled, then set that up now */
     138       14368 :     if (node->part_prune_index >= 0)
     139             :     {
     140             :         PartitionPruneState *prunestate;
     141             : 
     142             :         /*
     143             :          * Set up pruning data structure.  This also initializes the set of
     144             :          * subplans to initialize (validsubplans) by taking into account the
     145             :          * result of performing initial pruning if any.
     146             :          */
     147         730 :         prunestate = ExecInitPartitionExecPruning(&appendstate->ps,
     148         730 :                                                   list_length(node->appendplans),
     149             :                                                   node->part_prune_index,
     150             :                                                   node->apprelids,
     151             :                                                   &validsubplans);
     152         730 :         appendstate->as_prune_state = prunestate;
     153         730 :         nplans = bms_num_members(validsubplans);
     154             : 
     155             :         /*
     156             :          * When no run-time pruning is required and there's at least one
     157             :          * subplan, we can fill as_valid_subplans immediately, preventing
     158             :          * later calls to ExecFindMatchingSubPlans.
     159             :          */
     160         730 :         if (!prunestate->do_exec_prune && nplans > 0)
     161             :         {
     162         264 :             appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
     163         264 :             appendstate->as_valid_subplans_identified = true;
     164             :         }
     165             :     }
     166             :     else
     167             :     {
     168       13638 :         nplans = list_length(node->appendplans);
     169             : 
     170             :         /*
     171             :          * When run-time partition pruning is not enabled we can just mark all
     172             :          * subplans as valid; they must also all be initialized.
     173             :          */
     174             :         Assert(nplans > 0);
     175       13638 :         appendstate->as_valid_subplans = validsubplans =
     176       13638 :             bms_add_range(NULL, 0, nplans - 1);
     177       13638 :         appendstate->as_valid_subplans_identified = true;
     178       13638 :         appendstate->as_prune_state = NULL;
     179             :     }
     180             : 
     181       14368 :     appendplanstates = (PlanState **) palloc(nplans *
     182             :                                              sizeof(PlanState *));
     183             : 
     184             :     /*
     185             :      * call ExecInitNode on each of the valid plans to be executed and save
     186             :      * the results into the appendplanstates array.
     187             :      *
     188             :      * While at it, find out the first valid partial plan.
     189             :      */
     190       14368 :     j = 0;
     191       14368 :     asyncplans = NULL;
     192       14368 :     nasyncplans = 0;
     193       14368 :     firstvalid = nplans;
     194       14368 :     i = -1;
     195       56446 :     while ((i = bms_next_member(validsubplans, i)) >= 0)
     196             :     {
     197       42078 :         Plan       *initNode = (Plan *) list_nth(node->appendplans, i);
     198             : 
     199             :         /*
     200             :          * Record async subplans.  When executing EvalPlanQual, we treat them
     201             :          * as sync ones; don't do this when initializing an EvalPlanQual plan
     202             :          * tree.
     203             :          */
     204       42078 :         if (initNode->async_capable && estate->es_epq_active == NULL)
     205             :         {
     206         186 :             asyncplans = bms_add_member(asyncplans, j);
     207         186 :             nasyncplans++;
     208             :         }
     209             : 
     210             :         /*
     211             :          * Record the lowest appendplans index which is a valid partial plan.
     212             :          */
     213       42078 :         if (i >= node->first_partial_plan && j < firstvalid)
     214         448 :             firstvalid = j;
     215             : 
     216       42078 :         appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
     217             :     }
     218             : 
     219       14368 :     appendstate->as_first_partial_plan = firstvalid;
     220       14368 :     appendstate->appendplans = appendplanstates;
     221       14368 :     appendstate->as_nplans = nplans;
     222             : 
     223             :     /*
     224             :      * Initialize Append's result tuple type and slot.  If the child plans all
     225             :      * produce the same fixed slot type, we can use that slot type; otherwise
     226             :      * make a virtual slot.  (Note that the result slot itself is used only to
     227             :      * return a null tuple at end of execution; real tuples are returned to
     228             :      * the caller in the children's own result slots.  What we are doing here
     229             :      * is allowing the parent plan node to optimize if the Append will return
     230             :      * only one kind of slot.)
     231             :      */
     232       14368 :     appendops = ExecGetCommonSlotOps(appendplanstates, j);
     233       14368 :     if (appendops != NULL)
     234             :     {
     235       13410 :         ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
     236             :     }
     237             :     else
     238             :     {
     239         958 :         ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
     240             :         /* show that the output slot type is not fixed */
     241         958 :         appendstate->ps.resultopsset = true;
     242         958 :         appendstate->ps.resultopsfixed = false;
     243             :     }
     244             : 
     245             :     /* Initialize async state */
     246       14368 :     appendstate->as_asyncplans = asyncplans;
     247       14368 :     appendstate->as_nasyncplans = nasyncplans;
     248       14368 :     appendstate->as_asyncrequests = NULL;
     249       14368 :     appendstate->as_asyncresults = NULL;
     250       14368 :     appendstate->as_nasyncresults = 0;
     251       14368 :     appendstate->as_nasyncremain = 0;
     252       14368 :     appendstate->as_needrequest = NULL;
     253       14368 :     appendstate->as_eventset = NULL;
     254       14368 :     appendstate->as_valid_asyncplans = NULL;
     255             : 
     256       14368 :     if (nasyncplans > 0)
     257             :     {
     258          94 :         appendstate->as_asyncrequests = (AsyncRequest **)
     259          94 :             palloc0(nplans * sizeof(AsyncRequest *));
     260             : 
     261          94 :         i = -1;
     262         280 :         while ((i = bms_next_member(asyncplans, i)) >= 0)
     263             :         {
     264             :             AsyncRequest *areq;
     265             : 
     266         186 :             areq = palloc(sizeof(AsyncRequest));
     267         186 :             areq->requestor = (PlanState *) appendstate;
     268         186 :             areq->requestee = appendplanstates[i];
     269         186 :             areq->request_index = i;
     270         186 :             areq->callback_pending = false;
     271         186 :             areq->request_complete = false;
     272         186 :             areq->result = NULL;
     273             : 
     274         186 :             appendstate->as_asyncrequests[i] = areq;
     275             :         }
     276             : 
     277          94 :         appendstate->as_asyncresults = (TupleTableSlot **)
     278          94 :             palloc0(nasyncplans * sizeof(TupleTableSlot *));
     279             : 
     280          94 :         if (appendstate->as_valid_subplans_identified)
     281          88 :             classify_matching_subplans(appendstate);
     282             :     }
     283             : 
     284             :     /*
     285             :      * Miscellaneous initialization
     286             :      */
     287             : 
     288       14368 :     appendstate->ps.ps_ProjInfo = NULL;
     289             : 
     290             :     /* For parallel query, this will be overridden later. */
     291       14368 :     appendstate->choose_next_subplan = choose_next_subplan_locally;
     292             : 
     293       14368 :     return appendstate;
     294             : }
     295             : 
     296             : /* ----------------------------------------------------------------
     297             :  *     ExecAppend
     298             :  *
     299             :  *      Handles iteration over multiple subplans.
     300             :  * ----------------------------------------------------------------
     301             :  */
     302             : static TupleTableSlot *
     303     2586284 : ExecAppend(PlanState *pstate)
     304             : {
     305     2586284 :     AppendState *node = castNode(AppendState, pstate);
     306             :     TupleTableSlot *result;
     307             : 
     308             :     /*
     309             :      * If this is the first call after Init or ReScan, we need to do the
     310             :      * initialization work.
     311             :      */
     312     2586284 :     if (!node->as_begun)
     313             :     {
     314             :         Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
     315             :         Assert(!node->as_syncdone);
     316             : 
     317             :         /* Nothing to do if there are no subplans */
     318       30220 :         if (node->as_nplans == 0)
     319          60 :             return ExecClearTuple(node->ps.ps_ResultTupleSlot);
     320             : 
     321             :         /* If there are any async subplans, begin executing them. */
     322       30160 :         if (node->as_nasyncplans > 0)
     323          74 :             ExecAppendAsyncBegin(node);
     324             : 
     325             :         /*
     326             :          * If no sync subplan has been chosen, we must choose one before
     327             :          * proceeding.
     328             :          */
     329       30160 :         if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
     330        3262 :             return ExecClearTuple(node->ps.ps_ResultTupleSlot);
     331             : 
     332             :         Assert(node->as_syncdone ||
     333             :                (node->as_whichplan >= 0 &&
     334             :                 node->as_whichplan < node->as_nplans));
     335             : 
     336             :         /* And we're initialized. */
     337       26898 :         node->as_begun = true;
     338             :     }
     339             : 
     340             :     for (;;)
     341       35784 :     {
     342             :         PlanState  *subnode;
     343             : 
     344     2618746 :         CHECK_FOR_INTERRUPTS();
     345             : 
     346             :         /*
     347             :          * try to get a tuple from an async subplan if any
     348             :          */
     349     2618746 :         if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
     350             :         {
     351       12276 :             if (ExecAppendAsyncGetNext(node, &result))
     352       12274 :                 return result;
     353             :             Assert(!node->as_syncdone);
     354             :             Assert(bms_is_empty(node->as_needrequest));
     355             :         }
     356             : 
     357             :         /*
     358             :          * figure out which sync subplan we are currently processing
     359             :          */
     360             :         Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
     361     2606470 :         subnode = node->appendplans[node->as_whichplan];
     362             : 
     363             :         /*
     364             :          * get a tuple from the subplan
     365             :          */
     366     2606470 :         result = ExecProcNode(subnode);
     367             : 
     368     2606418 :         if (!TupIsNull(result))
     369             :         {
     370             :             /*
     371             :              * If the subplan gave us something then return it as-is. We do
     372             :              * NOT make use of the result slot that was set up in
     373             :              * ExecInitAppend; there's no need for it.
     374             :              */
     375     2544372 :             return result;
     376             :         }
     377             : 
     378             :         /*
     379             :          * wait or poll for async events if any. We do this before checking
     380             :          * for the end of iteration, because it might drain the remaining
     381             :          * async subplans.
     382             :          */
     383       62046 :         if (node->as_nasyncremain > 0)
     384          34 :             ExecAppendAsyncEventWait(node);
     385             : 
     386             :         /* choose new sync subplan; if no sync/async subplans, we're done */
     387       62046 :         if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
     388       26262 :             return ExecClearTuple(node->ps.ps_ResultTupleSlot);
     389             :     }
     390             : }
     391             : 
     392             : /* ----------------------------------------------------------------
     393             :  *      ExecEndAppend
     394             :  *
     395             :  *      Shuts down the subscans of the append node.
     396             :  *
     397             :  *      Returns nothing of interest.
     398             :  * ----------------------------------------------------------------
     399             :  */
     400             : void
     401       14072 : ExecEndAppend(AppendState *node)
     402             : {
     403             :     PlanState **appendplans;
     404             :     int         nplans;
     405             :     int         i;
     406             : 
     407             :     /*
     408             :      * get information from the node
     409             :      */
     410       14072 :     appendplans = node->appendplans;
     411       14072 :     nplans = node->as_nplans;
     412             : 
     413             :     /*
     414             :      * shut down each of the subscans
     415             :      */
     416       55438 :     for (i = 0; i < nplans; i++)
     417       41366 :         ExecEndNode(appendplans[i]);
     418       14072 : }
     419             : 
     420             : void
     421       20068 : ExecReScanAppend(AppendState *node)
     422             : {
     423       20068 :     int         nasyncplans = node->as_nasyncplans;
     424             :     int         i;
     425             : 
     426             :     /*
     427             :      * If any PARAM_EXEC Params used in pruning expressions have changed, then
     428             :      * we'd better unset the valid subplans so that they are reselected for
     429             :      * the new parameter values.
     430             :      */
     431       23336 :     if (node->as_prune_state &&
     432        3268 :         bms_overlap(node->ps.chgParam,
     433        3268 :                     node->as_prune_state->execparamids))
     434             :     {
     435        3268 :         node->as_valid_subplans_identified = false;
     436        3268 :         bms_free(node->as_valid_subplans);
     437        3268 :         node->as_valid_subplans = NULL;
     438        3268 :         bms_free(node->as_valid_asyncplans);
     439        3268 :         node->as_valid_asyncplans = NULL;
     440             :     }
     441             : 
     442       83374 :     for (i = 0; i < node->as_nplans; i++)
     443             :     {
     444       63306 :         PlanState  *subnode = node->appendplans[i];
     445             : 
     446             :         /*
     447             :          * ExecReScan doesn't know about my subplans, so I have to do
     448             :          * changed-parameter signaling myself.
     449             :          */
     450       63306 :         if (node->ps.chgParam != NULL)
     451       55256 :             UpdateChangedParamSet(subnode, node->ps.chgParam);
     452             : 
     453             :         /*
     454             :          * If chgParam of subnode is not null then plan will be re-scanned by
     455             :          * first ExecProcNode or by first ExecAsyncRequest.
     456             :          */
     457       63306 :         if (subnode->chgParam == NULL)
     458       19118 :             ExecReScan(subnode);
     459             :     }
     460             : 
     461             :     /* Reset async state */
     462       20068 :     if (nasyncplans > 0)
     463             :     {
     464          34 :         i = -1;
     465         102 :         while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
     466             :         {
     467          68 :             AsyncRequest *areq = node->as_asyncrequests[i];
     468             : 
     469          68 :             areq->callback_pending = false;
     470          68 :             areq->request_complete = false;
     471          68 :             areq->result = NULL;
     472             :         }
     473             : 
     474          34 :         node->as_nasyncresults = 0;
     475          34 :         node->as_nasyncremain = 0;
     476          34 :         bms_free(node->as_needrequest);
     477          34 :         node->as_needrequest = NULL;
     478             :     }
     479             : 
     480             :     /* Let choose_next_subplan_* function handle setting the first subplan */
     481       20068 :     node->as_whichplan = INVALID_SUBPLAN_INDEX;
     482       20068 :     node->as_syncdone = false;
     483       20068 :     node->as_begun = false;
     484       20068 : }
     485             : 
     486             : /* ----------------------------------------------------------------
     487             :  *                      Parallel Append Support
     488             :  * ----------------------------------------------------------------
     489             :  */
     490             : 
     491             : /* ----------------------------------------------------------------
     492             :  *      ExecAppendEstimate
     493             :  *
     494             :  *      Compute the amount of space we'll need in the parallel
     495             :  *      query DSM, and inform pcxt->estimator about our needs.
     496             :  * ----------------------------------------------------------------
     497             :  */
     498             : void
     499         138 : ExecAppendEstimate(AppendState *node,
     500             :                    ParallelContext *pcxt)
     501             : {
     502         138 :     node->pstate_len =
     503         138 :         add_size(offsetof(ParallelAppendState, pa_finished),
     504         138 :                  sizeof(bool) * node->as_nplans);
     505             : 
     506         138 :     shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
     507         138 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     508         138 : }
     509             : 
     510             : 
     511             : /* ----------------------------------------------------------------
     512             :  *      ExecAppendInitializeDSM
     513             :  *
     514             :  *      Set up shared state for Parallel Append.
     515             :  * ----------------------------------------------------------------
     516             :  */
     517             : void
     518         138 : ExecAppendInitializeDSM(AppendState *node,
     519             :                         ParallelContext *pcxt)
     520             : {
     521             :     ParallelAppendState *pstate;
     522             : 
     523         138 :     pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
     524         138 :     memset(pstate, 0, node->pstate_len);
     525         138 :     LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
     526         138 :     shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
     527             : 
     528         138 :     node->as_pstate = pstate;
     529         138 :     node->choose_next_subplan = choose_next_subplan_for_leader;
     530         138 : }
     531             : 
     532             : /* ----------------------------------------------------------------
     533             :  *      ExecAppendReInitializeDSM
     534             :  *
     535             :  *      Reset shared state before beginning a fresh scan.
     536             :  * ----------------------------------------------------------------
     537             :  */
     538             : void
     539           0 : ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
     540             : {
     541           0 :     ParallelAppendState *pstate = node->as_pstate;
     542             : 
     543           0 :     pstate->pa_next_plan = 0;
     544           0 :     memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
     545           0 : }
     546             : 
     547             : /* ----------------------------------------------------------------
     548             :  *      ExecAppendInitializeWorker
     549             :  *
     550             :  *      Copy relevant information from TOC into planstate, and initialize
     551             :  *      whatever is required to choose and execute the optimal subplan.
     552             :  * ----------------------------------------------------------------
     553             :  */
     554             : void
     555         316 : ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
     556             : {
     557         316 :     node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
     558         316 :     node->choose_next_subplan = choose_next_subplan_for_worker;
     559         316 : }
     560             : 
     561             : /* ----------------------------------------------------------------
     562             :  *      choose_next_subplan_locally
     563             :  *
     564             :  *      Choose next sync subplan for a non-parallel-aware Append,
     565             :  *      returning false if there are no more.
     566             :  * ----------------------------------------------------------------
     567             :  */
     568             : static bool
     569       91340 : choose_next_subplan_locally(AppendState *node)
     570             : {
     571       91340 :     int         whichplan = node->as_whichplan;
     572             :     int         nextplan;
     573             : 
     574             :     /* We should never be called when there are no subplans */
     575             :     Assert(node->as_nplans > 0);
     576             : 
     577             :     /* Nothing to do if syncdone */
     578       91340 :     if (node->as_syncdone)
     579          36 :         return false;
     580             : 
     581             :     /*
     582             :      * If first call then have the bms member function choose the first valid
     583             :      * sync subplan by initializing whichplan to -1.  If there happen to be no
     584             :      * valid sync subplans then the bms member function will handle that by
     585             :      * returning a negative number which will allow us to exit returning a
     586             :      * false value.
     587             :      */
     588       91304 :     if (whichplan == INVALID_SUBPLAN_INDEX)
     589             :     {
     590       29724 :         if (node->as_nasyncplans > 0)
     591             :         {
     592             :             /* We'd have filled as_valid_subplans already */
     593             :             Assert(node->as_valid_subplans_identified);
     594             :         }
     595       29686 :         else if (!node->as_valid_subplans_identified)
     596             :         {
     597        3382 :             node->as_valid_subplans =
     598        3382 :                 ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
     599        3382 :             node->as_valid_subplans_identified = true;
     600             :         }
     601             : 
     602       29724 :         whichplan = -1;
     603             :     }
     604             : 
     605             :     /* Ensure whichplan is within the expected range */
     606             :     Assert(whichplan >= -1 && whichplan <= node->as_nplans);
     607             : 
     608       91304 :     if (ScanDirectionIsForward(node->ps.state->es_direction))
     609       91286 :         nextplan = bms_next_member(node->as_valid_subplans, whichplan);
     610             :     else
     611          18 :         nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
     612             : 
     613       91304 :     if (nextplan < 0)
     614             :     {
     615             :         /* Set as_syncdone if in async mode */
     616       29158 :         if (node->as_nasyncplans > 0)
     617          34 :             node->as_syncdone = true;
     618       29158 :         return false;
     619             :     }
     620             : 
     621       62146 :     node->as_whichplan = nextplan;
     622             : 
     623       62146 :     return true;
     624             : }
     625             : 
     626             : /* ----------------------------------------------------------------
     627             :  *      choose_next_subplan_for_leader
     628             :  *
     629             :  *      Try to pick a plan which doesn't commit us to doing much
     630             :  *      work locally, so that as much work as possible is done in
     631             :  *      the workers.  Cheapest subplans are at the end.
     632             :  * ----------------------------------------------------------------
     633             :  */
     634             : static bool
     635         510 : choose_next_subplan_for_leader(AppendState *node)
     636             : {
     637         510 :     ParallelAppendState *pstate = node->as_pstate;
     638             : 
     639             :     /* Backward scan is not supported by parallel-aware plans */
     640             :     Assert(ScanDirectionIsForward(node->ps.state->es_direction));
     641             : 
     642             :     /* We should never be called when there are no subplans */
     643             :     Assert(node->as_nplans > 0);
     644             : 
     645         510 :     LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
     646             : 
     647         510 :     if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
     648             :     {
     649             :         /* Mark just-completed subplan as finished. */
     650         390 :         node->as_pstate->pa_finished[node->as_whichplan] = true;
     651             :     }
     652             :     else
     653             :     {
     654             :         /* Start with last subplan. */
     655         120 :         node->as_whichplan = node->as_nplans - 1;
     656             : 
     657             :         /*
     658             :          * If we've yet to determine the valid subplans then do so now.  If
     659             :          * run-time pruning is disabled then the valid subplans will always be
     660             :          * set to all subplans.
     661             :          */
     662         120 :         if (!node->as_valid_subplans_identified)
     663             :         {
     664          24 :             node->as_valid_subplans =
     665          24 :                 ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
     666          24 :             node->as_valid_subplans_identified = true;
     667             : 
     668             :             /*
     669             :              * Mark each invalid plan as finished to allow the loop below to
     670             :              * select the first valid subplan.
     671             :              */
     672          24 :             mark_invalid_subplans_as_finished(node);
     673             :         }
     674             :     }
     675             : 
     676             :     /* Loop until we find a subplan to execute. */
     677         810 :     while (pstate->pa_finished[node->as_whichplan])
     678             :     {
     679         420 :         if (node->as_whichplan == 0)
     680             :         {
     681         120 :             pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
     682         120 :             node->as_whichplan = INVALID_SUBPLAN_INDEX;
     683         120 :             LWLockRelease(&pstate->pa_lock);
     684         120 :             return false;
     685             :         }
     686             : 
     687             :         /*
     688             :          * We needn't pay attention to as_valid_subplans here as all invalid
     689             :          * plans have been marked as finished.
     690             :          */
     691         300 :         node->as_whichplan--;
     692             :     }
     693             : 
     694             :     /* If non-partial, immediately mark as finished. */
     695         390 :     if (node->as_whichplan < node->as_first_partial_plan)
     696         132 :         node->as_pstate->pa_finished[node->as_whichplan] = true;
     697             : 
     698         390 :     LWLockRelease(&pstate->pa_lock);
     699             : 
     700         390 :     return true;
     701             : }
     702             : 
     703             : /* ----------------------------------------------------------------
     704             :  *      choose_next_subplan_for_worker
     705             :  *
     706             :  *      Choose next subplan for a parallel-aware Append, returning
     707             :  *      false if there are no more.
     708             :  *
     709             :  *      We start from the first plan and advance through the list;
     710             :  *      when we get back to the end, we loop back to the first
     711             :  *      partial plan.  This assigns the non-partial plans first in
     712             :  *      order of descending cost and then spreads out the workers
     713             :  *      as evenly as possible across the remaining partial plans.
     714             :  * ----------------------------------------------------------------
     715             :  */
     716             : static bool
     717         356 : choose_next_subplan_for_worker(AppendState *node)
     718             : {
     719         356 :     ParallelAppendState *pstate = node->as_pstate;
     720             : 
     721             :     /* Backward scan is not supported by parallel-aware plans */
     722             :     Assert(ScanDirectionIsForward(node->ps.state->es_direction));
     723             : 
     724             :     /* We should never be called when there are no subplans */
     725             :     Assert(node->as_nplans > 0);
     726             : 
     727         356 :     LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
     728             : 
     729             :     /* Mark just-completed subplan as finished. */
     730         356 :     if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
     731          76 :         node->as_pstate->pa_finished[node->as_whichplan] = true;
     732             : 
     733             :     /*
     734             :      * If we've yet to determine the valid subplans then do so now.  If
     735             :      * run-time pruning is disabled then the valid subplans will always be set
     736             :      * to all subplans.
     737             :      */
     738         280 :     else if (!node->as_valid_subplans_identified)
     739             :     {
     740          24 :         node->as_valid_subplans =
     741          24 :             ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
     742          24 :         node->as_valid_subplans_identified = true;
     743             : 
     744          24 :         mark_invalid_subplans_as_finished(node);
     745             :     }
     746             : 
     747             :     /* If all the plans are already done, we have nothing to do */
     748         356 :     if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
     749             :     {
     750         260 :         LWLockRelease(&pstate->pa_lock);
     751         260 :         return false;
     752             :     }
     753             : 
     754             :     /* Save the plan from which we are starting the search. */
     755          96 :     node->as_whichplan = pstate->pa_next_plan;
     756             : 
     757             :     /* Loop until we find a valid subplan to execute. */
     758         192 :     while (pstate->pa_finished[pstate->pa_next_plan])
     759             :     {
     760             :         int         nextplan;
     761             : 
     762         116 :         nextplan = bms_next_member(node->as_valid_subplans,
     763             :                                    pstate->pa_next_plan);
     764         116 :         if (nextplan >= 0)
     765             :         {
     766             :             /* Advance to the next valid plan. */
     767          96 :             pstate->pa_next_plan = nextplan;
     768             :         }
     769          20 :         else if (node->as_whichplan > node->as_first_partial_plan)
     770             :         {
     771             :             /*
     772             :              * Try looping back to the first valid partial plan, if there is
     773             :              * one.  If there isn't, arrange to bail out below.
     774             :              */
     775           6 :             nextplan = bms_next_member(node->as_valid_subplans,
     776           6 :                                        node->as_first_partial_plan - 1);
     777           6 :             pstate->pa_next_plan =
     778           6 :                 nextplan < 0 ? node->as_whichplan : nextplan;
     779             :         }
     780             :         else
     781             :         {
     782             :             /*
     783             :              * At last plan, and either there are no partial plans or we've
     784             :              * tried them all.  Arrange to bail out.
     785             :              */
     786          14 :             pstate->pa_next_plan = node->as_whichplan;
     787             :         }
     788             : 
     789         116 :         if (pstate->pa_next_plan == node->as_whichplan)
     790             :         {
     791             :             /* We've tried everything! */
     792          20 :             pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
     793          20 :             LWLockRelease(&pstate->pa_lock);
     794          20 :             return false;
     795             :         }
     796             :     }
     797             : 
     798             :     /* Pick the plan we found, and advance pa_next_plan one more time. */
     799          76 :     node->as_whichplan = pstate->pa_next_plan;
     800          76 :     pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
     801             :                                            pstate->pa_next_plan);
     802             : 
     803             :     /*
     804             :      * If there are no more valid plans then try setting the next plan to the
     805             :      * first valid partial plan.
     806             :      */
     807          76 :     if (pstate->pa_next_plan < 0)
     808             :     {
     809          24 :         int         nextplan = bms_next_member(node->as_valid_subplans,
     810          24 :                                                node->as_first_partial_plan - 1);
     811             : 
     812          24 :         if (nextplan >= 0)
     813          24 :             pstate->pa_next_plan = nextplan;
     814             :         else
     815             :         {
     816             :             /*
     817             :              * There are no valid partial plans, and we already chose the last
     818             :              * non-partial plan; so flag that there's nothing more for our
     819             :              * fellow workers to do.
     820             :              */
     821           0 :             pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
     822             :         }
     823             :     }
     824             : 
     825             :     /* If non-partial, immediately mark as finished. */
     826          76 :     if (node->as_whichplan < node->as_first_partial_plan)
     827           6 :         node->as_pstate->pa_finished[node->as_whichplan] = true;
     828             : 
     829          76 :     LWLockRelease(&pstate->pa_lock);
     830             : 
     831          76 :     return true;
     832             : }
     833             : 
     834             : /*
     835             :  * mark_invalid_subplans_as_finished
     836             :  *      Marks the ParallelAppendState's pa_finished as true for each invalid
     837             :  *      subplan.
     838             :  *
     839             :  * This function should only be called for parallel Append with run-time
     840             :  * pruning enabled.
     841             :  */
     842             : static void
     843          48 : mark_invalid_subplans_as_finished(AppendState *node)
     844             : {
     845             :     int         i;
     846             : 
     847             :     /* Only valid to call this while in parallel Append mode */
     848             :     Assert(node->as_pstate);
     849             : 
     850             :     /* Shouldn't have been called when run-time pruning is not enabled */
     851             :     Assert(node->as_prune_state);
     852             : 
     853             :     /* Nothing to do if all plans are valid */
     854          48 :     if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
     855           0 :         return;
     856             : 
     857             :     /* Mark all non-valid plans as finished */
     858         162 :     for (i = 0; i < node->as_nplans; i++)
     859             :     {
     860         114 :         if (!bms_is_member(i, node->as_valid_subplans))
     861          48 :             node->as_pstate->pa_finished[i] = true;
     862             :     }
     863             : }
     864             : 
     865             : /* ----------------------------------------------------------------
     866             :  *                      Asynchronous Append Support
     867             :  * ----------------------------------------------------------------
     868             :  */
     869             : 
     870             : /* ----------------------------------------------------------------
     871             :  *      ExecAppendAsyncBegin
     872             :  *
     873             :  *      Begin executing designed async-capable subplans.
     874             :  * ----------------------------------------------------------------
     875             :  */
     876             : static void
     877          74 : ExecAppendAsyncBegin(AppendState *node)
     878             : {
     879             :     int         i;
     880             : 
     881             :     /* Backward scan is not supported by async-aware Appends. */
     882             :     Assert(ScanDirectionIsForward(node->ps.state->es_direction));
     883             : 
     884             :     /* We should never be called when there are no subplans */
     885             :     Assert(node->as_nplans > 0);
     886             : 
     887             :     /* We should never be called when there are no async subplans. */
     888             :     Assert(node->as_nasyncplans > 0);
     889             : 
     890             :     /* If we've yet to determine the valid subplans then do so now. */
     891          74 :     if (!node->as_valid_subplans_identified)
     892             :     {
     893           4 :         node->as_valid_subplans =
     894           4 :             ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
     895           4 :         node->as_valid_subplans_identified = true;
     896             : 
     897           4 :         classify_matching_subplans(node);
     898             :     }
     899             : 
     900             :     /* Initialize state variables. */
     901          74 :     node->as_syncdone = bms_is_empty(node->as_valid_subplans);
     902          74 :     node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
     903             : 
     904             :     /* Nothing to do if there are no valid async subplans. */
     905          74 :     if (node->as_nasyncremain == 0)
     906           0 :         return;
     907             : 
     908             :     /* Make a request for each of the valid async subplans. */
     909          74 :     i = -1;
     910         218 :     while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
     911             :     {
     912         144 :         AsyncRequest *areq = node->as_asyncrequests[i];
     913             : 
     914             :         Assert(areq->request_index == i);
     915             :         Assert(!areq->callback_pending);
     916             : 
     917             :         /* Do the actual work. */
     918         144 :         ExecAsyncRequest(areq);
     919             :     }
     920             : }
     921             : 
     922             : /* ----------------------------------------------------------------
     923             :  *      ExecAppendAsyncGetNext
     924             :  *
     925             :  *      Get the next tuple from any of the asynchronous subplans.
     926             :  * ----------------------------------------------------------------
     927             :  */
     928             : static bool
     929       12276 : ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
     930             : {
     931       12276 :     *result = NULL;
     932             : 
     933             :     /* We should never be called when there are no valid async subplans. */
     934             :     Assert(node->as_nasyncremain > 0);
     935             : 
     936             :     /* Request a tuple asynchronously. */
     937       12276 :     if (ExecAppendAsyncRequest(node, result))
     938       12114 :         return true;
     939             : 
     940         238 :     while (node->as_nasyncremain > 0)
     941             :     {
     942         176 :         CHECK_FOR_INTERRUPTS();
     943             : 
     944             :         /* Wait or poll for async events. */
     945         176 :         ExecAppendAsyncEventWait(node);
     946             : 
     947             :         /* Request a tuple asynchronously. */
     948         174 :         if (ExecAppendAsyncRequest(node, result))
     949          98 :             return true;
     950             : 
     951             :         /* Break from loop if there's any sync subplan that isn't complete. */
     952          76 :         if (!node->as_syncdone)
     953           0 :             break;
     954             :     }
     955             : 
     956             :     /*
     957             :      * If all sync subplans are complete, we're totally done scanning the
     958             :      * given node.  Otherwise, we're done with the asynchronous stuff but must
     959             :      * continue scanning the sync subplans.
     960             :      */
     961          62 :     if (node->as_syncdone)
     962             :     {
     963             :         Assert(node->as_nasyncremain == 0);
     964          62 :         *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
     965          62 :         return true;
     966             :     }
     967             : 
     968           0 :     return false;
     969             : }
     970             : 
     971             : /* ----------------------------------------------------------------
     972             :  *      ExecAppendAsyncRequest
     973             :  *
     974             :  *      Request a tuple asynchronously.
     975             :  * ----------------------------------------------------------------
     976             :  */
     977             : static bool
     978       12450 : ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
     979             : {
     980             :     Bitmapset  *needrequest;
     981             :     int         i;
     982             : 
     983             :     /* Nothing to do if there are no async subplans needing a new request. */
     984       12450 :     if (bms_is_empty(node->as_needrequest))
     985             :     {
     986             :         Assert(node->as_nasyncresults == 0);
     987         116 :         return false;
     988             :     }
     989             : 
     990             :     /*
     991             :      * If there are any asynchronously-generated results that have not yet
     992             :      * been returned, we have nothing to do; just return one of them.
     993             :      */
     994       12334 :     if (node->as_nasyncresults > 0)
     995             :     {
     996        4774 :         --node->as_nasyncresults;
     997        4774 :         *result = node->as_asyncresults[node->as_nasyncresults];
     998        4774 :         return true;
     999             :     }
    1000             : 
    1001             :     /* Make a new request for each of the async subplans that need it. */
    1002        7560 :     needrequest = node->as_needrequest;
    1003        7560 :     node->as_needrequest = NULL;
    1004        7560 :     i = -1;
    1005       19766 :     while ((i = bms_next_member(needrequest, i)) >= 0)
    1006             :     {
    1007       12206 :         AsyncRequest *areq = node->as_asyncrequests[i];
    1008             : 
    1009             :         /* Do the actual work. */
    1010       12206 :         ExecAsyncRequest(areq);
    1011             :     }
    1012        7560 :     bms_free(needrequest);
    1013             : 
    1014             :     /* Return one of the asynchronously-generated results if any. */
    1015        7560 :     if (node->as_nasyncresults > 0)
    1016             :     {
    1017        7438 :         --node->as_nasyncresults;
    1018        7438 :         *result = node->as_asyncresults[node->as_nasyncresults];
    1019        7438 :         return true;
    1020             :     }
    1021             : 
    1022         122 :     return false;
    1023             : }
    1024             : 
    1025             : /* ----------------------------------------------------------------
    1026             :  *      ExecAppendAsyncEventWait
    1027             :  *
    1028             :  *      Wait or poll for file descriptor events and fire callbacks.
    1029             :  * ----------------------------------------------------------------
    1030             :  */
    1031             : static void
    1032         210 : ExecAppendAsyncEventWait(AppendState *node)
    1033             : {
    1034         210 :     int         nevents = node->as_nasyncplans + 2;
    1035         210 :     long        timeout = node->as_syncdone ? -1 : 0;
    1036             :     WaitEvent   occurred_event[EVENT_BUFFER_SIZE];
    1037             :     int         noccurred;
    1038             :     int         i;
    1039             : 
    1040             :     /* We should never be called when there are no valid async subplans. */
    1041             :     Assert(node->as_nasyncremain > 0);
    1042             : 
    1043             :     Assert(node->as_eventset == NULL);
    1044         210 :     node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
    1045         210 :     AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
    1046             :                       NULL, NULL);
    1047             : 
    1048             :     /* Give each waiting subplan a chance to add an event. */
    1049         210 :     i = -1;
    1050         640 :     while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
    1051             :     {
    1052         432 :         AsyncRequest *areq = node->as_asyncrequests[i];
    1053             : 
    1054         432 :         if (areq->callback_pending)
    1055         372 :             ExecAsyncConfigureWait(areq);
    1056             :     }
    1057             : 
    1058             :     /*
    1059             :      * No need for further processing if none of the subplans configured any
    1060             :      * events.
    1061             :      */
    1062         208 :     if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
    1063             :     {
    1064           2 :         FreeWaitEventSet(node->as_eventset);
    1065           2 :         node->as_eventset = NULL;
    1066           2 :         return;
    1067             :     }
    1068             : 
    1069             :     /*
    1070             :      * Add the process latch to the set, so that we wake up to process the
    1071             :      * standard interrupts with CHECK_FOR_INTERRUPTS().
    1072             :      *
    1073             :      * NOTE: For historical reasons, it's important that this is added to the
    1074             :      * WaitEventSet after the ExecAsyncConfigureWait() calls.  Namely,
    1075             :      * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
    1076             :      * any other events are in the set.  That's a poor design, it's
    1077             :      * questionable for postgres_fdw to be doing that in the first place, but
    1078             :      * we cannot change it now.  The pattern has possibly been copied to other
    1079             :      * extensions too.
    1080             :      */
    1081         206 :     AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
    1082             :                       MyLatch, NULL);
    1083             : 
    1084             :     /* Return at most EVENT_BUFFER_SIZE events in one call. */
    1085         206 :     if (nevents > EVENT_BUFFER_SIZE)
    1086           0 :         nevents = EVENT_BUFFER_SIZE;
    1087             : 
    1088             :     /*
    1089             :      * If the timeout is -1, wait until at least one event occurs.  If the
    1090             :      * timeout is 0, poll for events, but do not wait at all.
    1091             :      */
    1092         206 :     noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
    1093             :                                  nevents, WAIT_EVENT_APPEND_READY);
    1094         206 :     FreeWaitEventSet(node->as_eventset);
    1095         206 :     node->as_eventset = NULL;
    1096         206 :     if (noccurred == 0)
    1097           0 :         return;
    1098             : 
    1099             :     /* Deliver notifications. */
    1100         502 :     for (i = 0; i < noccurred; i++)
    1101             :     {
    1102         296 :         WaitEvent  *w = &occurred_event[i];
    1103             : 
    1104             :         /*
    1105             :          * Each waiting subplan should have registered its wait event with
    1106             :          * user_data pointing back to its AsyncRequest.
    1107             :          */
    1108         296 :         if ((w->events & WL_SOCKET_READABLE) != 0)
    1109             :         {
    1110         296 :             AsyncRequest *areq = (AsyncRequest *) w->user_data;
    1111             : 
    1112         296 :             if (areq->callback_pending)
    1113             :             {
    1114             :                 /*
    1115             :                  * Mark it as no longer needing a callback.  We must do this
    1116             :                  * before dispatching the callback in case the callback resets
    1117             :                  * the flag.
    1118             :                  */
    1119         296 :                 areq->callback_pending = false;
    1120             : 
    1121             :                 /* Do the actual work. */
    1122         296 :                 ExecAsyncNotify(areq);
    1123             :             }
    1124             :         }
    1125             : 
    1126             :         /* Handle standard interrupts */
    1127         296 :         if ((w->events & WL_LATCH_SET) != 0)
    1128             :         {
    1129           0 :             ResetLatch(MyLatch);
    1130           0 :             CHECK_FOR_INTERRUPTS();
    1131             :         }
    1132             :     }
    1133             : }
    1134             : 
    1135             : /* ----------------------------------------------------------------
    1136             :  *      ExecAsyncAppendResponse
    1137             :  *
    1138             :  *      Receive a response from an asynchronous request we made.
    1139             :  * ----------------------------------------------------------------
    1140             :  */
    1141             : void
    1142       12656 : ExecAsyncAppendResponse(AsyncRequest *areq)
    1143             : {
    1144       12656 :     AppendState *node = (AppendState *) areq->requestor;
    1145       12656 :     TupleTableSlot *slot = areq->result;
    1146             : 
    1147             :     /* The result should be a TupleTableSlot or NULL. */
    1148             :     Assert(slot == NULL || IsA(slot, TupleTableSlot));
    1149             : 
    1150             :     /* Nothing to do if the request is pending. */
    1151       12656 :     if (!areq->request_complete)
    1152             :     {
    1153             :         /* The request would have been pending for a callback. */
    1154             :         Assert(areq->callback_pending);
    1155         322 :         return;
    1156             :     }
    1157             : 
    1158             :     /* If the result is NULL or an empty slot, there's nothing more to do. */
    1159       12334 :     if (TupIsNull(slot))
    1160             :     {
    1161             :         /* The ending subplan wouldn't have been pending for a callback. */
    1162             :         Assert(!areq->callback_pending);
    1163         120 :         --node->as_nasyncremain;
    1164         120 :         return;
    1165             :     }
    1166             : 
    1167             :     /* Save result so we can return it. */
    1168             :     Assert(node->as_nasyncresults < node->as_nasyncplans);
    1169       12214 :     node->as_asyncresults[node->as_nasyncresults++] = slot;
    1170             : 
    1171             :     /*
    1172             :      * Mark the subplan that returned a result as ready for a new request.  We
    1173             :      * don't launch another one here immediately because it might complete.
    1174             :      */
    1175       12214 :     node->as_needrequest = bms_add_member(node->as_needrequest,
    1176             :                                           areq->request_index);
    1177             : }
    1178             : 
    1179             : /* ----------------------------------------------------------------
    1180             :  *      classify_matching_subplans
    1181             :  *
    1182             :  *      Classify the node's as_valid_subplans into sync ones and
    1183             :  *      async ones, adjust it to contain sync ones only, and save
    1184             :  *      async ones in the node's as_valid_asyncplans.
    1185             :  * ----------------------------------------------------------------
    1186             :  */
    1187             : static void
    1188          92 : classify_matching_subplans(AppendState *node)
    1189             : {
    1190             :     Bitmapset  *valid_asyncplans;
    1191             : 
    1192             :     Assert(node->as_valid_subplans_identified);
    1193             :     Assert(node->as_valid_asyncplans == NULL);
    1194             : 
    1195             :     /* Nothing to do if there are no valid subplans. */
    1196          92 :     if (bms_is_empty(node->as_valid_subplans))
    1197             :     {
    1198           0 :         node->as_syncdone = true;
    1199           0 :         node->as_nasyncremain = 0;
    1200           0 :         return;
    1201             :     }
    1202             : 
    1203             :     /* Nothing to do if there are no valid async subplans. */
    1204          92 :     if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
    1205             :     {
    1206           0 :         node->as_nasyncremain = 0;
    1207           0 :         return;
    1208             :     }
    1209             : 
    1210             :     /* Get valid async subplans. */
    1211          92 :     valid_asyncplans = bms_intersect(node->as_asyncplans,
    1212          92 :                                      node->as_valid_subplans);
    1213             : 
    1214             :     /* Adjust the valid subplans to contain sync subplans only. */
    1215          92 :     node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
    1216             :                                               valid_asyncplans);
    1217             : 
    1218             :     /* Save valid async subplans. */
    1219          92 :     node->as_valid_asyncplans = valid_asyncplans;
    1220             : }

Generated by: LCOV version 1.14