LCOV - code coverage report
Current view: top level - src/backend/executor - nodeMergeAppend.c (source / functions) Hit Total Coverage
Test: PostgreSQL 12beta2 Lines: 111 116 95.7 %
Date: 2019-06-19 14:06:47 Functions: 5 5 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * nodeMergeAppend.c
       4             :  *    routines to handle MergeAppend nodes.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/executor/nodeMergeAppend.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : /* INTERFACE ROUTINES
      16             :  *      ExecInitMergeAppend     - initialize the MergeAppend node
      17             :  *      ExecMergeAppend         - retrieve the next tuple from the node
      18             :  *      ExecEndMergeAppend      - shut down the MergeAppend node
      19             :  *      ExecReScanMergeAppend   - rescan the MergeAppend node
      20             :  *
      21             :  *   NOTES
      22             :  *      A MergeAppend node contains a list of one or more subplans.
      23             :  *      These are each expected to deliver tuples that are sorted according
      24             :  *      to a common sort key.  The MergeAppend node merges these streams
      25             :  *      to produce output sorted the same way.
      26             :  *
      27             :  *      MergeAppend nodes don't make use of their left and right
      28             :  *      subtrees, rather they maintain a list of subplans so
      29             :  *      a typical MergeAppend node looks like this in the plan tree:
      30             :  *
      31             :  *                 ...
      32             :  *                 /
      33             :  *              MergeAppend---+------+------+--- nil
      34             :  *              /   \         |      |      |
      35             :  *            nil   nil      ...    ...    ...
      36             :  *                               subplans
      37             :  */
      38             : 
      39             : #include "postgres.h"
      40             : 
      41             : #include "executor/execdebug.h"
      42             : #include "executor/execPartition.h"
      43             : #include "executor/nodeMergeAppend.h"
      44             : #include "lib/binaryheap.h"
      45             : #include "miscadmin.h"
      46             : 
      47             : /*
      48             :  * We have one slot for each item in the heap array.  We use SlotNumber
      49             :  * to store slot indexes.  This doesn't actually provide any formal
      50             :  * type-safety, but it makes the code more self-documenting.
                     :  *
                     :  * SlotNumbers are what gets stored (converted to Datums) in the binary
                     :  * heap; heap_compare_slots converts them back and uses them to index the
                     :  * ms_slots array to find the tuples to compare.
      51             :  */
      52             : typedef int32 SlotNumber;
      53             : 
      54             : static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
      55             : static int  heap_compare_slots(Datum a, Datum b, void *arg);
      56             : 
      57             : 
      58             : /* ----------------------------------------------------------------
      59             :  *      ExecInitMergeAppend
      60             :  *
      61             :  *      Begin all of the subscans of the MergeAppend node.
                     :  *
                     :  *      Builds and returns a MergeAppendState for 'node'.  If the plan
                     :  *      carries part_prune_info, run-time partition pruning is set up and
                     :  *      (when requested) an initial prune decides which subplans are
                     :  *      initialized at all.  The surviving subplans are initialized with
                     :  *      ExecInitNode, the per-subplan slot array and the binary heap are
                     :  *      allocated, and one SortSupport entry is prepared per sort column.
                     :  *      No tuples are fetched here: ms_initialized is left false so that
                     :  *      the first ExecMergeAppend call loads the heap.
                     :  *
                     :  *      'eflags' must not contain EXEC_FLAG_BACKWARD or EXEC_FLAG_MARK;
                     :  *      this is checked by assertion only.
      62             :  * ----------------------------------------------------------------
      63             :  */
      64             : MergeAppendState *
      65         260 : ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
      66             : {
      67         260 :     MergeAppendState *mergestate = makeNode(MergeAppendState);
      68             :     PlanState **mergeplanstates;
      69             :     Bitmapset  *validsubplans;
      70             :     int         nplans;
      71             :     int         i,
      72             :                 j;
      73             :     ListCell   *lc;
      74             : 
      75             :     /* check for unsupported flags */
      76             :     Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
      77             : 
      78             :     /*
      79             :      * create new MergeAppendState for our node
      80             :      */
      81         260 :     mergestate->ps.plan = (Plan *) node;
      82         260 :     mergestate->ps.state = estate;
      83         260 :     mergestate->ps.ExecProcNode = ExecMergeAppend;
      84         260 :     mergestate->ms_noopscan = false;
      85             : 
      86             :     /* If run-time partition pruning is enabled, then set that up now */
      87         260 :     if (node->part_prune_info != NULL)
      88             :     {
      89             :         PartitionPruneState *prunestate;
      90             : 
      91             :         /* We may need an expression context to evaluate partition exprs */
      92          28 :         ExecAssignExprContext(estate, &mergestate->ps);
      93             : 
      94          28 :         prunestate = ExecCreatePartitionPruneState(&mergestate->ps,
      95          28 :                                                    node->part_prune_info);
      96          28 :         mergestate->ms_prune_state = prunestate;
      97             : 
      98             :         /* Perform an initial partition prune, if required. */
      99          28 :         if (prunestate->do_initial_prune)
     100             :         {
     101             :             /* Determine which subplans survive initial pruning */
     102          24 :             validsubplans = ExecFindInitialMatchingSubPlans(prunestate,
     103          24 :                                                             list_length(node->mergeplans));
     104             : 
     105             :             /*
     106             :              * The case where no subplans survive pruning must be handled
     107             :              * specially.  The problem here is that code in explain.c requires
     108             :              * a MergeAppend to have at least one subplan in order for it to
     109             :              * properly determine the Vars in that subplan's targetlist.  We
     110             :              * sidestep this issue by just initializing the first subplan and
     111             :              * setting ms_noopscan to true to indicate that we don't really
     112             :              * need to scan any subnodes.
     113             :              */
     114          24 :             if (bms_is_empty(validsubplans))
     115             :             {
     116           8 :                 mergestate->ms_noopscan = true;
     117             : 
     118             :                 /* Mark the first as valid so that it's initialized below */
     119           8 :                 validsubplans = bms_make_singleton(0);
     120             :             }
     121             : 
     122          24 :             nplans = bms_num_members(validsubplans);
     123             :         }
     124             :         else
     125             :         {
     126             :             /* We'll need to initialize all subplans */
     127           4 :             nplans = list_length(node->mergeplans);
     128             :             Assert(nplans > 0);
     129           4 :             validsubplans = bms_add_range(NULL, 0, nplans - 1);
     130             :         }
     131             : 
     132             :         /*
     133             :          * If no runtime pruning is required, we can fill ms_valid_subplans
     134             :          * immediately, preventing later calls to ExecFindMatchingSubPlans.
                     :          *
                     :          * Note that the range 0 .. nplans - 1 indexes the compacted
                     :          * mergeplanstates array filled in below (nplans entries), not
                     :          * positions in node->mergeplans.
     135             :          */
     136          28 :         if (!prunestate->do_exec_prune)
     137             :         {
     138             :             Assert(nplans > 0);
     139          24 :             mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
     140             :         }
     141             :     }
     142             :     else
     143             :     {
     144         232 :         nplans = list_length(node->mergeplans);
     145             : 
     146             :         /*
     147             :          * When run-time partition pruning is not enabled we can just mark all
     148             :          * subplans as valid; they must also all be initialized.
     149             :          */
     150             :         Assert(nplans > 0);
     151         232 :         mergestate->ms_valid_subplans = validsubplans =
     152         232 :             bms_add_range(NULL, 0, nplans - 1);
     153         232 :         mergestate->ms_prune_state = NULL;
     154             :     }
     155             : 
     156         260 :     mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
     157         260 :     mergestate->mergeplans = mergeplanstates;
     158         260 :     mergestate->ms_nplans = nplans;
     159             : 
     160         260 :     mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
     161         260 :     mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
     162             :                                               mergestate);
     163             : 
     164             :     /*
     165             :      * Miscellaneous initialization
     166             :      *
     167             :      * MergeAppend nodes do have Result slots, which hold pointers to tuples,
     168             :      * so we have to initialize them.  FIXME
                     :      *
                     :      * NOTE(review): the FIXME above is not explained anywhere in this
                     :      * file -- TODO confirm what is still outstanding.
     169             :      */
     170         260 :     ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
     171             : 
     172             :     /* node returns slots from each of its subnodes, therefore not fixed */
     173         260 :     mergestate->ps.resultopsset = true;
     174         260 :     mergestate->ps.resultopsfixed = false;
     175             : 
     176             :     /*
     177             :      * call ExecInitNode on each of the valid plans to be executed and save
     178             :      * the results into the mergeplanstates array.
                     :      *
                     :      * i is the position within node->mergeplans; j is the next free entry
                     :      * of mergeplanstates.  Subplans not in validsubplans are skipped, so
                     :      * mergeplanstates holds only the surviving subplans, packed densely.
     179             :      */
     180         260 :     j = i = 0;
     181        1060 :     foreach(lc, node->mergeplans)
     182             :     {
     183         800 :         if (bms_is_member(i, validsubplans))
     184             :         {
     185         760 :             Plan       *initNode = (Plan *) lfirst(lc);
     186             : 
     187         760 :             mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
     188             :         }
     189         800 :         i++;
     190             :     }
     191             : 
     192         260 :     mergestate->ps.ps_ProjInfo = NULL;
     193             : 
     194             :     /*
     195             :      * initialize sort-key information
     196             :      */
     197         260 :     mergestate->ms_nkeys = node->numCols;
     198         260 :     mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
     199             : 
     200         552 :     for (i = 0; i < node->numCols; i++)
     201             :     {
     202         292 :         SortSupport sortKey = mergestate->ms_sortkeys + i;
     203             : 
     204         292 :         sortKey->ssup_cxt = CurrentMemoryContext;
     205         292 :         sortKey->ssup_collation = node->collations[i];
     206         292 :         sortKey->ssup_nulls_first = node->nullsFirst[i];
     207         292 :         sortKey->ssup_attno = node->sortColIdx[i];
     208             : 
     209             :         /*
     210             :          * It isn't feasible to perform abbreviated key conversion, since
     211             :          * tuples are pulled into mergestate's binary heap as needed.  It
     212             :          * would likely be counter-productive to convert tuples into an
     213             :          * abbreviated representation as they're pulled up, so opt out of that
     214             :          * additional optimization entirely.
     215             :          */
     216         292 :         sortKey->abbreviate = false;
     217             : 
     218         292 :         PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
     219             :     }
     220             : 
     221             :     /*
     222             :      * initialize to show we have not run the subplans yet
     223             :      */
     224         260 :     mergestate->ms_initialized = false;
     225             : 
     226         260 :     return mergestate;
     227             : }
     228             : 
     229             : /* ----------------------------------------------------------------
     230             :  *     ExecMergeAppend
     231             :  *
     232             :  *      Handles iteration over multiple subplans.
                     :  *
                     :  *      Each call returns one slot.  On the first call after init or
                     :  *      rescan, one tuple is fetched from every valid subplan and the
                     :  *      indexes of the subplans that produced a tuple are loaded into
                     :  *      the heap.  On later calls, the subplan whose slot was returned
                     :  *      last time (its index is still at the top of the heap) is advanced
                     :  *      and the heap is adjusted.  The slot returned is that of the
                     :  *      subplan now at the top of the heap, or the result of
                     :  *      ExecClearTuple on the node's result slot once the heap is empty
                     :  *      (or immediately, if ms_noopscan is set).
     233             :  * ----------------------------------------------------------------
     234             :  */
     235             : static TupleTableSlot *
     236        8972 : ExecMergeAppend(PlanState *pstate)
     237             : {
     238        8972 :     MergeAppendState *node = castNode(MergeAppendState, pstate);
     239             :     TupleTableSlot *result;
     240             :     SlotNumber  i;
     241             : 
     242        8972 :     CHECK_FOR_INTERRUPTS();
     243             : 
     244        8972 :     if (!node->ms_initialized)
     245             :     {
     246             :         /* Nothing to do if all subplans were pruned */
     247         144 :         if (node->ms_noopscan)
     248           8 :             return ExecClearTuple(node->ps.ps_ResultTupleSlot);
     249             : 
     250             :         /*
     251             :          * If we've yet to determine the valid subplans then do so now.  If
     252             :          * run-time pruning is disabled then the valid subplans will always be
     253             :          * set to all subplans.
     254             :          */
     255         136 :         if (node->ms_valid_subplans == NULL)
     256           4 :             node->ms_valid_subplans =
     257           4 :                 ExecFindMatchingSubPlans(node->ms_prune_state);
     258             : 
     259             :         /*
     260             :          * First time through: pull the first tuple from each valid subplan,
     261             :          * and set up the heap.
                     :          *
                     :          * Subplans that return no tuple at all are simply not added to
                     :          * the heap.  Entries are added unordered and the heap property is
                     :          * established once, by binaryheap_build, after the loop.
     262             :          */
     263         136 :         i = -1;
     264         668 :         while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
     265             :         {
     266         396 :             node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
     267         396 :             if (!TupIsNull(node->ms_slots[i]))
     268         344 :                 binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
     269             :         }
     270         136 :         binaryheap_build(node->ms_heap);
     271         136 :         node->ms_initialized = true;
     272             :     }
     273             :     else
     274             :     {
     275             :         /*
     276             :          * Otherwise, pull the next tuple from whichever subplan we returned
     277             :          * from last time, and reinsert the subplan index into the heap,
     278             :          * because it might now compare differently against the existing
     279             :          * elements of the heap.  (We could perhaps simplify the logic a bit
     280             :          * by doing this before returning from the prior call, but it's better
     281             :          * to not pull tuples until necessary.)
                     :          *
                     :          * If that subplan is now exhausted, its index is removed from the
                     :          * heap instead.
     282             :          */
     283        8828 :         i = DatumGetInt32(binaryheap_first(node->ms_heap));
     284        8828 :         node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
     285        8828 :         if (!TupIsNull(node->ms_slots[i]))
     286        8600 :             binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
     287             :         else
     288         228 :             (void) binaryheap_remove_first(node->ms_heap);
     289             :     }
     290             : 
     291        8964 :     if (binaryheap_empty(node->ms_heap))
     292             :     {
     293             :         /* All the subplans are exhausted, and so is the heap */
     294         100 :         result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
     295             :     }
     296             :     else
     297             :     {
     298        8864 :         i = DatumGetInt32(binaryheap_first(node->ms_heap));
     299        8864 :         result = node->ms_slots[i];
     300             :     }
     301             : 
     302        8964 :     return result;
     303             : }
     304             : 
     305             : /*
     306             :  * Compare the tuples in the two given slots.
                     :  *
                     :  * 'a' and 'b' are SlotNumbers passed as Datums; 'arg' is the
                     :  * MergeAppendState, whose ms_slots array supplies the actual slots.  Both
                     :  * slots are asserted not to be null/empty (TupIsNull).
                     :  *
                     :  * Sort keys are compared in order.  For the first key on which the two
                     :  * tuples differ, the comparator result is run through
                     :  * INVERT_COMPARE_RESULT and returned; if all ms_nkeys keys compare equal
                     :  * the result is 0.
                     :  *
                     :  * NOTE(review): the inversion is presumably there so that the tuple that
                     :  * sorts first ends up at the top of the heap -- confirm against
                     :  * lib/binaryheap.
                     :  *
                     :  * NOTE(review): the forward declaration near the top of the file spells
                     :  * the return type "int" while this definition uses "int32"; presumably
                     :  * these are the same type -- confirm, and consider making them match.
     307             :  */
     308             : static int32
     309        9272 : heap_compare_slots(Datum a, Datum b, void *arg)
     310             : {
     311        9272 :     MergeAppendState *node = (MergeAppendState *) arg;
     312        9272 :     SlotNumber  slot1 = DatumGetInt32(a);
     313        9272 :     SlotNumber  slot2 = DatumGetInt32(b);
     314             : 
     315        9272 :     TupleTableSlot *s1 = node->ms_slots[slot1];
     316        9272 :     TupleTableSlot *s2 = node->ms_slots[slot2];
     317             :     int         nkey;
     318             : 
     319             :     Assert(!TupIsNull(s1));
     320             :     Assert(!TupIsNull(s2));
     321             : 
     322        9676 :     for (nkey = 0; nkey < node->ms_nkeys; nkey++)
     323             :     {
     324        9272 :         SortSupport sortKey = node->ms_sortkeys + nkey;
     325        9272 :         AttrNumber  attno = sortKey->ssup_attno;
     326             :         Datum       datum1,
     327             :                     datum2;
     328             :         bool        isNull1,
     329             :                     isNull2;
     330             :         int         compare;
     331             : 
     332        9272 :         datum1 = slot_getattr(s1, attno, &isNull1);
     333        9272 :         datum2 = slot_getattr(s2, attno, &isNull2);
     334             : 
     335        9272 :         compare = ApplySortComparator(datum1, isNull1,
     336             :                                       datum2, isNull2,
     337             :                                       sortKey);
     338        9272 :         if (compare != 0)
     339             :         {
     340        8868 :             INVERT_COMPARE_RESULT(compare);
     341        8868 :             return compare;
     342             :         }
     343             :     }
     344         404 :     return 0;
     345             : }
     346             : 
     347             : /* ----------------------------------------------------------------
     348             :  *      ExecEndMergeAppend
     349             :  *
     350             :  *      Shuts down the subscans of the MergeAppend node.
                     :  *
                     :  *      ExecEndNode is called on each of the ms_nplans entries of
                     :  *      node->mergeplans.  This function itself releases no memory
                     :  *      explicitly.
     351             :  *
     352             :  *      Returns nothing of interest.
     353             :  * ----------------------------------------------------------------
     354             :  */
     355             : void
     356         260 : ExecEndMergeAppend(MergeAppendState *node)
     357             : {
     358             :     PlanState **mergeplans;
     359             :     int         nplans;
     360             :     int         i;
     361             : 
     362             :     /*
     363             :      * get information from the node
     364             :      */
     365         260 :     mergeplans = node->mergeplans;
     366         260 :     nplans = node->ms_nplans;
     367             : 
     368             :     /*
     369             :      * shut down each of the subscans
     370             :      */
     371        1020 :     for (i = 0; i < nplans; i++)
     372         760 :         ExecEndNode(mergeplans[i]);
     373         260 : }
     374             : 
     375             : void
     376          12 : ExecReScanMergeAppend(MergeAppendState *node)
     377             : {
     378             :     int         i;
     379             : 
     380             :     /*
                     :      * ExecReScanMergeAppend: reset the node so that the next
                     :      * ExecMergeAppend call starts the merge over.  We revisit the pruning
                     :      * result if needed, propagate changed parameters to (or directly
                     :      * rescan) each of the ms_nplans subplans, then empty the heap and
                     :      * clear ms_initialized.
                     :      *
     381             :      * If any PARAM_EXEC Params used in pruning expressions have changed, then
     382             :      * we'd better unset the valid subplans so that they are reselected for
     383             :      * the new parameter values.
     384             :      */
     385          12 :     if (node->ms_prune_state &&
     386           0 :         bms_overlap(node->ps.chgParam,
     387           0 :                     node->ms_prune_state->execparamids))
     388             :     {
     389           0 :         bms_free(node->ms_valid_subplans);
     390           0 :         node->ms_valid_subplans = NULL;
     391             :     }
     392             : 
     393          36 :     for (i = 0; i < node->ms_nplans; i++)
     394             :     {
     395          24 :         PlanState  *subnode = node->mergeplans[i];
     396             : 
     397             :         /*
     398             :          * ExecReScan doesn't know about my subplans, so I have to do
     399             :          * changed-parameter signaling myself.
     400             :          */
     401          24 :         if (node->ps.chgParam != NULL)
     402          24 :             UpdateChangedParamSet(subnode, node->ps.chgParam);
     403             : 
     404             :         /*
     405             :          * If chgParam of subnode is not null then plan will be re-scanned by
     406             :          * first ExecProcNode.
     407             :          */
     408          24 :         if (subnode->chgParam == NULL)
     409           0 :             ExecReScan(subnode);
     410             :     }
     411          12 :     binaryheap_reset(node->ms_heap);
     412          12 :     node->ms_initialized = false;
     413          12 : }

Generated by: LCOV version 1.13