LCOV - code coverage report
Current view: top level - src/backend/executor - nodeAgg.c (source / functions)
Test: PostgreSQL 13beta1    Lines: 1415 hit of 1477 total (95.8 % coverage)
Date: 2020-06-05 19:06:29    Functions: 57 hit of 59 total (96.6 % coverage)
Legend: Lines: hit / not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * nodeAgg.c
       4             :  *    Routines to handle aggregate nodes.
       5             :  *
       6             :  *    ExecAgg normally evaluates each aggregate in the following steps:
       7             :  *
       8             :  *       transvalue = initcond
       9             :  *       foreach input_tuple do
      10             :  *          transvalue = transfunc(transvalue, input_value(s))
      11             :  *       result = finalfunc(transvalue, direct_argument(s))
      12             :  *
      13             :  *    If a finalfunc is not supplied then the result is just the ending
      14             :  *    value of transvalue.
      15             :  *
      16             :  *    Other behaviors can be selected by the "aggsplit" mode, which exists
      17             :  *    to support partial aggregation.  It is possible to:
      18             :  *    * Skip running the finalfunc, so that the output is always the
      19             :  *    final transvalue state.
      20             :  *    * Substitute the combinefunc for the transfunc, so that transvalue
      21             :  *    states (propagated up from a child partial-aggregation step) are merged
      22             :  *    rather than processing raw input rows.  (The statements below about
      23             :  *    the transfunc apply equally to the combinefunc, when it's selected.)
      24             :  *    * Apply the serializefunc to the output values (this only makes sense
      25             :  *    when skipping the finalfunc, since the serializefunc works on the
      26             :  *    transvalue data type).
      27             :  *    * Apply the deserializefunc to the input values (this only makes sense
      28             :  *    when using the combinefunc, for similar reasons).
      29             :  *    It is the planner's responsibility to connect up Agg nodes using these
      30             :  *    alternate behaviors in a way that makes sense, with partial aggregation
      31             :  *    results being fed to nodes that expect them.
      32             :  *
      33             :  *    If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
      34             :  *    input tuples and eliminate duplicates (if required) before performing
      35             :  *    the above-depicted process.  (However, we don't do that for ordered-set
      36             :  *    aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
      37             :  *    so far as this module is concerned.)  Note that partial aggregation
      38             :  *    is not supported in these cases, since we couldn't ensure global
      39             :  *    ordering or distinctness of the inputs.
      40             :  *
      41             :  *    If transfunc is marked "strict" in pg_proc and initcond is NULL,
      42             :  *    then the first non-NULL input_value is assigned directly to transvalue,
      43             :  *    and transfunc isn't applied until the second non-NULL input_value.
      44             :  *    The agg's first input type and transtype must be the same in this case!
      45             :  *
      46             :  *    If transfunc is marked "strict" then NULL input_values are skipped,
      47             :  *    keeping the previous transvalue.  If transfunc is not strict then it
      48             :  *    is called for every input tuple and must deal with NULL initcond
      49             :  *    or NULL input_values for itself.
      50             :  *
      51             :  *    If finalfunc is marked "strict" then it is not called when the
      52             :  *    ending transvalue is NULL, instead a NULL result is created
      53             :  *    automatically (this is just the usual handling of strict functions,
      54             :  *    of course).  A non-strict finalfunc can make its own choice of
      55             :  *    what to return for a NULL ending transvalue.
      56             :  *
      57             :  *    Ordered-set aggregates are treated specially in one other way: we
      58             :  *    evaluate any "direct" arguments and pass them to the finalfunc along
      59             :  *    with the transition value.
      60             :  *
      61             :  *    A finalfunc can have additional arguments beyond the transvalue and
      62             :  *    any "direct" arguments, corresponding to the input arguments of the
      63             :  *    aggregate.  These are always just passed as NULL.  Such arguments may be
      64             :  *    needed to allow resolution of a polymorphic aggregate's result type.
      65             :  *
      66             :  *    We compute aggregate input expressions and run the transition functions
      67             :  *    in a temporary econtext (aggstate->tmpcontext).  This is reset at least
      68             :  *    once per input tuple, so when the transvalue datatype is
      69             :  *    pass-by-reference, we have to be careful to copy it into a longer-lived
      70             :  *    memory context, and free the prior value to avoid memory leakage.  We
      71             :  *    store transvalues in another set of econtexts, aggstate->aggcontexts
      72             :  *    (one per grouping set, see below), which are also used for the hashtable
      73             :  *    structures in AGG_HASHED mode.  These econtexts are rescanned, not just
      74             :  *    reset, at group boundaries so that aggregate transition functions can
      75             :  *    register shutdown callbacks via AggRegisterCallback.
      76             :  *
      77             :  *    The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
      78             :  *    run finalize functions and compute the output tuple; this context can be
      79             :  *    reset once per output tuple.
      80             :  *
      81             :  *    The executor's AggState node is passed as the fmgr "context" value in
      82             :  *    all transfunc and finalfunc calls.  It is not recommended that the
      83             :  *    transition functions look at the AggState node directly, but they can
      84             :  *    use AggCheckCallContext() to verify that they are being called by
      85             :  *    nodeAgg.c (and not as ordinary SQL functions).  The main reason a
      86             :  *    transition function might want to know this is so that it can avoid
      87             :  *    palloc'ing a fixed-size pass-by-ref transition value on every call:
      88             :  *    it can instead just scribble on and return its left input.  Ordinarily
      89             :  *    it is completely forbidden for functions to modify pass-by-ref inputs,
      90             :  *    but in the aggregate case we know the left input is either the initial
      91             :  *    transition value or a previous function result, and in either case its
      92             :  *    value need not be preserved.  See int8inc() for an example.  Notice that
      93             :  *    the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
      94             :  *    the previous transition value pointer is returned.  It is also possible
      95             :  *    to avoid repeated data copying when the transition value is an expanded
      96             :  *    object: to do that, the transition function must take care to return
      97             :  *    an expanded object that is in a child context of the memory context
      98             :  *    returned by AggCheckCallContext().  Also, some transition functions want
      99             :  *    to store working state in addition to the nominal transition value; they
     100             :  *    can use the memory context returned by AggCheckCallContext() to do that.
     101             :  *
     102             :  *    Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
     103             :  *    AggState is available as context in earlier releases (back to 8.1),
     104             :  *    but direct examination of the node is needed to use it before 9.0.
     105             :  *
     106             :  *    As of 9.4, aggregate transition functions can also use AggGetAggref()
     107             :  *    to get hold of the Aggref expression node for their aggregate call.
     108             :  *    This is mainly intended for ordered-set aggregates, which are not
     109             :  *    supported as window functions.  (A regular aggregate function would
     110             :  *    need some fallback logic to use this, since there's no Aggref node
     111             :  *    for a window function.)
     112             :  *
     113             :  *    Grouping sets:
     114             :  *
     115             :  *    A list of grouping sets which is structurally equivalent to a ROLLUP
     116             :  *    clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
     117             :  *    ordered data.  We do this by keeping a separate set of transition values
     118             :  *    for each grouping set being concurrently processed; for each input tuple
     119             :  *    we update them all, and on group boundaries we reset those states
     120             :  *    (starting at the front of the list) whose grouping values have changed
     121             :  *    (the list of grouping sets is ordered from most specific to least
     122             :  *    specific).
     123             :  *
     124             :  *    Where more complex grouping sets are used, we break them down into
     125             :  *    "phases", where each phase has a different sort order (except phase 0
     126             :  *    which is reserved for hashing).  During each phase but the last, the
     127             :  *    input tuples are additionally stored in a tuplesort which is keyed to the
     128             :  *    next phase's sort order; during each phase but the first, the input
     129             :  *    tuples are drawn from the previously sorted data.  (The sorting of the
     130             :  *    data for the first phase is handled by the planner, as it might be
     131             :  *    satisfied by underlying nodes.)
     132             :  *
     133             :  *    Hashing can be mixed with sorted grouping.  To do this, we have an
     134             :  *    AGG_MIXED strategy that populates the hashtables during the first sorted
     135             :  *    phase, and switches to reading them out after completing all sort phases.
     136             :  *    We can also support AGG_HASHED with multiple hash tables and no sorting
     137             :  *    at all.
     138             :  *
     139             :  *    From the perspective of aggregate transition and final functions, the
     140             :  *    only issue regarding grouping sets is this: a single call site (flinfo)
     141             :  *    of an aggregate function may be used for updating several different
     142             :  *    transition values in turn. So the function must not cache in the flinfo
     143             :  *    anything which logically belongs as part of the transition value (most
     144             :  *    importantly, the memory context in which the transition value exists).
     145             :  *    The support API functions (AggCheckCallContext, AggRegisterCallback) are
     146             :  *    sensitive to the grouping set for which the aggregate function is
     147             :  *    currently being called.
     148             :  *
     149             :  *    Plan structure:
     150             :  *
     151             :  *    What we get from the planner is actually one "real" Agg node which is
     152             :  *    part of the plan tree proper, but which optionally has an additional list
     153             :  *    of Agg nodes hung off the side via the "chain" field.  This is because an
     154             :  *    Agg node happens to be a convenient representation of all the data we
     155             :  *    need for grouping sets.
     156             :  *
     157             :  *    For many purposes, we treat the "real" node as if it were just the first
     158             :  *    node in the chain.  The chain must be ordered such that hashed entries
     159             :  *    come before sorted/plain entries; the real node is marked AGG_MIXED if
     160             :  *    there are both types present (in which case the real node describes one
     161             :  *    of the hashed groupings, other AGG_HASHED nodes may optionally follow in
     162             :  *    the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node).  If
     163             :  *    the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
     164             :  *    nodes must be of the same type; if it is AGG_PLAIN, there can be no
     165             :  *    chained nodes.
     166             :  *
     167             :  *    We collect all hashed nodes into a single "phase", numbered 0, and create
     168             :  *    a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
     169             :  *    Phase 0 is allocated even if there are no hashes, but remains unused in
     170             :  *    that case.
     171             :  *
     172             :  *    AGG_HASHED nodes actually refer to only a single grouping set each,
     173             :  *    because for each hashed grouping we need a separate grpColIdx and
     174             :  *    numGroups estimate.  AGG_SORTED nodes represent a "rollup", a list of
     175             :  *    grouping sets that share a sort order.  Each AGG_SORTED node other than
     176             :  *    the first one has an associated Sort node which describes the sort order
     177             :  *    to be used; the first sorted node takes its input from the outer subtree,
     178             :  *    which the planner has already arranged to provide ordered data.
     179             :  *
     180             :  *    Memory and ExprContext usage:
     181             :  *
     182             :  *    Because we're accumulating aggregate values across input rows, we need to
     183             :  *    use more memory contexts than just simple input/output tuple contexts.
     184             :  *    In fact, for a rollup, we need a separate context for each grouping set
     185             :  *    so that we can reset the inner (finer-grained) aggregates on their group
     186             :  *    boundaries while continuing to accumulate values for outer
     187             :  *    (coarser-grained) groupings.  On top of this, we might be simultaneously
     188             :  *    populating hashtables; however, we only need one context for all the
     189             :  *    hashtables.
     190             :  *
     191             :  *    So we create an array, aggcontexts, with an ExprContext for each grouping
     192             :  *    set in the largest rollup that we're going to process, and use the
     193             :  *    per-tuple memory context of those ExprContexts to store the aggregate
     194             :  *    transition values.  hashcontext is the single context created to support
     195             :  *    all hash tables.
     196             :  *
     197             :  *    Spilling To Disk
     198             :  *
     199             :  *    When performing hash aggregation, if the hash table memory exceeds the
     200             :  *    limit (see hash_agg_check_limits()), we enter "spill mode". In spill
     201             :  *    mode, we advance the transition states only for groups already in the
     202             :  *    hash table. For tuples that would need to create new hash table
     203             :  *    entries (and initialize new transition states), we instead spill them to
     204             :  *    disk to be processed later. The tuples are spilled in a partitioned
     205             :  *    manner, so that subsequent batches are smaller and less likely to exceed
     206             :  *    work_mem (if a batch does exceed work_mem, it must be spilled
     207             :  *    recursively).
     208             :  *
     209             :  *    Spilled data is written to logical tapes. These provide better control
     210             :  *    over memory usage, disk space, and the number of files than if we were
     211             :  *    to use a BufFile for each spill.
     212             :  *
     213             :  *    Note that it's possible for transition states to start small but then
     214             :  *    grow very large; for instance in the case of ARRAY_AGG. In such cases,
     215             :  *    it's still possible to significantly exceed work_mem. We try to avoid
     216             :  *    this situation by estimating what will fit in the available memory, and
     217             :  *    imposing a limit on the number of groups separately from the amount of
     218             :  *    memory consumed.
     219             :  *
     220             :  *    Transition / Combine function invocation:
     221             :  *
     222             :  *    For performance reasons transition functions, including combine
     223             :  *    functions, aren't invoked one-by-one from nodeAgg.c after computing
     224             :  *    arguments using the expression evaluation engine. Instead
     225             :  *    ExecBuildAggTrans() builds one large expression that does both argument
     226             :  *    evaluation and transition function invocation. That avoids performance
     227             :  *    issues due to repeated uses of expression evaluation, complications due
     228             :  *    to filter expressions having to be evaluated early, and allows JIT
     229             :  *    compilation of the entire expression into one native function.
     230             :  *
     231             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
     232             :  * Portions Copyright (c) 1994, Regents of the University of California
     233             :  *
     234             :  * IDENTIFICATION
     235             :  *    src/backend/executor/nodeAgg.c
     236             :  *
     237             :  *-------------------------------------------------------------------------
     238             :  */
     239             : 
     240             : #include "postgres.h"
     241             : 
     242             : #include "access/htup_details.h"
     243             : #include "catalog/objectaccess.h"
     244             : #include "catalog/pg_aggregate.h"
     245             : #include "catalog/pg_proc.h"
     246             : #include "catalog/pg_type.h"
     247             : #include "executor/execExpr.h"
     248             : #include "executor/executor.h"
     249             : #include "executor/nodeAgg.h"
     250             : #include "miscadmin.h"
     251             : #include "nodes/makefuncs.h"
     252             : #include "nodes/nodeFuncs.h"
     253             : #include "optimizer/optimizer.h"
     254             : #include "parser/parse_agg.h"
     255             : #include "parser/parse_coerce.h"
     256             : #include "utils/acl.h"
     257             : #include "utils/builtins.h"
     258             : #include "utils/datum.h"
     259             : #include "utils/dynahash.h"
     260             : #include "utils/expandeddatum.h"
     261             : #include "utils/logtape.h"
     262             : #include "utils/lsyscache.h"
     263             : #include "utils/memutils.h"
     264             : #include "utils/syscache.h"
     265             : #include "utils/tuplesort.h"
     266             : 
     267             : /*
     268             :  * Control how many partitions are created when spilling HashAgg to
     269             :  * disk.
     270             :  *
     271             :  * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of
     272             :  * partitions needed such that each partition will fit in memory. The factor
     273             :  * is set higher than one because there's not a high cost to having a few too
     274             :  * many partitions, and it makes it less likely that a partition will need to
     275             :  * be spilled recursively. Another benefit of having more, smaller partitions
     276             :  * is that small hash tables may perform better than large ones due to memory
     277             :  * caching effects.
     278             :  *
     279             :  * We also specify a min and max number of partitions per spill. Too few might
     280             :  * mean a lot of wasted I/O from repeated spilling of the same tuples. Too
     281             :  * many will result in lots of memory wasted buffering the spill files (which
     282             :  * could instead be spent on a larger hash table).
     283             :  */
     284             : #define HASHAGG_PARTITION_FACTOR 1.50   /* overprovisioning multiplier */
     285             : #define HASHAGG_MIN_PARTITIONS 4        /* fewest partitions per spill */
     286             : #define HASHAGG_MAX_PARTITIONS 1024     /* most partitions per spill */
     287             : 
     288             : /*
     289             :  * For reading from tapes, the buffer size must be a multiple of
     290             :  * BLCKSZ. Larger values help when reading from multiple tapes concurrently,
     291             :  * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a
     292             :  * tape always uses a buffer of size BLCKSZ.
     293             :  */
     294             : #define HASHAGG_READ_BUFFER_SIZE BLCKSZ   /* tapes are never read concurrently */
     295             : #define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ  /* tape writes always use BLCKSZ */
     296             : 
     297             : /* minimum number of initial hash table buckets */
     298             : #define HASHAGG_MIN_BUCKETS 256
     299             : 
     300             : /*
     301             :  * Estimated per-chunk memory overhead (presumably the allocator's chunk
     302             :  * header), fixed at a constant 16 bytes. XXX: should this be improved?
     303             :  */
     304             : #define CHUNKHDRSZ 16
     305             : 
     306             : /*
     307             :  * Track all tapes needed for a HashAgg that spills. We don't know the maximum
     308             :  * number of tapes needed at the start of the algorithm (because it can
     309             :  * recurse), so one tape set is allocated and extended as needed for new
     310             :  * tapes. Once a tape has been completely read, it is rewound for write mode
     311             :  * and put in the free list.
     312             :  *
     313             :  * Tapes' buffers can take up substantial memory when many tapes are open at
     314             :  * once. We only need one tape open at a time in read mode (using a buffer
     315             :  * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
     316             :  * requiring a buffer of size BLCKSZ) for each partition.
     317             :  */
     318             : typedef struct HashTapeInfo
     319             : {
     320             :     LogicalTapeSet *tapeset;    /* the single tape set, extended as needed */
     321             :     int         ntapes;         /* presumably: tapes created in tapeset */
     322             :     int        *freetapes;      /* free list of released tape numbers */
     323             :     int         nfreetapes;     /* presumably: entries used in freetapes */
     324             :     int         freetapes_alloc;    /* presumably: allocated length of freetapes */
     325             : } HashTapeInfo;
     326             : 
     327             : /*
     328             :  * Represents partitioned spill data for a single hashtable. Contains the
     329             :  * necessary information to route tuples to the correct partition, and to
     330             :  * transform the spilled data into new batches.
     331             :  *
     332             :  * The high bits of the hash value are used for partition selection (when
     333             :  * recursing, we ignore the bits that have already been used for partition
     334             :  * selection at an earlier level).
     335             :  */
     336             : typedef struct HashAggSpill
     337             : {
     338             :     LogicalTapeSet *tapeset;    /* borrowed reference to tape set */
     339             :     int         npartitions;    /* number of partitions */
     340             :     int        *partitions;     /* spill partition tape numbers */
     341             :     int64      *ntuples;        /* number of tuples in each partition */
     342             :     uint32      mask;           /* mask to find partition from hash value */
     343             :     int         shift;          /* after masking, shift by this amount */
     344             : } HashAggSpill;
     345             : 
     346             : /*
     347             :  * Represents work to be done for one pass of hash aggregation (with only one
     348             :  * grouping set).
     349             :  *
     350             :  * Also tracks the bits of the hash already used for partition selection by
     351             :  * earlier iterations, so that this batch can use new bits. If all bits have
     352             :  * already been used, no partitioning will be done (any spilled data will go
     353             :  * to a single output tape).
     354             :  */
     355             : typedef struct HashAggBatch
     356             : {
     357             :     int         setno;          /* the one grouping set this pass handles */
     358             :     int         used_bits;      /* hash bits already used for partitioning */
     359             :     LogicalTapeSet *tapeset;    /* borrowed reference to tape set */
     360             :     int         input_tapenum;  /* tape holding this batch's input tuples */
     361             :     int64       input_tuples;   /* number of tuples in this batch */
     362             : } HashAggBatch;
     363             : 
     364             : static void select_current_set(AggState *aggstate, int setno, bool is_hash);
     365             : static void initialize_phase(AggState *aggstate, int newphase);
     366             : static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
     367             : static void initialize_aggregates(AggState *aggstate,
     368             :                                   AggStatePerGroup *pergroups,
     369             :                                   int numReset);
     370             : static void advance_transition_function(AggState *aggstate,
     371             :                                         AggStatePerTrans pertrans,
     372             :                                         AggStatePerGroup pergroupstate);
     373             : static void advance_aggregates(AggState *aggstate);
     374             : static void process_ordered_aggregate_single(AggState *aggstate,
     375             :                                              AggStatePerTrans pertrans,
     376             :                                              AggStatePerGroup pergroupstate);
     377             : static void process_ordered_aggregate_multi(AggState *aggstate,
     378             :                                             AggStatePerTrans pertrans,
     379             :                                             AggStatePerGroup pergroupstate);
     380             : static void finalize_aggregate(AggState *aggstate,
     381             :                                AggStatePerAgg peragg,
     382             :                                AggStatePerGroup pergroupstate,
     383             :                                Datum *resultVal, bool *resultIsNull);
     384             : static void finalize_partialaggregate(AggState *aggstate,
     385             :                                       AggStatePerAgg peragg,
     386             :                                       AggStatePerGroup pergroupstate,
     387             :                                       Datum *resultVal, bool *resultIsNull);
     388             : static void prepare_hash_slot(AggState *aggstate);
     389             : static void prepare_projection_slot(AggState *aggstate,
     390             :                                     TupleTableSlot *slot,
     391             :                                     int currentSet);
     392             : static void finalize_aggregates(AggState *aggstate,
     393             :                                 AggStatePerAgg peragg,
     394             :                                 AggStatePerGroup pergroup);
     395             : static TupleTableSlot *project_aggregates(AggState *aggstate);
     396             : static Bitmapset *find_unaggregated_cols(AggState *aggstate);
     397             : static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
     398             : static void build_hash_tables(AggState *aggstate);
     399             : static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
     400             : static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
     401             :                                           bool nullcheck);
     402             : static long hash_choose_num_buckets(double hashentrysize,
     403             :                                     long estimated_nbuckets,
     404             :                                     Size memory);
     405             : static int  hash_choose_num_partitions(uint64 input_groups,
     406             :                                        double hashentrysize,
     407             :                                        int used_bits,
     408             :                                        int *log2_npartitions);
     409             : static AggStatePerGroup lookup_hash_entry(AggState *aggstate, uint32 hash,
     410             :                                           bool *in_hash_table);
     411             : static void lookup_hash_entries(AggState *aggstate);
     412             : static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
     413             : static void agg_fill_hash_table(AggState *aggstate);
     414             : static bool agg_refill_hash_table(AggState *aggstate);
     415             : static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
     416             : static TupleTableSlot *agg_retrieve_hash_table_in_memory(AggState *aggstate);
     417             : static void hash_agg_check_limits(AggState *aggstate);
     418             : static void hash_agg_enter_spill_mode(AggState *aggstate);
     419             : static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
     420             :                                     int npartitions);
     421             : static void hashagg_finish_initial_spills(AggState *aggstate);
     422             : static void hashagg_reset_spill_state(AggState *aggstate);
     423             : static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
     424             :                                        int input_tapenum, int setno,
     425             :                                        int64 input_tuples, int used_bits);
     426             : static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
     427             : static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
     428             :                                int used_bits, uint64 input_tuples,
     429             :                                double hashentrysize);
     430             : static Size hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot,
     431             :                                 uint32 hash);
     432             : static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
     433             :                                  int setno);
     434             : static void hashagg_tapeinfo_init(AggState *aggstate);
     435             : static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
     436             :                                     int ndest);
     437             : static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
     438             : static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
     439             : static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
     440             :                                       AggState *aggstate, EState *estate,
     441             :                                       Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
     442             :                                       Oid aggserialfn, Oid aggdeserialfn,
     443             :                                       Datum initValue, bool initValueIsNull,
     444             :                                       Oid *inputTypes, int numArguments);
     445             : static int  find_compatible_peragg(Aggref *newagg, AggState *aggstate,
     446             :                                    int lastaggno, List **same_input_transnos);
     447             : static int  find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
     448             :                                      bool shareable,
     449             :                                      Oid aggtransfn, Oid aggtranstype,
     450             :                                      Oid aggserialfn, Oid aggdeserialfn,
     451             :                                      Datum initValue, bool initValueIsNull,
     452             :                                      List *transnos);
     453             : 
     454             : 
     455             : /*
     456             :  * Select the current grouping set; affects current_set and
     457             :  * curaggcontext.
     458             :  */
     459             : static void
     460     6034812 : select_current_set(AggState *aggstate, int setno, bool is_hash)
     461             : {
     462             :     /*
     463             :      * When changing this, also adapt ExecAggPlainTransByVal() and
     464             :      * ExecAggPlainTransByRef().
     465             :      */
     466     6034812 :     if (is_hash)
     467     3413912 :         aggstate->curaggcontext = aggstate->hashcontext;
     468             :     else
     469     2620900 :         aggstate->curaggcontext = aggstate->aggcontexts[setno];
     470             : 
     471     6034812 :     aggstate->current_set = setno;
     472     6034812 : }
     473             : 
     474             : /*
     475             :  * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
     476             :  * current_phase + 1. Juggle the tuplesorts accordingly.
     477             :  *
     478             :  * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
     479             :  * case, so when entering phase 0, all we need to do is drop open sorts.
     480             :  */
     481             : static void
     482     1170906 : initialize_phase(AggState *aggstate, int newphase)
     483             : {
     484             :     Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
     485             : 
     486             :     /*
     487             :      * Whatever the previous state, we're now done with whatever input
     488             :      * tuplesort was in use.
     489             :      */
     490     1170906 :     if (aggstate->sort_in)
     491             :     {
     492          28 :         tuplesort_end(aggstate->sort_in);
     493          28 :         aggstate->sort_in = NULL;
     494             :     }
     495             : 
     496     1170906 :     if (newphase <= 1)
     497             :     {
     498             :         /*
     499             :          * Discard any existing output tuplesort.
     500             :          */
     501     1170786 :         if (aggstate->sort_out)
     502             :         {
     503           4 :             tuplesort_end(aggstate->sort_out);
     504           4 :             aggstate->sort_out = NULL;
     505             :         }
     506             :     }
     507             :     else
     508             :     {
     509             :         /*
     510             :          * The old output tuplesort becomes the new input one, and this is the
     511             :          * right time to actually sort it.
     512             :          */
     513         120 :         aggstate->sort_in = aggstate->sort_out;
     514         120 :         aggstate->sort_out = NULL;
     515             :         Assert(aggstate->sort_in);
     516         120 :         tuplesort_performsort(aggstate->sort_in);
     517             :     }
     518             : 
     519             :     /*
     520             :      * If this isn't the last phase, we need to sort appropriately for the
     521             :      * next phase in sequence.
     522             :      */
     523     1170906 :     if (newphase > 0 && newphase < aggstate->numphases - 1)
     524             :     {
     525         148 :         Sort       *sortnode = aggstate->phases[newphase + 1].sortnode;
     526         148 :         PlanState  *outerNode = outerPlanState(aggstate);
     527         148 :         TupleDesc   tupDesc = ExecGetResultType(outerNode);
     528             : 
     529         148 :         aggstate->sort_out = tuplesort_begin_heap(tupDesc,
     530             :                                                   sortnode->numCols,
     531             :                                                   sortnode->sortColIdx,
     532             :                                                   sortnode->sortOperators,
     533             :                                                   sortnode->collations,
     534             :                                                   sortnode->nullsFirst,
     535             :                                                   work_mem,
     536             :                                                   NULL, false);
     537             :     }
     538             : 
     539     1170906 :     aggstate->current_phase = newphase;
     540     1170906 :     aggstate->phase = &aggstate->phases[newphase];
     541     1170906 : }
     542             : 
     543             : /*
     544             :  * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
     545             :  * populated by the previous phase.  Copy it to the sorter for the next phase
     546             :  * if any.
     547             :  *
     548             :  * Callers cannot rely on memory for tuple in returned slot remaining valid
     549             :  * past any subsequently fetched tuple.
     550             :  */
     551             : static TupleTableSlot *
     552    15656792 : fetch_input_tuple(AggState *aggstate)
     553             : {
     554             :     TupleTableSlot *slot;
     555             : 
     556    15656792 :     if (aggstate->sort_in)
     557             :     {
     558             :         /* make sure we check for interrupts in either path through here */
     559      116564 :         CHECK_FOR_INTERRUPTS();
     560      116564 :         if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
     561             :                                     aggstate->sort_slot, NULL))
     562         120 :             return NULL;
     563      116444 :         slot = aggstate->sort_slot;
     564             :     }
     565             :     else
     566    15540228 :         slot = ExecProcNode(outerPlanState(aggstate));
     567             : 
     568    15656658 :     if (!TupIsNull(slot) && aggstate->sort_out)
     569      116444 :         tuplesort_puttupleslot(aggstate->sort_out, slot);
     570             : 
     571    15656658 :     return slot;
     572             : }
     573             : 
     574             : /*
     575             :  * (Re)Initialize an individual aggregate.
     576             :  *
     577             :  * This function handles only one grouping set, already set in
     578             :  * aggstate->current_set.
     579             :  *
     580             :  * When called, CurrentMemoryContext should be the per-query context.
     581             :  */
     582             : static void
     583     1844854 : initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
     584             :                      AggStatePerGroup pergroupstate)
     585             : {
     586             :     /*
     587             :      * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
     588             :      */
     589     1844854 :     if (pertrans->numSortCols > 0)
     590             :     {
     591             :         /*
     592             :          * In case of rescan, maybe there could be an uncompleted sort
     593             :          * operation?  Clean it up if so.
     594             :          */
     595     1144594 :         if (pertrans->sortstates[aggstate->current_set])
     596           0 :             tuplesort_end(pertrans->sortstates[aggstate->current_set]);
     597             : 
     598             : 
     599             :         /*
     600             :          * We use a plain Datum sorter when there's a single input column;
     601             :          * otherwise sort the full tuple.  (See comments for
     602             :          * process_ordered_aggregate_single.)
     603             :          */
     604     1144594 :         if (pertrans->numInputs == 1)
     605             :         {
     606       49568 :             Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
     607             : 
     608       49568 :             pertrans->sortstates[aggstate->current_set] =
     609      247840 :                 tuplesort_begin_datum(attr->atttypid,
     610       49568 :                                       pertrans->sortOperators[0],
     611       49568 :                                       pertrans->sortCollations[0],
     612       49568 :                                       pertrans->sortNullsFirst[0],
     613             :                                       work_mem, NULL, false);
     614             :         }
     615             :         else
     616     1095026 :             pertrans->sortstates[aggstate->current_set] =
     617     1095026 :                 tuplesort_begin_heap(pertrans->sortdesc,
     618             :                                      pertrans->numSortCols,
     619             :                                      pertrans->sortColIdx,
     620             :                                      pertrans->sortOperators,
     621             :                                      pertrans->sortCollations,
     622             :                                      pertrans->sortNullsFirst,
     623             :                                      work_mem, NULL, false);
     624             :     }
     625             : 
     626             :     /*
     627             :      * (Re)set transValue to the initial value.
     628             :      *
     629             :      * Note that when the initial value is pass-by-ref, we must copy it (into
     630             :      * the aggcontext) since we will pfree the transValue later.
     631             :      */
     632     1844854 :     if (pertrans->initValueIsNull)
     633     1465162 :         pergroupstate->transValue = pertrans->initValue;
     634             :     else
     635             :     {
     636             :         MemoryContext oldContext;
     637             : 
     638      379692 :         oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
     639     1139076 :         pergroupstate->transValue = datumCopy(pertrans->initValue,
     640      379692 :                                               pertrans->transtypeByVal,
     641      379692 :                                               pertrans->transtypeLen);
     642      379692 :         MemoryContextSwitchTo(oldContext);
     643             :     }
     644     1844854 :     pergroupstate->transValueIsNull = pertrans->initValueIsNull;
     645             : 
     646             :     /*
     647             :      * If the initial value for the transition state doesn't exist in the
     648             :      * pg_aggregate table then we will let the first non-NULL value returned
     649             :      * from the outer procNode become the initial value. (This is useful for
     650             :      * aggregates like max() and min().) The noTransValue flag signals that we
     651             :      * still need to do this.
     652             :      */
     653     1844854 :     pergroupstate->noTransValue = pertrans->initValueIsNull;
     654     1844854 : }
     655             : 
     656             : /*
     657             :  * Initialize all aggregate transition states for a new group of input values.
     658             :  *
     659             :  * If there are multiple grouping sets, we initialize only the first numReset
     660             :  * of them (the grouping sets are ordered so that the most specific one, which
     661             :  * is reset most often, is first). As a convenience, if numReset is 0, we
     662             :  * reinitialize all sets.
     663             :  *
     664             :  * NB: This cannot be used for hash aggregates, as for those the grouping set
     665             :  * number has to be specified from further up.
     666             :  *
     667             :  * When called, CurrentMemoryContext should be the per-query context.
     668             :  */
     669             : static void
     670     1287602 : initialize_aggregates(AggState *aggstate,
     671             :                       AggStatePerGroup *pergroups,
     672             :                       int numReset)
     673             : {
     674             :     int         transno;
     675     1287602 :     int         numGroupingSets = Max(aggstate->phase->numsets, 1);
     676     1287602 :     int         setno = 0;
     677     1287602 :     int         numTrans = aggstate->numtrans;
     678     1287602 :     AggStatePerTrans transstates = aggstate->pertrans;
     679             : 
     680     1287602 :     if (numReset == 0)
     681           0 :         numReset = numGroupingSets;
     682             : 
     683     2584428 :     for (setno = 0; setno < numReset; setno++)
     684             :     {
     685     1296826 :         AggStatePerGroup pergroup = pergroups[setno];
     686             : 
     687     1296826 :         select_current_set(aggstate, setno, false);
     688             : 
     689     2728736 :         for (transno = 0; transno < numTrans; transno++)
     690             :         {
     691     1431910 :             AggStatePerTrans pertrans = &transstates[transno];
     692     1431910 :             AggStatePerGroup pergroupstate = &pergroup[transno];
     693             : 
     694     1431910 :             initialize_aggregate(aggstate, pertrans, pergroupstate);
     695             :         }
     696             :     }
     697     1287602 : }
     698             : 
     699             : /*
     700             :  * Given new input value(s), advance the transition function of one aggregate
     701             :  * state within one grouping set only (already set in aggstate->current_set)
     702             :  *
     703             :  * The new values (and null flags) have been preloaded into argument positions
     704             :  * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
     705             :  * pass to the transition function.  We also expect that the static fields of
     706             :  * the fcinfo are already initialized; that was done by ExecInitAgg().
     707             :  *
     708             :  * It doesn't matter which memory context this is called in.
     709             :  */
     710             : static void
     711      582758 : advance_transition_function(AggState *aggstate,
     712             :                             AggStatePerTrans pertrans,
     713             :                             AggStatePerGroup pergroupstate)
     714             : {
     715      582758 :     FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
     716             :     MemoryContext oldContext;
     717             :     Datum       newVal;
     718             : 
     719      582758 :     if (pertrans->transfn.fn_strict)
     720             :     {
     721             :         /*
     722             :          * For a strict transfn, nothing happens when there's a NULL input; we
     723             :          * just keep the prior transValue.
     724             :          */
     725       86114 :         int         numTransInputs = pertrans->numTransInputs;
     726             :         int         i;
     727             : 
     728      172276 :         for (i = 1; i <= numTransInputs; i++)
     729             :         {
     730       86162 :             if (fcinfo->args[i].isnull)
     731           0 :                 return;
     732             :         }
     733       86114 :         if (pergroupstate->noTransValue)
     734             :         {
     735             :             /*
     736             :              * transValue has not been initialized. This is the first non-NULL
     737             :              * input value. We use it as the initial value for transValue. (We
     738             :              * already checked that the agg's input type is binary-compatible
     739             :              * with its transtype, so straight copy here is OK.)
     740             :              *
     741             :              * We must copy the datum into aggcontext if it is pass-by-ref. We
     742             :              * do not need to pfree the old transValue, since it's NULL.
     743             :              */
     744           8 :             oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
     745          24 :             pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
     746           8 :                                                   pertrans->transtypeByVal,
     747           8 :                                                   pertrans->transtypeLen);
     748           8 :             pergroupstate->transValueIsNull = false;
     749           8 :             pergroupstate->noTransValue = false;
     750           8 :             MemoryContextSwitchTo(oldContext);
     751           8 :             return;
     752             :         }
     753       86106 :         if (pergroupstate->transValueIsNull)
     754             :         {
     755             :             /*
     756             :              * Don't call a strict function with NULL inputs.  Note it is
     757             :              * possible to get here despite the above tests, if the transfn is
     758             :              * strict *and* returned a NULL on a prior cycle. If that happens
     759             :              * we will propagate the NULL all the way to the end.
     760             :              */
     761           0 :             return;
     762             :         }
     763             :     }
     764             : 
     765             :     /* We run the transition functions in per-input-tuple memory context */
     766      582750 :     oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
     767             : 
     768             :     /* set up aggstate->curpertrans for AggGetAggref() */
     769      582750 :     aggstate->curpertrans = pertrans;
     770             : 
     771             :     /*
     772             :      * OK to call the transition function
     773             :      */
     774      582750 :     fcinfo->args[0].value = pergroupstate->transValue;
     775      582750 :     fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
     776      582750 :     fcinfo->isnull = false;      /* just in case transfn doesn't set it */
     777             : 
     778      582750 :     newVal = FunctionCallInvoke(fcinfo);
     779             : 
     780      582750 :     aggstate->curpertrans = NULL;
     781             : 
     782             :     /*
     783             :      * If pass-by-ref datatype, must copy the new value into aggcontext and
     784             :      * free the prior transValue.  But if transfn returned a pointer to its
     785             :      * first input, we don't need to do anything.  Also, if transfn returned a
     786             :      * pointer to a R/W expanded object that is already a child of the
     787             :      * aggcontext, assume we can adopt that value without copying it.
     788             :      *
     789             :      * It's safe to compare newVal with pergroup->transValue without regard
     790             :      * for either being NULL, because ExecAggTransReparent() takes care to set
     791             :      * transValue to 0 when NULL. Otherwise we could end up accidentally not
     792             :      * reparenting, when the transValue has the same numerical value as
     793             :      * newValue, despite being NULL.  This is a somewhat hot path, making it
     794             :      * undesirable to instead solve this with another branch for the common
     795             :      * case of the transition function returning its (modified) input
     796             :      * argument.
     797             :      */
     798      582750 :     if (!pertrans->transtypeByVal &&
     799         240 :         DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
     800         480 :         newVal = ExecAggTransReparent(aggstate, pertrans,
     801         240 :                                       newVal, fcinfo->isnull,
     802             :                                       pergroupstate->transValue,
     803         240 :                                       pergroupstate->transValueIsNull);
     804             : 
     805      582750 :     pergroupstate->transValue = newVal;
     806      582750 :     pergroupstate->transValueIsNull = fcinfo->isnull;
     807             : 
     808      582750 :     MemoryContextSwitchTo(oldContext);
     809             : }
     810             : 
     811             : /*
     812             :  * Advance each aggregate transition state for one input tuple.  The input
     813             :  * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
     814             :  * accessible to ExecEvalExpr.
     815             :  *
     816             :  * We have two sets of transition states to handle: one for sorted aggregation
     817             :  * and one for hashed; we do them both here, to avoid multiple evaluation of
     818             :  * the inputs.
     819             :  *
     820             :  * When called, CurrentMemoryContext should be the per-query context.
     821             :  */
     822             : static void
     823    14692212 : advance_aggregates(AggState *aggstate)
     824             : {
     825             :     bool        dummynull;
     826             : 
     827    14692212 :     ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
     828             :                               aggstate->tmpcontext,
     829             :                               &dummynull);
     830    14692184 : }
     831             : 
     832             : /*
     833             :  * Run the transition function for a DISTINCT or ORDER BY aggregate
     834             :  * with only one input.  This is called after we have completed
     835             :  * entering all the input values into the sort object.  We complete the
     836             :  * sort, read out the values in sorted order, and run the transition
     837             :  * function on each value (applying DISTINCT if appropriate).
     838             :  *
     839             :  * Note that the strictness of the transition function was checked when
     840             :  * entering the values into the sort, so we don't check it again here;
     841             :  * we just apply standard SQL DISTINCT logic.
     842             :  *
     843             :  * The one-input case is handled separately from the multi-input case
     844             :  * for performance reasons: for single by-value inputs, such as the
     845             :  * common case of count(distinct id), the tuplesort_getdatum code path
     846             :  * is around 300% faster.  (The speedup for by-reference types is less
     847             :  * but still noticeable.)
     848             :  *
     849             :  * This function handles only one grouping set (already set in
     850             :  * aggstate->current_set).
     851             :  *
     852             :  * When called, CurrentMemoryContext should be the per-query context.
     853             :  */
     854             : static void
     855       49568 : process_ordered_aggregate_single(AggState *aggstate,
     856             :                                  AggStatePerTrans pertrans,
     857             :                                  AggStatePerGroup pergroupstate)
     858             : {
     859       49568 :     Datum       oldVal = (Datum) 0;
     860       49568 :     bool        oldIsNull = true;
     861       49568 :     bool        haveOldVal = false;
     862       49568 :     MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
     863             :     MemoryContext oldContext;
     864       49568 :     bool        isDistinct = (pertrans->numDistinctCols > 0);
     865       49568 :     Datum       newAbbrevVal = (Datum) 0;
     866       49568 :     Datum       oldAbbrevVal = (Datum) 0;
     867       49568 :     FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
     868             :     Datum      *newVal;
     869             :     bool       *isNull;
     870             : 
     871             :     Assert(pertrans->numDistinctCols < 2);
     872             : 
     873       49568 :     tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
     874             : 
     875             :     /* Load the column into argument 1 (arg 0 will be transition value) */
     876       49568 :     newVal = &fcinfo->args[1].value;
     877       49568 :     isNull = &fcinfo->args[1].isnull;
     878             : 
     879             :     /*
     880             :      * Note: if input type is pass-by-ref, the datums returned by the sort are
     881             :      * freshly palloc'd in the per-query context, so we must be careful to
     882             :      * pfree them when they are no longer needed.
     883             :      */
     884             : 
     885      861206 :     while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
     886             :                               true, newVal, isNull, &newAbbrevVal))
     887             :     {
     888             :         /*
     889             :          * Clear and select the working context for evaluation of the equality
     890             :          * function and transition function.
     891             :          */
     892      811638 :         MemoryContextReset(workcontext);
     893      811638 :         oldContext = MemoryContextSwitchTo(workcontext);
     894             : 
     895             :         /*
     896             :          * If DISTINCT mode, and not distinct from prior, skip it.
     897             :          */
     898      811638 :         if (isDistinct &&
     899      283396 :             haveOldVal &&
     900           4 :             ((oldIsNull && *isNull) ||
     901      283396 :              (!oldIsNull && !*isNull &&
     902      566744 :               oldAbbrevVal == newAbbrevVal &&
     903      283370 :               DatumGetBool(FunctionCall2Coll(&pertrans->equalfnOne,
     904             :                                              pertrans->aggCollation,
     905             :                                              oldVal, *newVal)))))
     906             :         {
     907             :             /* equal to prior, so forget this one */
     908      368540 :             if (!pertrans->inputtypeByVal && !*isNull)
     909      124604 :                 pfree(DatumGetPointer(*newVal));
     910             :         }
     911             :         else
     912             :         {
     913      567702 :             advance_transition_function(aggstate, pertrans, pergroupstate);
     914             :             /* forget the old value, if any */
     915      567702 :             if (!oldIsNull && !pertrans->inputtypeByVal)
     916      326284 :                 pfree(DatumGetPointer(oldVal));
     917             :             /* and remember the new one for subsequent equality checks */
     918      567702 :             oldVal = *newVal;
     919      567702 :             oldAbbrevVal = newAbbrevVal;
     920      567702 :             oldIsNull = *isNull;
     921      567702 :             haveOldVal = true;
     922             :         }
     923             : 
     924      811638 :         MemoryContextSwitchTo(oldContext);
     925             :     }
     926             : 
     927       49568 :     if (!oldIsNull && !pertrans->inputtypeByVal)
     928         166 :         pfree(DatumGetPointer(oldVal));
     929             : 
     930       49568 :     tuplesort_end(pertrans->sortstates[aggstate->current_set]);
     931       49568 :     pertrans->sortstates[aggstate->current_set] = NULL;
     932       49568 : }
     933             : 
     934             : /*
     935             :  * Run the transition function for a DISTINCT or ORDER BY aggregate
     936             :  * with more than one input.  This is called after we have completed
     937             :  * entering all the input values into the sort object.  We complete the
     938             :  * sort, read out the values in sorted order, and run the transition
     939             :  * function on each value (applying DISTINCT if appropriate).
     940             :  *
     941             :  * This function handles only one grouping set (already set in
     942             :  * aggstate->current_set).
     943             :  *
     944             :  * When called, CurrentMemoryContext should be the per-query context.
     945             :  */
     946             : static void
     947     1095026 : process_ordered_aggregate_multi(AggState *aggstate,
     948             :                                 AggStatePerTrans pertrans,
     949             :                                 AggStatePerGroup pergroupstate)
     950             : {
     951     1095026 :     ExprContext *tmpcontext = aggstate->tmpcontext;
     952     1095026 :     FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
     953     1095026 :     TupleTableSlot *slot1 = pertrans->sortslot;
     954     1095026 :     TupleTableSlot *slot2 = pertrans->uniqslot;
     955     1095026 :     int         numTransInputs = pertrans->numTransInputs;
     956     1095026 :     int         numDistinctCols = pertrans->numDistinctCols;
     957     1095026 :     Datum       newAbbrevVal = (Datum) 0;
     958     1095026 :     Datum       oldAbbrevVal = (Datum) 0;
     959     1095026 :     bool        haveOldValue = false;
     960     1095026 :     TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
     961             :     int         i;
     962             : 
     963     1095026 :     tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
     964             : 
     965     1095026 :     ExecClearTuple(slot1);
     966     1095026 :     if (slot2)
     967          56 :         ExecClearTuple(slot2);
     968             : 
     969     1110354 :     while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
     970             :                                   true, true, slot1, &newAbbrevVal))
     971             :     {
     972       15328 :         CHECK_FOR_INTERRUPTS();
     973             : 
     974       15328 :         tmpcontext->ecxt_outertuple = slot1;
     975       15328 :         tmpcontext->ecxt_innertuple = slot2;
     976             : 
     977       15328 :         if (numDistinctCols == 0 ||
     978         472 :             !haveOldValue ||
     979         416 :             newAbbrevVal != oldAbbrevVal ||
     980         392 :             !ExecQual(pertrans->equalfnMulti, tmpcontext))
     981             :         {
     982             :             /*
     983             :              * Extract the first numTransInputs columns as datums to pass to
     984             :              * the transfn.
     985             :              */
     986       15056 :             slot_getsomeattrs(slot1, numTransInputs);
     987             : 
     988             :             /* Load values into fcinfo */
     989             :             /* Start from 1, since the 0th arg will be the transition value */
     990       31292 :             for (i = 0; i < numTransInputs; i++)
     991             :             {
     992       16236 :                 fcinfo->args[i + 1].value = slot1->tts_values[i];
     993       16236 :                 fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
     994             :             }
     995             : 
     996       15056 :             advance_transition_function(aggstate, pertrans, pergroupstate);
     997             : 
     998       15056 :             if (numDistinctCols > 0)
     999             :             {
    1000             :                 /* swap the slot pointers to retain the current tuple */
    1001         200 :                 TupleTableSlot *tmpslot = slot2;
    1002             : 
    1003         200 :                 slot2 = slot1;
    1004         200 :                 slot1 = tmpslot;
    1005             :                 /* avoid ExecQual() calls by reusing abbreviated keys */
    1006         200 :                 oldAbbrevVal = newAbbrevVal;
    1007         200 :                 haveOldValue = true;
    1008             :             }
    1009             :         }
    1010             : 
    1011             :         /* Reset context each time */
    1012       15328 :         ResetExprContext(tmpcontext);
    1013             : 
    1014       15328 :         ExecClearTuple(slot1);
    1015             :     }
    1016             : 
    1017     1095026 :     if (slot2)
    1018          56 :         ExecClearTuple(slot2);
    1019             : 
    1020     1095026 :     tuplesort_end(pertrans->sortstates[aggstate->current_set]);
    1021     1095026 :     pertrans->sortstates[aggstate->current_set] = NULL;
    1022             : 
    1023             :     /* restore previous slot, potentially in use for grouping sets */
    1024     1095026 :     tmpcontext->ecxt_outertuple = save;
    1025     1095026 : }
    1026             : 
    1027             : /*
    1028             :  * Compute the final value of one aggregate.
    1029             :  *
    1030             :  * This function handles only one grouping set (already set in
    1031             :  * aggstate->current_set).
    1032             :  *
    1033             :  * The finalfn will be run, and the result delivered, in the
    1034             :  * output-tuple context; caller's CurrentMemoryContext does not matter.
    1035             :  *
    1036             :  * The finalfn uses the state as set in the transno. This also might be
    1037             :  * being used by another aggregate function, so it's important that we do
    1038             :  * nothing destructive here.
    1039             :  */
    1040             : static void
    1041     1839880 : finalize_aggregate(AggState *aggstate,
    1042             :                    AggStatePerAgg peragg,
    1043             :                    AggStatePerGroup pergroupstate,
    1044             :                    Datum *resultVal, bool *resultIsNull)
    1045             : {
    1046     1839880 :     LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
    1047     1839880 :     bool        anynull = false;
    1048             :     MemoryContext oldContext;
    1049             :     int         i;
    1050             :     ListCell   *lc;
    1051     1839880 :     AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
    1052             : 
    1053     1839880 :     oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
    1054             : 
    1055             :     /*
    1056             :      * Evaluate any direct arguments.  We do this even if there's no finalfn
    1057             :      * (which is unlikely anyway), so that side-effects happen as expected.
    1058             :      * The direct arguments go into arg positions 1 and up, leaving position 0
    1059             :      * for the transition state value.
    1060             :      */
    1061     1839880 :     i = 1;
    1062     1840544 :     foreach(lc, peragg->aggdirectargs)
    1063             :     {
    1064         664 :         ExprState  *expr = (ExprState *) lfirst(lc);
    1065             : 
    1066         664 :         fcinfo->args[i].value = ExecEvalExpr(expr,
    1067             :                                              aggstate->ss.ps.ps_ExprContext,
    1068             :                                              &fcinfo->args[i].isnull);
    1069         664 :         anynull |= fcinfo->args[i].isnull;
    1070         664 :         i++;
    1071             :     }
    1072             : 
    1073             :     /*
    1074             :      * Apply the agg's finalfn if one is provided, else return transValue.
    1075             :      */
    1076     1839880 :     if (OidIsValid(peragg->finalfn_oid))
    1077             :     {
    1078     1276602 :         int         numFinalArgs = peragg->numFinalArgs;
    1079             : 
    1080             :         /* set up aggstate->curperagg for AggGetAggref() */
    1081     1276602 :         aggstate->curperagg = peragg;
    1082             : 
    1083     1276602 :         InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
    1084             :                                  numFinalArgs,
    1085             :                                  pertrans->aggCollation,
    1086             :                                  (void *) aggstate, NULL);
    1087             : 
    1088             :         /* Fill in the transition state value */
    1089     1276602 :         fcinfo->args[0].value =
    1090     1276602 :             MakeExpandedObjectReadOnly(pergroupstate->transValue,
    1091             :                                        pergroupstate->transValueIsNull,
    1092             :                                        pertrans->transtypeLen);
    1093     1276602 :         fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
    1094     1276602 :         anynull |= pergroupstate->transValueIsNull;
    1095             : 
    1096             :         /* Fill any remaining argument positions with nulls */
    1097     2430194 :         for (; i < numFinalArgs; i++)
    1098             :         {
    1099     1153592 :             fcinfo->args[i].value = (Datum) 0;
    1100     1153592 :             fcinfo->args[i].isnull = true;
    1101     1153592 :             anynull = true;
    1102             :         }
    1103             : 
    1104     1276602 :         if (fcinfo->flinfo->fn_strict && anynull)
    1105             :         {
    1106             :             /* don't call a strict function with NULL inputs */
    1107           0 :             *resultVal = (Datum) 0;
    1108           0 :             *resultIsNull = true;
    1109             :         }
    1110             :         else
    1111             :         {
    1112     1276602 :             *resultVal = FunctionCallInvoke(fcinfo);
    1113     1276602 :             *resultIsNull = fcinfo->isnull;
    1114             :         }
    1115     1276602 :         aggstate->curperagg = NULL;
    1116             :     }
    1117             :     else
    1118             :     {
    1119             :         /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
    1120      563278 :         *resultVal = pergroupstate->transValue;
    1121      563278 :         *resultIsNull = pergroupstate->transValueIsNull;
    1122             :     }
    1123             : 
    1124             :     /*
    1125             :      * If result is pass-by-ref, make sure it is in the right context.
    1126             :      */
    1127     1839880 :     if (!peragg->resulttypeByVal && !*resultIsNull &&
    1128      227448 :         !MemoryContextContains(CurrentMemoryContext,
    1129      227448 :                                DatumGetPointer(*resultVal)))
    1130       34182 :         *resultVal = datumCopy(*resultVal,
    1131       34182 :                                peragg->resulttypeByVal,
    1132       34182 :                                peragg->resulttypeLen);
    1133             : 
    1134     1839880 :     MemoryContextSwitchTo(oldContext);
    1135     1839880 : }
    1136             : 
    1137             : /*
    1138             :  * Compute the output value of one partial aggregate.
    1139             :  *
    1140             :  * The serialization function will be run, and the result delivered, in the
    1141             :  * output-tuple context; caller's CurrentMemoryContext does not matter.
    1142             :  */
    1143             : static void
    1144        7194 : finalize_partialaggregate(AggState *aggstate,
    1145             :                           AggStatePerAgg peragg,
    1146             :                           AggStatePerGroup pergroupstate,
    1147             :                           Datum *resultVal, bool *resultIsNull)
    1148             : {
    1149        7194 :     AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
    1150             :     MemoryContext oldContext;
    1151             : 
    1152        7194 :     oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
    1153             : 
    1154             :     /*
    1155             :      * serialfn_oid will be set if we must serialize the transvalue before
    1156             :      * returning it
    1157             :      */
    1158        7194 :     if (OidIsValid(pertrans->serialfn_oid))
    1159             :     {
    1160             :         /* Don't call a strict serialization function with NULL input. */
    1161          60 :         if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
    1162             :         {
    1163          48 :             *resultVal = (Datum) 0;
    1164          48 :             *resultIsNull = true;
    1165             :         }
    1166             :         else
    1167             :         {
    1168          12 :             FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;
    1169             : 
    1170          12 :             fcinfo->args[0].value =
    1171          12 :                 MakeExpandedObjectReadOnly(pergroupstate->transValue,
    1172             :                                            pergroupstate->transValueIsNull,
    1173             :                                            pertrans->transtypeLen);
    1174          12 :             fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
    1175          12 :             fcinfo->isnull = false;
    1176             : 
    1177          12 :             *resultVal = FunctionCallInvoke(fcinfo);
    1178          12 :             *resultIsNull = fcinfo->isnull;
    1179             :         }
    1180             :     }
    1181             :     else
    1182             :     {
    1183             :         /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
    1184        7134 :         *resultVal = pergroupstate->transValue;
    1185        7134 :         *resultIsNull = pergroupstate->transValueIsNull;
    1186             :     }
    1187             : 
    1188             :     /* If result is pass-by-ref, make sure it is in the right context. */
    1189        7194 :     if (!peragg->resulttypeByVal && !*resultIsNull &&
    1190        2316 :         !MemoryContextContains(CurrentMemoryContext,
    1191        2316 :                                DatumGetPointer(*resultVal)))
    1192        2304 :         *resultVal = datumCopy(*resultVal,
    1193        2304 :                                peragg->resulttypeByVal,
    1194        2304 :                                peragg->resulttypeLen);
    1195             : 
    1196        7194 :     MemoryContextSwitchTo(oldContext);
    1197        7194 : }
    1198             : 
    1199             : /*
    1200             :  * Extract the attributes that make up the grouping key into the
    1201             :  * hashslot. This is necessary to compute the hash or perform a lookup.
    1202             :  */
    1203             : static void
    1204     4055308 : prepare_hash_slot(AggState *aggstate)
    1205             : {
    1206     4055308 :     TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
    1207     4055308 :     AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
    1208     4055308 :     TupleTableSlot *hashslot = perhash->hashslot;
    1209             :     int         i;
    1210             : 
    1211             :     /* transfer just the needed columns into hashslot */
    1212     4055308 :     slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
    1213     4055308 :     ExecClearTuple(hashslot);
    1214             : 
    1215     9809666 :     for (i = 0; i < perhash->numhashGrpCols; i++)
    1216             :     {
    1217     5754358 :         int         varNumber = perhash->hashGrpColIdxInput[i] - 1;
    1218             : 
    1219     5754358 :         hashslot->tts_values[i] = inputslot->tts_values[varNumber];
    1220     5754358 :         hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
    1221             :     }
    1222     4055308 :     ExecStoreVirtualTuple(hashslot);
    1223     4055308 : }
    1224             : 
    1225             : /*
    1226             :  * Prepare to finalize and project based on the specified representative tuple
    1227             :  * slot and grouping set.
    1228             :  *
    1229             :  * In the specified tuple slot, force to null all attributes that should be
    1230             :  * read as null in the context of the current grouping set.  Also stash the
    1231             :  * current group bitmap where GroupingExpr can get at it.
    1232             :  *
    1233             :  * This relies on three conditions:
    1234             :  *
    1235             :  * 1) Nothing is ever going to try and extract the whole tuple from this slot,
    1236             :  * only reference it in evaluations, which will only access individual
    1237             :  * attributes.
    1238             :  *
    1239             :  * 2) No system columns are going to need to be nulled. (If a system column is
    1240             :  * referenced in a group clause, it is actually projected in the outer plan
    1241             :  * tlist.)
    1242             :  *
    1243             :  * 3) Within a given phase, we never need to recover the value of an attribute
    1244             :  * once it has been set to null.
    1245             :  *
    1246             :  * Poking into the slot this way is a bit ugly, but the consensus is that the
    1247             :  * alternative was worse.
    1248             :  */
    1249             : static void
    1250     1647156 : prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
    1251             : {
    1252     1647156 :     if (aggstate->phase->grouped_cols)
    1253             :     {
    1254      369614 :         Bitmapset  *grouped_cols = aggstate->phase->grouped_cols[currentSet];
    1255             : 
    1256      369614 :         aggstate->grouped_cols = grouped_cols;
    1257             : 
    1258      369614 :         if (TTS_EMPTY(slot))
    1259             :         {
    1260             :             /*
    1261             :              * Force all values to be NULL if working on an empty input tuple
    1262             :              * (i.e. an empty grouping set for which no input rows were
    1263             :              * supplied).
    1264             :              */
    1265          32 :             ExecStoreAllNullTuple(slot);
    1266             :         }
    1267      369582 :         else if (aggstate->all_grouped_cols)
    1268             :         {
    1269             :             ListCell   *lc;
    1270             : 
    1271             :             /* all_grouped_cols is arranged in desc order */
    1272      369546 :             slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
    1273             : 
    1274     1076534 :             foreach(lc, aggstate->all_grouped_cols)
    1275             :             {
    1276      706988 :                 int         attnum = lfirst_int(lc);
    1277             : 
    1278      706988 :                 if (!bms_is_member(attnum, grouped_cols))
    1279       37316 :                     slot->tts_isnull[attnum - 1] = true;
    1280             :             }
    1281             :         }
    1282             :     }
    1283     1647156 : }
    1284             : 
    1285             : /*
    1286             :  * Compute the final value of all aggregates for one group.
    1287             :  *
    1288             :  * This function handles only one grouping set at a time, which the caller must
    1289             :  * have selected.  It's also the caller's responsibility to adjust the supplied
    1290             :  * pergroup parameter to point to the current set's transvalues.
    1291             :  *
    1292             :  * Results are stored in the output econtext aggvalues/aggnulls.
    1293             :  */
    1294             : static void
    1295     1647156 : finalize_aggregates(AggState *aggstate,
    1296             :                     AggStatePerAgg peraggs,
    1297             :                     AggStatePerGroup pergroup)
    1298             : {
    1299     1647156 :     ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
    1300     1647156 :     Datum      *aggvalues = econtext->ecxt_aggvalues;
    1301     1647156 :     bool       *aggnulls = econtext->ecxt_aggnulls;
    1302             :     int         aggno;
    1303             :     int         transno;
    1304             : 
    1305             :     /*
    1306             :      * If there were any DISTINCT and/or ORDER BY aggregates, sort their
    1307             :      * inputs and run the transition functions.
    1308             :      */
    1309     3494130 :     for (transno = 0; transno < aggstate->numtrans; transno++)
    1310             :     {
    1311     1846974 :         AggStatePerTrans pertrans = &aggstate->pertrans[transno];
    1312             :         AggStatePerGroup pergroupstate;
    1313             : 
    1314     1846974 :         pergroupstate = &pergroup[transno];
    1315             : 
    1316     1846974 :         if (pertrans->numSortCols > 0)
    1317             :         {
    1318             :             Assert(aggstate->aggstrategy != AGG_HASHED &&
    1319             :                    aggstate->aggstrategy != AGG_MIXED);
    1320             : 
    1321     1144594 :             if (pertrans->numInputs == 1)
    1322       49568 :                 process_ordered_aggregate_single(aggstate,
    1323             :                                                  pertrans,
    1324             :                                                  pergroupstate);
    1325             :             else
    1326     1095026 :                 process_ordered_aggregate_multi(aggstate,
    1327             :                                                 pertrans,
    1328             :                                                 pergroupstate);
    1329             :         }
    1330             :     }
    1331             : 
    1332             :     /*
    1333             :      * Run the final functions.
    1334             :      */
    1335     3494230 :     for (aggno = 0; aggno < aggstate->numaggs; aggno++)
    1336             :     {
    1337     1847074 :         AggStatePerAgg peragg = &peraggs[aggno];
    1338     1847074 :         int         transno = peragg->transno;
    1339             :         AggStatePerGroup pergroupstate;
    1340             : 
    1341     1847074 :         pergroupstate = &pergroup[transno];
    1342             : 
    1343     1847074 :         if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
    1344       14388 :             finalize_partialaggregate(aggstate, peragg, pergroupstate,
    1345        7194 :                                       &aggvalues[aggno], &aggnulls[aggno]);
    1346             :         else
    1347     3679760 :             finalize_aggregate(aggstate, peragg, pergroupstate,
    1348     1839880 :                                &aggvalues[aggno], &aggnulls[aggno]);
    1349             :     }
    1350     1647156 : }
    1351             : 
    1352             : /*
    1353             :  * Project the result of a group (whose aggs have already been calculated by
    1354             :  * finalize_aggregates). Returns the result slot, or NULL if no row is
    1355             :  * projected (suppressed by qual).
    1356             :  */
    1357             : static TupleTableSlot *
    1358     1647156 : project_aggregates(AggState *aggstate)
    1359             : {
    1360     1647156 :     ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
    1361             : 
    1362             :     /*
    1363             :      * Check the qual (HAVING clause); if the group does not match, ignore it.
    1364             :      */
    1365     1647156 :     if (ExecQual(aggstate->ss.ps.qual, econtext))
    1366             :     {
    1367             :         /*
    1368             :          * Form and return projection tuple using the aggregate results and
    1369             :          * the representative input tuple.
    1370             :          */
    1371     1596220 :         return ExecProject(aggstate->ss.ps.ps_ProjInfo);
    1372             :     }
    1373             :     else
    1374       50936 :         InstrCountFiltered1(aggstate, 1);
    1375             : 
    1376       50936 :     return NULL;
    1377             : }
    1378             : 
    1379             : /*
    1380             :  * find_unaggregated_cols
    1381             :  *    Construct a bitmapset of the column numbers of un-aggregated Vars
    1382             :  *    appearing in our targetlist and qual (HAVING clause)
    1383             :  */
    1384             : static Bitmapset *
    1385        2956 : find_unaggregated_cols(AggState *aggstate)
    1386             : {
    1387        2956 :     Agg        *node = (Agg *) aggstate->ss.ps.plan;
    1388             :     Bitmapset  *colnos;
    1389             : 
    1390        2956 :     colnos = NULL;
    1391        2956 :     (void) find_unaggregated_cols_walker((Node *) node->plan.targetlist,
    1392             :                                          &colnos);
    1393        2956 :     (void) find_unaggregated_cols_walker((Node *) node->plan.qual,
    1394             :                                          &colnos);
    1395        2956 :     return colnos;
    1396             : }
    1397             : 
    1398             : static bool
    1399       24048 : find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
    1400             : {
    1401       24048 :     if (node == NULL)
    1402        2654 :         return false;
    1403       21394 :     if (IsA(node, Var))
    1404             :     {
    1405        4494 :         Var        *var = (Var *) node;
    1406             : 
    1407             :         /* setrefs.c should have set the varno to OUTER_VAR */
    1408             :         Assert(var->varno == OUTER_VAR);
    1409             :         Assert(var->varlevelsup == 0);
    1410        4494 :         *colnos = bms_add_member(*colnos, var->varattno);
    1411        4494 :         return false;
    1412             :     }
    1413       16900 :     if (IsA(node, Aggref) || IsA(node, GroupingFunc))
    1414             :     {
    1415             :         /* do not descend into aggregate exprs */
    1416        4018 :         return false;
    1417             :     }
    1418       12882 :     return expression_tree_walker(node, find_unaggregated_cols_walker,
    1419             :                                   (void *) colnos);
    1420             : }
    1421             : 
    1422             : /*
    1423             :  * (Re-)initialize the hash table(s) to empty.
    1424             :  *
    1425             :  * To implement hashed aggregation, we need a hashtable that stores a
    1426             :  * representative tuple and an array of AggStatePerGroup structs for each
    1427             :  * distinct set of GROUP BY column values.  We compute the hash key from the
    1428             :  * GROUP BY columns.  The per-group data is allocated in lookup_hash_entry(),
    1429             :  * for each entry.
    1430             :  *
    1431             :  * We have a separate hashtable and associated perhash data structure for each
    1432             :  * grouping set for which we're doing hashing.
    1433             :  *
    1434             :  * The contents of the hash tables always live in the hashcontext's per-tuple
    1435             :  * memory context (there is only one of these for all tables together, since
    1436             :  * they are all reset at the same time).
    1437             :  */
    1438             : static void
    1439       44524 : build_hash_tables(AggState *aggstate)
    1440             : {
    1441             :     int         setno;
    1442             : 
    1443       89296 :     for (setno = 0; setno < aggstate->num_hashes; ++setno)
    1444             :     {
    1445       44772 :         AggStatePerHash perhash = &aggstate->perhash[setno];
    1446             :         long        nbuckets;
    1447             :         Size        memory;
    1448             : 
    1449       44772 :         if (perhash->hashtable != NULL)
    1450             :         {
    1451       41592 :             ResetTupleHashTable(perhash->hashtable);
    1452       41592 :             continue;
    1453             :         }
    1454             : 
    1455             :         Assert(perhash->aggnode->numGroups > 0);
    1456             : 
    1457        3180 :         memory = aggstate->hash_mem_limit / aggstate->num_hashes;
    1458             : 
    1459             :         /* choose reasonable number of buckets per hashtable */
    1460        3180 :         nbuckets = hash_choose_num_buckets(aggstate->hashentrysize,
    1461        3180 :                                            perhash->aggnode->numGroups,
    1462             :                                            memory);
    1463             : 
    1464        3180 :         build_hash_table(aggstate, setno, nbuckets);
    1465             :     }
    1466             : 
    1467       44524 :     aggstate->hash_ngroups_current = 0;
    1468       44524 : }
    1469             : 
    1470             : /*
    1471             :  * Build a single hashtable for this grouping set.
    1472             :  */
    1473             : static void
    1474        3180 : build_hash_table(AggState *aggstate, int setno, long nbuckets)
    1475             : {
    1476        3180 :     AggStatePerHash perhash = &aggstate->perhash[setno];
    1477        3180 :     MemoryContext metacxt = aggstate->hash_metacxt;
    1478        3180 :     MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
    1479        3180 :     MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
    1480             :     Size        additionalsize;
    1481             : 
    1482             :     Assert(aggstate->aggstrategy == AGG_HASHED ||
    1483             :            aggstate->aggstrategy == AGG_MIXED);
    1484             : 
    1485             :     /*
    1486             :      * Used to make sure initial hash table allocation does not exceed
    1487             :      * work_mem. Note that the estimate does not include space for
    1488             :      * pass-by-reference transition data values, nor for the representative
    1489             :      * tuple of each group.
    1490             :      */
    1491        3180 :     additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
    1492             : 
    1493       15900 :     perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
    1494        3180 :                                                 perhash->hashslot->tts_tupleDescriptor,
    1495             :                                                 perhash->numCols,
    1496             :                                                 perhash->hashGrpColIdxHash,
    1497        3180 :                                                 perhash->eqfuncoids,
    1498             :                                                 perhash->hashfunctions,
    1499        3180 :                                                 perhash->aggnode->grpCollations,
    1500             :                                                 nbuckets,
    1501             :                                                 additionalsize,
    1502             :                                                 metacxt,
    1503             :                                                 hashcxt,
    1504             :                                                 tmpcxt,
    1505        3180 :                                                 DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
    1506        3180 : }
    1507             : 
    1508             : /*
    1509             :  * Compute columns that actually need to be stored in hashtable entries.  The
    1510             :  * incoming tuples from the child plan node will contain grouping columns,
    1511             :  * other columns referenced in our targetlist and qual, columns used to
    1512             :  * compute the aggregate functions, and perhaps just junk columns we don't use
    1513             :  * at all.  Only columns of the first two types need to be stored in the
    1514             :  * hashtable, and getting rid of the others can make the table entries
    1515             :  * significantly smaller.  The hashtable only contains the relevant columns,
    1516             :  * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
    1517             :  * into the format of the normal input descriptor.
    1518             :  *
    1519             :  * Additional columns, in addition to the columns grouped by, come from two
    1520             :  * sources: Firstly functionally dependent columns that we don't need to group
    1521             :  * by themselves, and secondly ctids for row-marks.
    1522             :  *
    1523             :  * To eliminate duplicates, we build a bitmapset of the needed columns, and
    1524             :  * then build an array of the columns included in the hashtable. We might
    1525             :  * still have duplicates if the passed-in grpColIdx has them, which can happen
    1526             :  * in edge cases from semijoins/distinct; these can't always be removed,
    1527             :  * because it's not certain that the duplicate cols will be using the same
    1528             :  * hash function.
    1529             :  *
    1530             :  * Note that the array is preserved over ExecReScanAgg, so we allocate it in
    1531             :  * the per-query context (unlike the hash table itself).
    1532             :  */
    1533             : static void
    1534        2956 : find_hash_columns(AggState *aggstate)
    1535             : {
    1536             :     Bitmapset  *base_colnos;
    1537        2956 :     List       *outerTlist = outerPlanState(aggstate)->plan->targetlist;
    1538        2956 :     int         numHashes = aggstate->num_hashes;
    1539        2956 :     EState     *estate = aggstate->ss.ps.state;
    1540             :     int         j;
    1541             : 
    1542             :     /* Find Vars that will be needed in tlist and qual */
    1543        2956 :     base_colnos = find_unaggregated_cols(aggstate);
    1544             : 
    1545        6136 :     for (j = 0; j < numHashes; ++j)
    1546             :     {
    1547        3180 :         AggStatePerHash perhash = &aggstate->perhash[j];
    1548        3180 :         Bitmapset  *colnos = bms_copy(base_colnos);
    1549        3180 :         AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
    1550        3180 :         List       *hashTlist = NIL;
    1551             :         TupleDesc   hashDesc;
    1552             :         int         maxCols;
    1553             :         int         i;
    1554             : 
    1555        3180 :         perhash->largestGrpColIdx = 0;
    1556             : 
    1557             :         /*
    1558             :          * If we're doing grouping sets, then some Vars might be referenced in
    1559             :          * tlist/qual for the benefit of other grouping sets, but not needed
    1560             :          * when hashing; i.e. prepare_projection_slot will null them out, so
    1561             :          * there'd be no point storing them.  Use prepare_projection_slot's
    1562             :          * logic to determine which.
    1563             :          */
    1564        3180 :         if (aggstate->phases[0].grouped_cols)
    1565             :         {
    1566        3180 :             Bitmapset  *grouped_cols = aggstate->phases[0].grouped_cols[j];
    1567             :             ListCell   *lc;
    1568             : 
    1569        8012 :             foreach(lc, aggstate->all_grouped_cols)
    1570             :             {
    1571        4832 :                 int         attnum = lfirst_int(lc);
    1572             : 
    1573        4832 :                 if (!bms_is_member(attnum, grouped_cols))
    1574         616 :                     colnos = bms_del_member(colnos, attnum);
    1575             :             }
    1576             :         }
    1577             : 
    1578             :         /*
    1579             :          * Compute maximum number of input columns accounting for possible
    1580             :          * duplications in the grpColIdx array, which can happen in some edge
    1581             :          * cases where HashAggregate was generated as part of a semijoin or a
    1582             :          * DISTINCT.
    1583             :          */
    1584        3180 :         maxCols = bms_num_members(colnos) + perhash->numCols;
    1585             : 
    1586        3180 :         perhash->hashGrpColIdxInput =
    1587        3180 :             palloc(maxCols * sizeof(AttrNumber));
    1588        3180 :         perhash->hashGrpColIdxHash =
    1589        3180 :             palloc(perhash->numCols * sizeof(AttrNumber));
    1590             : 
    1591             :         /* Add all the grouping columns to colnos */
    1592        7400 :         for (i = 0; i < perhash->numCols; i++)
    1593        4220 :             colnos = bms_add_member(colnos, grpColIdx[i]);
    1594             : 
    1595             :         /*
    1596             :          * First build mapping for columns directly hashed. These are the
    1597             :          * first, because they'll be accessed when computing hash values and
    1598             :          * comparing tuples for exact matches. We also build simple mapping
    1599             :          * for execGrouping, so it knows where to find the to-be-hashed /
    1600             :          * compared columns in the input.
    1601             :          */
    1602        7400 :         for (i = 0; i < perhash->numCols; i++)
    1603             :         {
    1604        4220 :             perhash->hashGrpColIdxInput[i] = grpColIdx[i];
    1605        4220 :             perhash->hashGrpColIdxHash[i] = i + 1;
    1606        4220 :             perhash->numhashGrpCols++;
    1607             :             /* delete already mapped columns */
    1608        4220 :             bms_del_member(colnos, grpColIdx[i]);
    1609             :         }
    1610             : 
    1611             :         /* and add the remaining columns */
    1612        3552 :         while ((i = bms_first_member(colnos)) >= 0)
    1613             :         {
    1614         372 :             perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
    1615         372 :             perhash->numhashGrpCols++;
    1616             :         }
    1617             : 
    1618             :         /* and build a tuple descriptor for the hashtable */
    1619        7772 :         for (i = 0; i < perhash->numhashGrpCols; i++)
    1620             :         {
    1621        4592 :             int         varNumber = perhash->hashGrpColIdxInput[i] - 1;
    1622             : 
    1623        4592 :             hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
    1624        4592 :             perhash->largestGrpColIdx =
    1625        4592 :                 Max(varNumber + 1, perhash->largestGrpColIdx);
    1626             :         }
    1627             : 
    1628        3180 :         hashDesc = ExecTypeFromTL(hashTlist);
    1629             : 
    1630        6360 :         execTuplesHashPrepare(perhash->numCols,
    1631        3180 :                               perhash->aggnode->grpOperators,
    1632             :                               &perhash->eqfuncoids,
    1633             :                               &perhash->hashfunctions);
    1634        3180 :         perhash->hashslot =
    1635        3180 :             ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
    1636             :                                &TTSOpsMinimalTuple);
    1637             : 
    1638        3180 :         list_free(hashTlist);
    1639        3180 :         bms_free(colnos);
    1640             :     }
    1641             : 
    1642        2956 :     bms_free(base_colnos);
    1643        2956 : }
    1644             : 
    1645             : /*
    1646             :  * Estimate per-hash-table-entry overhead.
    1647             :  */
    1648             : Size
    1649       16366 : hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
    1650             : {
    1651             :     Size        tupleChunkSize;
    1652             :     Size        pergroupChunkSize;
    1653             :     Size        transitionChunkSize;
    1654       16366 :     Size        tupleSize = (MAXALIGN(SizeofMinimalTupleHeader) +
    1655             :                              tupleWidth);
    1656       16366 :     Size        pergroupSize = numTrans * sizeof(AggStatePerGroupData);
    1657             : 
    1658       16366 :     tupleChunkSize = CHUNKHDRSZ + tupleSize;
    1659             : 
    1660       16366 :     if (pergroupSize > 0)
    1661        8464 :         pergroupChunkSize = CHUNKHDRSZ + pergroupSize;
    1662             :     else
    1663        7902 :         pergroupChunkSize = 0;
    1664             : 
    1665       16366 :     if (transitionSpace > 0)
    1666        4936 :         transitionChunkSize = CHUNKHDRSZ + transitionSpace;
    1667             :     else
    1668       11430 :         transitionChunkSize = 0;
    1669             : 
    1670             :     return
    1671             :         sizeof(TupleHashEntryData) +
    1672       16366 :         tupleChunkSize +
    1673       16366 :         pergroupChunkSize +
    1674             :         transitionChunkSize;
    1675             : }
    1676             : 
    1677             : /*
    1678             :  * hashagg_recompile_expressions()
    1679             :  *
    1680             :  * Identifies the right phase, compiles the right expression given the
    1681             :  * arguments, and then sets phase->evalfunc to that expression.
    1682             :  *
    1683             :  * Different versions of the compiled expression are needed depending on
    1684             :  * whether hash aggregation has spilled or not, and whether it's reading from
    1685             :  * the outer plan or a tape. Before spilling to disk, the expression reads
    1686             :  * from the outer plan and does not need to perform a NULL check. After
    1687             :  * HashAgg begins to spill, new groups will not be created in the hash table,
    1688             :  * and the AggStatePerGroup array may be NULL; therefore we need to add a null
    1689             :  * pointer check to the expression. Then, when reading spilled data from a
    1690             :  * tape, we change the outer slot type to be a fixed minimal tuple slot.
    1691             :  *
    1692             :  * It would be wasteful to recompile every time, so cache the compiled
    1693             :  * expressions in the AggStatePerPhase, and reuse when appropriate.
    1694             :  */
    1695             : static void
    1696       86176 : hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
    1697             : {
    1698             :     AggStatePerPhase phase;
    1699       86176 :     int         i = minslot ? 1 : 0;
    1700       86176 :     int         j = nullcheck ? 1 : 0;
    1701             : 
    1702             :     Assert(aggstate->aggstrategy == AGG_HASHED ||
    1703             :            aggstate->aggstrategy == AGG_MIXED);
    1704             : 
    1705       86176 :     if (aggstate->aggstrategy == AGG_HASHED)
    1706       52488 :         phase = &aggstate->phases[0];
    1707             :     else                        /* AGG_MIXED */
    1708       33688 :         phase = &aggstate->phases[1];
    1709             : 
    1710       86176 :     if (phase->evaltrans_cache[i][j] == NULL)
    1711             :     {
    1712          64 :         const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops;
    1713          64 :         bool        outerfixed = aggstate->ss.ps.outeropsfixed;
    1714          64 :         bool        dohash = true;
    1715             :         bool        dosort;
    1716             : 
    1717          64 :         dosort = aggstate->aggstrategy == AGG_MIXED ? true : false;
    1718             : 
    1719             :         /* temporarily change the outerops while compiling the expression */
    1720          64 :         if (minslot)
    1721             :         {
    1722          32 :             aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
    1723          32 :             aggstate->ss.ps.outeropsfixed = true;
    1724             :         }
    1725             : 
    1726          64 :         phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase,
    1727             :                                                          dosort, dohash,
    1728             :                                                          nullcheck);
    1729             : 
    1730             :         /* change back */
    1731          64 :         aggstate->ss.ps.outerops = outerops;
    1732          64 :         aggstate->ss.ps.outeropsfixed = outerfixed;
    1733             :     }
    1734             : 
    1735       86176 :     phase->evaltrans = phase->evaltrans_cache[i][j];
    1736       86176 : }
    1737             : 
    1738             : /*
    1739             :  * Set limits that trigger spilling to avoid exceeding work_mem. Consider the
    1740             :  * number of partitions we expect to create (if we do spill).
    1741             :  *
    1742             :  * There are two limits: a memory limit, and also an ngroups limit. The
    1743             :  * ngroups limit becomes important when we expect transition values to grow
    1744             :  * substantially larger than the initial value.
    1745             :  */
    1746             : void
    1747       34128 : hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits,
    1748             :                     Size *mem_limit, uint64 *ngroups_limit,
    1749             :                     int *num_partitions)
    1750             : {
    1751             :     int         npartitions;
    1752             :     Size        partition_mem;
    1753             : 
    1754             :     /* if not expected to spill, use all of work_mem */
    1755       34128 :     if (input_groups * hashentrysize < work_mem * 1024L)
    1756             :     {
    1757       25012 :         if (num_partitions != NULL)
    1758       10988 :             *num_partitions = 0;
    1759       25012 :         *mem_limit = work_mem * 1024L;
    1760       25012 :         *ngroups_limit = *mem_limit / hashentrysize;
    1761       25012 :         return;
    1762             :     }
    1763             : 
    1764             :     /*
    1765             :      * Calculate expected memory requirements for spilling, which is the size
    1766             :      * of the buffers needed for all the tapes that need to be open at once.
    1767             :      * Then, subtract that from the memory available for holding hash tables.
    1768             :      */
    1769        9116 :     npartitions = hash_choose_num_partitions(input_groups,
    1770             :                                              hashentrysize,
    1771             :                                              used_bits,
    1772             :                                              NULL);
    1773        9116 :     if (num_partitions != NULL)
    1774         152 :         *num_partitions = npartitions;
    1775             : 
    1776        9116 :     partition_mem =
    1777        9116 :         HASHAGG_READ_BUFFER_SIZE +
    1778             :         HASHAGG_WRITE_BUFFER_SIZE * npartitions;
    1779             : 
    1780             :     /*
    1781             :      * Don't set the limit below 3/4 of work_mem. In that case, we are at the
    1782             :      * minimum number of partitions, so we aren't going to dramatically exceed
    1783             :      * work mem anyway.
    1784             :      */
    1785        9116 :     if (work_mem * 1024L > 4 * partition_mem)
    1786           0 :         *mem_limit = work_mem * 1024L - partition_mem;
    1787             :     else
    1788        9116 :         *mem_limit = work_mem * 1024L * 0.75;
    1789             : 
    1790        9116 :     if (*mem_limit > hashentrysize)
    1791        9116 :         *ngroups_limit = *mem_limit / hashentrysize;
    1792             :     else
    1793           0 :         *ngroups_limit = 1;
    1794             : }
    1795             : 
    1796             : /*
    1797             :  * hash_agg_check_limits
    1798             :  *
    1799             :  * After adding a new group to the hash table, check whether we need to enter
    1800             :  * spill mode. Allocations may happen without adding new groups (for instance,
    1801             :  * if the transition state size grows), so this check is imperfect.
    1802             :  */
    1803             : static void
    1804      349310 : hash_agg_check_limits(AggState *aggstate)
    1805             : {
    1806      349310 :     uint64      ngroups = aggstate->hash_ngroups_current;
    1807      349310 :     Size        meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt,
    1808             :                                                      true);
    1809      349310 :     Size        hash_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
    1810             :                                                      true);
    1811             : 
    1812             :     /*
    1813             :      * Don't spill unless there's at least one group in the hash table so we
    1814             :      * can be sure to make progress even in edge cases.
    1815             :      */
    1816      349310 :     if (aggstate->hash_ngroups_current > 0 &&
    1817      349310 :         (meta_mem + hash_mem > aggstate->hash_mem_limit ||
    1818      327706 :          ngroups > aggstate->hash_ngroups_limit))
    1819             :     {
    1820       21620 :         hash_agg_enter_spill_mode(aggstate);
    1821             :     }
    1822      349310 : }
    1823             : 
    1824             : /*
    1825             :  * Enter "spill mode", meaning that no new groups are added to any of the hash
    1826             :  * tables. Tuples that would create a new group are instead spilled, and
    1827             :  * processed later.
    1828             :  */
    1829             : static void
    1830       21620 : hash_agg_enter_spill_mode(AggState *aggstate)
    1831             : {
    1832       21620 :     aggstate->hash_spill_mode = true;
    1833       21620 :     hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
    1834             : 
    1835       21620 :     if (!aggstate->hash_ever_spilled)
    1836             :     {
    1837             :         Assert(aggstate->hash_tapeinfo == NULL);
    1838             :         Assert(aggstate->hash_spills == NULL);
    1839             : 
    1840          40 :         aggstate->hash_ever_spilled = true;
    1841             : 
    1842          40 :         hashagg_tapeinfo_init(aggstate);
    1843             : 
    1844          40 :         aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
    1845             : 
    1846         104 :         for (int setno = 0; setno < aggstate->num_hashes; setno++)
    1847             :         {
    1848          64 :             AggStatePerHash perhash = &aggstate->perhash[setno];
    1849          64 :             HashAggSpill *spill = &aggstate->hash_spills[setno];
    1850             : 
    1851         128 :             hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
    1852          64 :                                perhash->aggnode->numGroups,
    1853             :                                aggstate->hashentrysize);
    1854             :         }
    1855             :     }
    1856       21620 : }
    1857             : 
    1858             : /*
    1859             :  * Update metrics after filling the hash table.
    1860             :  *
    1861             :  * If reading from the outer plan, from_tape should be false; if reading from
    1862             :  * another tape, from_tape should be true.
    1863             :  */
    1864             : static void
    1865       66626 : hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
    1866             : {
    1867             :     Size        meta_mem;
    1868             :     Size        hash_mem;
    1869             :     Size        buffer_mem;
    1870             :     Size        total_mem;
    1871             : 
    1872       66626 :     if (aggstate->aggstrategy != AGG_MIXED &&
    1873       49738 :         aggstate->aggstrategy != AGG_HASHED)
    1874           0 :         return;
    1875             : 
    1876             :     /* memory for the hash table itself */
    1877       66626 :     meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
    1878             : 
    1879             :     /* memory for the group keys and transition states */
    1880       66626 :     hash_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true);
    1881             : 
    1882             :     /* memory for read/write tape buffers, if spilled */
    1883       66626 :     buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
    1884       66626 :     if (from_tape)
    1885       22988 :         buffer_mem += HASHAGG_READ_BUFFER_SIZE;
    1886             : 
    1887             :     /* update peak mem */
    1888       66626 :     total_mem = meta_mem + hash_mem + buffer_mem;
    1889       66626 :     if (total_mem > aggstate->hash_mem_peak)
    1890        2094 :         aggstate->hash_mem_peak = total_mem;
    1891             : 
    1892             :     /* update disk usage */
    1893       66626 :     if (aggstate->hash_tapeinfo != NULL)
    1894             :     {
    1895       23028 :         uint64      disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
    1896             : 
    1897       23028 :         if (aggstate->hash_disk_used < disk_used)
    1898         100 :             aggstate->hash_disk_used = disk_used;
    1899             :     }
    1900             : 
    1901             :     /* update hashentrysize estimate based on contents */
    1902       66626 :     if (aggstate->hash_ngroups_current > 0)
    1903             :     {
    1904       66468 :         aggstate->hashentrysize =
    1905       66468 :             sizeof(TupleHashEntryData) +
    1906       66468 :             (hash_mem / (double) aggstate->hash_ngroups_current);
    1907             :     }
    1908             : }
    1909             : 
    1910             : /*
    1911             :  * Choose a reasonable number of buckets for the initial hash table size.
    1912             :  */
    1913             : static long
    1914        3180 : hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
    1915             : {
    1916             :     long        max_nbuckets;
    1917        3180 :     long        nbuckets = ngroups;
    1918             : 
    1919        3180 :     max_nbuckets = memory / hashentrysize;
    1920             : 
    1921             :     /*
    1922             :      * Underestimating is better than overestimating. Too many buckets crowd
    1923             :      * out space for group keys and transition state values.
    1924             :      */
    1925        3180 :     max_nbuckets >>= 1;
    1926             : 
    1927        3180 :     if (nbuckets > max_nbuckets)
    1928          88 :         nbuckets = max_nbuckets;
    1929        3180 :     if (nbuckets < HASHAGG_MIN_BUCKETS)
    1930        2820 :         nbuckets = HASHAGG_MIN_BUCKETS;
    1931        3180 :     return nbuckets;
    1932             : }
    1933             : 
    1934             : /*
    1935             :  * Determine the number of partitions to create when spilling, which will
    1936             :  * always be a power of two. If log2_npartitions is non-NULL, set
    1937             :  * *log2_npartitions to the log2() of the number of partitions.
    1938             :  */
    1939             : static int
    1940       19612 : hash_choose_num_partitions(uint64 input_groups, double hashentrysize,
    1941             :                            int used_bits, int *log2_npartitions)
    1942             : {
    1943             :     Size        mem_wanted;
    1944             :     int         partition_limit;
    1945             :     int         npartitions;
    1946             :     int         partition_bits;
    1947             : 
    1948             :     /*
    1949             :      * Avoid creating so many partitions that the memory requirements of the
    1950             :      * open partition files are greater than 1/4 of work_mem.
    1951             :      */
    1952       19612 :     partition_limit =
    1953       19612 :         (work_mem * 1024L * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
    1954             :         HASHAGG_WRITE_BUFFER_SIZE;
    1955             : 
    1956       19612 :     mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
    1957             : 
    1958             :     /* make enough partitions so that each one is likely to fit in memory */
    1959       19612 :     npartitions = 1 + (mem_wanted / (work_mem * 1024L));
    1960             : 
    1961       19612 :     if (npartitions > partition_limit)
    1962       16760 :         npartitions = partition_limit;
    1963             : 
    1964       19612 :     if (npartitions < HASHAGG_MIN_PARTITIONS)
    1965       19612 :         npartitions = HASHAGG_MIN_PARTITIONS;
    1966       19612 :     if (npartitions > HASHAGG_MAX_PARTITIONS)
    1967           0 :         npartitions = HASHAGG_MAX_PARTITIONS;
    1968             : 
    1969             :     /* ceil(log2(npartitions)) */
    1970       19612 :     partition_bits = my_log2(npartitions);
    1971             : 
    1972             :     /* make sure that we don't exhaust the hash bits */
    1973       19612 :     if (partition_bits + used_bits >= 32)
    1974           0 :         partition_bits = 32 - used_bits;
    1975             : 
    1976       19612 :     if (log2_npartitions != NULL)
    1977       10496 :         *log2_npartitions = partition_bits;
    1978             : 
    1979             :     /* number of partitions will be a power of two */
    1980       19612 :     npartitions = 1L << partition_bits;
    1981             : 
    1982       19612 :     return npartitions;
    1983             : }
    1984             : 
    1985             : /*
    1986             :  * Find or create a hashtable entry for the tuple group containing the current
    1987             :  * tuple (already set in tmpcontext's outertuple slot), in the current grouping
    1988             :  * set (which the caller must have selected - note that initialize_aggregate
    1989             :  * depends on this).
    1990             :  *
    1991             :  * When called, CurrentMemoryContext should be the per-query context. The
    1992             :  * already-calculated hash value for the tuple must be specified.
    1993             :  *
    1994             :  * If in "spill mode", then only find existing hashtable entries; don't create
    1995             :  * new ones. If a tuple's group is not already present in the hash table for
    1996             :  * the current grouping set, assign *in_hash_table=false and the caller will
    1997             :  * spill it to disk.
    1998             :  */
    1999             : static AggStatePerGroup
    2000     4055308 : lookup_hash_entry(AggState *aggstate, uint32 hash, bool *in_hash_table)
    2001             : {
    2002     4055308 :     AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
    2003     4055308 :     TupleTableSlot *hashslot = perhash->hashslot;
    2004             :     TupleHashEntryData *entry;
    2005     4055308 :     bool        isnew = false;
    2006             :     bool       *p_isnew;
    2007             : 
    2008             :     /* if hash table already spilled, don't create new entries */
    2009     4055308 :     p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
    2010             : 
    2011             :     /* find or create the hashtable entry using the filtered tuple */
    2012     4055308 :     entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot, p_isnew,
    2013             :                                      hash);
    2014             : 
    2015     4055308 :     if (entry == NULL)
    2016             :     {
    2017      800532 :         *in_hash_table = false;
    2018      800532 :         return NULL;
    2019             :     }
    2020             :     else
    2021     3254776 :         *in_hash_table = true;
    2022             : 
    2023     3254776 :     if (isnew)
    2024             :     {
    2025             :         AggStatePerGroup pergroup;
    2026             :         int         transno;
    2027             : 
    2028      349310 :         aggstate->hash_ngroups_current++;
    2029      349310 :         hash_agg_check_limits(aggstate);
    2030             : 
    2031             :         /* no need to allocate or initialize per-group state */
    2032      349310 :         if (aggstate->numtrans == 0)
    2033      112940 :             return NULL;
    2034             : 
    2035             :         pergroup = (AggStatePerGroup)
    2036      236370 :             MemoryContextAlloc(perhash->hashtable->tablecxt,
    2037      236370 :                                sizeof(AggStatePerGroupData) * aggstate->numtrans);
    2038             : 
    2039      236370 :         entry->additional = pergroup;
    2040             : 
    2041             :         /*
    2042             :          * Initialize aggregates for new tuple group, lookup_hash_entries()
    2043             :          * already has selected the relevant grouping set.
    2044             :          */
    2045      649314 :         for (transno = 0; transno < aggstate->numtrans; transno++)
    2046             :         {
    2047      412944 :             AggStatePerTrans pertrans = &aggstate->pertrans[transno];
    2048      412944 :             AggStatePerGroup pergroupstate = &pergroup[transno];
    2049             : 
    2050      412944 :             initialize_aggregate(aggstate, pertrans, pergroupstate);
    2051             :         }
    2052             :     }
    2053             : 
    2054     3141836 :     return entry->additional;
    2055             : }
    2056             : 
    2057             : /*
    2058             :  * Look up hash entries for the current tuple in all hashed grouping sets,
    2059             :  * returning an array of pergroup pointers suitable for advance_aggregates.
    2060             :  *
    2061             :  * Be aware that lookup_hash_entry can reset the tmpcontext.
    2062             :  *
    2063             :  * Some entries may be left NULL if we are in "spill mode". The same tuple
    2064             :  * will belong to different groups for each grouping set, so may match a group
    2065             :  * already in memory for one set and match a group not in memory for another
    2066             :  * set. When in "spill mode", the tuple will be spilled for each grouping set
    2067             :  * where it doesn't match a group in memory.
    2068             :  *
    2069             :  * NB: It's possible to spill the same tuple for several different grouping
    2070             :  * sets. This may seem wasteful, but it's actually a trade-off: if we spill
    2071             :  * the tuple multiple times for multiple grouping sets, it can be partitioned
    2072             :  * for each grouping set, making the refilling of the hash table very
    2073             :  * efficient.
    2074             :  */
    2075             : static void
    2076     3165776 : lookup_hash_entries(AggState *aggstate)
    2077             : {
    2078     3165776 :     AggStatePerGroup *pergroup = aggstate->hash_pergroup;
    2079             :     int         setno;
    2080             : 
    2081     6420552 :     for (setno = 0; setno < aggstate->num_hashes; setno++)
    2082             :     {
    2083     3254776 :         AggStatePerHash perhash = &aggstate->perhash[setno];
    2084             :         uint32      hash;
    2085             :         bool        in_hash_table;
    2086             : 
    2087     3254776 :         select_current_set(aggstate, setno, true);
    2088     3254776 :         prepare_hash_slot(aggstate);
    2089     3254776 :         hash = TupleHashTableHash(perhash->hashtable, perhash->hashslot);
    2090     3254776 :         pergroup[setno] = lookup_hash_entry(aggstate, hash, &in_hash_table);
    2091             : 
    2092             :         /* check to see if we need to spill the tuple for this grouping set */
    2093     3254776 :         if (!in_hash_table)
    2094             :         {
    2095      233084 :             HashAggSpill *spill = &aggstate->hash_spills[setno];
    2096      233084 :             TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
    2097             : 
    2098      233084 :             if (spill->partitions == NULL)
    2099           0 :                 hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
    2100           0 :                                    perhash->aggnode->numGroups,
    2101             :                                    aggstate->hashentrysize);
    2102             : 
    2103      233084 :             hashagg_spill_tuple(spill, slot, hash);
    2104             :         }
    2105             :     }
    2106     3165776 : }
    2107             : 
    2108             : /*
    2109             :  * ExecAgg -
    2110             :  *
    2111             :  *    ExecAgg receives tuples from its outer subplan and aggregates over
    2112             :  *    the appropriate attribute for each aggregate function use (Aggref
    2113             :  *    node) appearing in the targetlist or qual of the node.  The number
    2114             :  *    of tuples to aggregate over depends on whether grouped or plain
    2115             :  *    aggregation is selected.  In grouped aggregation, we produce a result
    2116             :  *    row for each group; in plain aggregation there's a single result row
    2117             :  *    for the whole query.  In either case, the value of each aggregate is
    2118             :  *    stored in the expression context to be used when ExecProject evaluates
    2119             :  *    the result tuple.
    2120             :  */
    2121             : static TupleTableSlot *
    2122     2791926 : ExecAgg(PlanState *pstate)
    2123             : {
    2124     2791926 :     AggState   *node = castNode(AggState, pstate);
    2125     2791926 :     TupleTableSlot *result = NULL;
    2126             : 
    2127     2791926 :     CHECK_FOR_INTERRUPTS();
    2128             : 
    2129     2791926 :     if (!node->agg_done)
    2130             :     {
    2131             :         /* Dispatch based on strategy */
    2132     1640852 :         switch (node->phase->aggstrategy)
    2133             :         {
    2134      333400 :             case AGG_HASHED:
    2135      333400 :                 if (!node->table_filled)
    2136       43586 :                     agg_fill_hash_table(node);
    2137             :                 /* FALLTHROUGH */
    2138             :             case AGG_MIXED:
    2139      350800 :                 result = agg_retrieve_hash_table(node);
    2140      350800 :                 break;
    2141     1290052 :             case AGG_PLAIN:
    2142             :             case AGG_SORTED:
    2143     1290052 :                 result = agg_retrieve_direct(node);
    2144     1290010 :                 break;
    2145             :         }
    2146             : 
    2147     1640810 :         if (!TupIsNull(result))
    2148     1596220 :             return result;
    2149             :     }
    2150             : 
    2151     1195664 :     return NULL;
    2152             : }
    2153             : 
    2154             : /*
    2155             :  * ExecAgg for non-hashed case
    2156             :  */
    2157             : static TupleTableSlot *
    2158     1290052 : agg_retrieve_direct(AggState *aggstate)
    2159             : {
    2160     1290052 :     Agg        *node = aggstate->phase->aggnode;
    2161             :     ExprContext *econtext;
    2162             :     ExprContext *tmpcontext;
    2163             :     AggStatePerAgg peragg;
    2164             :     AggStatePerGroup *pergroups;
    2165             :     TupleTableSlot *outerslot;
    2166             :     TupleTableSlot *firstSlot;
    2167             :     TupleTableSlot *result;
    2168     1290052 :     bool        hasGroupingSets = aggstate->phase->numsets > 0;
    2169     1290052 :     int         numGroupingSets = Max(aggstate->phase->numsets, 1);
    2170             :     int         currentSet;
    2171             :     int         nextSetSize;
    2172             :     int         numReset;
    2173             :     int         i;
    2174             : 
    2175             :     /*
    2176             :      * get state info from node
    2177             :      *
    2178             :      * econtext is the per-output-tuple expression context
    2179             :      *
    2180             :      * tmpcontext is the per-input-tuple expression context
    2181             :      */
    2182     1290052 :     econtext = aggstate->ss.ps.ps_ExprContext;
    2183     1290052 :     tmpcontext = aggstate->tmpcontext;
    2184             : 
    2185     1290052 :     peragg = aggstate->peragg;
    2186     1290052 :     pergroups = aggstate->pergroups;
    2187     1290052 :     firstSlot = aggstate->ss.ss_ScanTupleSlot;
    2188             : 
    2189             :     /*
    2190             :      * We loop retrieving groups until we find one matching
    2191             :      * aggstate->ss.ps.qual
    2192             :      *
    2193             :      * For grouping sets, we have the invariant that aggstate->projected_set
    2194             :      * is either -1 (initial call) or the index (starting from 0) in
    2195             :      * gset_lengths for the group we just completed (either by projecting a
    2196             :      * row or by discarding it in the qual).
    2197             :      */
    2198     1297342 :     while (!aggstate->agg_done)
    2199             :     {
    2200             :         /*
    2201             :          * Clear the per-output-tuple context for each group, as well as
    2202             :          * aggcontext (which contains any pass-by-ref transvalues of the old
    2203             :          * group).  Some aggregate functions store working state in child
    2204             :          * contexts; those now get reset automatically without us needing to
    2205             :          * do anything special.
    2206             :          *
    2207             :          * We use ReScanExprContext not just ResetExprContext because we want
    2208             :          * any registered shutdown callbacks to be called.  That allows
    2209             :          * aggregate functions to ensure they've cleaned up any non-memory
    2210             :          * resources.
    2211             :          */
    2212     1297256 :         ReScanExprContext(econtext);
    2213             : 
    2214             :         /*
    2215             :          * Determine how many grouping sets need to be reset at this boundary.
    2216             :          */
    2217     1297256 :         if (aggstate->projected_set >= 0 &&
    2218      143270 :             aggstate->projected_set < numGroupingSets)
    2219      143266 :             numReset = aggstate->projected_set + 1;
    2220             :         else
    2221     1153990 :             numReset = numGroupingSets;
    2222             : 
    2223             :         /*
    2224             :          * numReset can change on a phase boundary, but that's OK; we want to
    2225             :          * reset the contexts used in _this_ phase, and later, after possibly
    2226             :          * changing phase, initialize the right number of aggregates for the
    2227             :          * _new_ phase.
    2228             :          */
    2229             : 
    2230     2608580 :         for (i = 0; i < numReset; i++)
    2231             :         {
    2232     1311324 :             ReScanExprContext(aggstate->aggcontexts[i]);
    2233             :         }
    2234             : 
    2235             :         /*
    2236             :          * Check if input is complete and there are no more groups to project
    2237             :          * in this phase; move to next phase or mark as done.
    2238             :          */
    2239     1297256 :         if (aggstate->input_done == true &&
    2240         896 :             aggstate->projected_set >= (numGroupingSets - 1))
    2241             :         {
    2242         440 :             if (aggstate->current_phase < aggstate->numphases - 1)
    2243             :             {
    2244         120 :                 initialize_phase(aggstate, aggstate->current_phase + 1);
    2245         120 :                 aggstate->input_done = false;
    2246         120 :                 aggstate->projected_set = -1;
    2247         120 :                 numGroupingSets = Max(aggstate->phase->numsets, 1);
    2248         120 :                 node = aggstate->phase->aggnode;
    2249         120 :                 numReset = numGroupingSets;
    2250             :             }
    2251         320 :             else if (aggstate->aggstrategy == AGG_MIXED)
    2252             :             {
    2253             :                 /*
    2254             :                  * Mixed mode; we've output all the grouped stuff and have
    2255             :                  * full hashtables, so switch to outputting those.
    2256             :                  */
    2257          60 :                 initialize_phase(aggstate, 0);
    2258          60 :                 aggstate->table_filled = true;
    2259          60 :                 ResetTupleHashIterator(aggstate->perhash[0].hashtable,
    2260             :                                        &aggstate->perhash[0].hashiter);
    2261          60 :                 select_current_set(aggstate, 0, true);
    2262          60 :                 return agg_retrieve_hash_table(aggstate);
    2263             :             }
    2264             :             else
    2265             :             {
    2266         260 :                 aggstate->agg_done = true;
    2267         260 :                 break;
    2268             :             }
    2269             :         }
    2270             : 
    2271             :         /*
    2272             :          * Get the number of columns in the next grouping set after the last
    2273             :          * projected one (if any). This is the number of columns to compare to
    2274             :          * see if we reached the boundary of that set too.
    2275             :          */
    2276     1296936 :         if (aggstate->projected_set >= 0 &&
    2277      142830 :             aggstate->projected_set < (numGroupingSets - 1))
    2278       17944 :             nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
    2279             :         else
    2280     1278992 :             nextSetSize = 0;
    2281             : 
    2282             :         /*----------
    2283             :          * If a subgroup for the current grouping set is present, project it.
    2284             :          *
    2285             :          * We have a new group if:
    2286             :          *  - we're out of input but haven't projected all grouping sets
    2287             :          *    (checked above)
    2288             :          * OR
    2289             :          *    - we already projected a row that wasn't from the last grouping
    2290             :          *      set
    2291             :          *    AND
    2292             :          *    - the next grouping set has at least one grouping column (since
    2293             :          *      empty grouping sets project only once input is exhausted)
    2294             :          *    AND
    2295             :          *    - the previous and pending rows differ on the grouping columns
    2296             :          *      of the next grouping set
    2297             :          *----------
    2298             :          */
    2299     1296936 :         tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
    2300     1296936 :         if (aggstate->input_done ||
    2301     1296480 :             (node->aggstrategy != AGG_PLAIN &&
    2302      143460 :              aggstate->projected_set != -1 &&
    2303      142374 :              aggstate->projected_set < (numGroupingSets - 1) &&
    2304       13160 :              nextSetSize > 0 &&
    2305       13160 :              !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
    2306             :                                tmpcontext)))
    2307             :         {
    2308        9212 :             aggstate->projected_set += 1;
    2309             : 
    2310             :             Assert(aggstate->projected_set < numGroupingSets);
    2311        9212 :             Assert(nextSetSize > 0 || aggstate->input_done);
    2312             :         }
    2313             :         else
    2314             :         {
    2315             :             /*
    2316             :              * We no longer care what group we just projected, the next
    2317             :              * projection will always be the first (or only) grouping set
    2318             :              * (unless the input proves to be empty).
    2319             :              */
    2320     1287724 :             aggstate->projected_set = 0;
    2321             : 
    2322             :             /*
    2323             :              * If we don't already have the first tuple of the new group,
    2324             :              * fetch it from the outer plan.
    2325             :              */
    2326     1287724 :             if (aggstate->grp_firstTuple == NULL)
    2327             :             {
    2328     1154106 :                 outerslot = fetch_input_tuple(aggstate);
    2329     1154092 :                 if (!TupIsNull(outerslot))
    2330             :                 {
    2331             :                     /*
    2332             :                      * Make a copy of the first input tuple; we will use this
    2333             :                      * for comparisons (in group mode) and for projection.
    2334             :                      */
    2335       70176 :                     aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
    2336             :                 }
    2337             :                 else
    2338             :                 {
    2339             :                     /* outer plan produced no tuples at all */
    2340     1083916 :                     if (hasGroupingSets)
    2341             :                     {
    2342             :                         /*
    2343             :                          * If there was no input at all, we need to project
    2344             :                          * rows only if there are grouping sets of size 0.
    2345             :                          * Note that this implies that there can't be any
    2346             :                          * references to ungrouped Vars, which would otherwise
    2347             :                          * cause issues with the empty output slot.
    2348             :                          *
    2349             :                          * XXX: This is no longer true, we currently deal with
    2350             :                          * this in finalize_aggregates().
    2351             :                          */
    2352          36 :                         aggstate->input_done = true;
    2353             : 
    2354          52 :                         while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
    2355             :                         {
    2356          20 :                             aggstate->projected_set += 1;
    2357          20 :                             if (aggstate->projected_set >= numGroupingSets)
    2358             :                             {
    2359             :                                 /*
    2360             :                                  * We can't set agg_done here because we might
    2361             :                                  * have more phases to do, even though the
    2362             :                                  * input is empty. So we need to restart the
    2363             :                                  * whole outer loop.
    2364             :                                  */
    2365           4 :                                 break;
    2366             :                             }
    2367             :                         }
    2368             : 
    2369          36 :                         if (aggstate->projected_set >= numGroupingSets)
    2370           4 :                             continue;
    2371             :                     }
    2372             :                     else
    2373             :                     {
    2374     1083880 :                         aggstate->agg_done = true;
    2375             :                         /* If we are grouping, we should produce no tuples too */
    2376     1083880 :                         if (node->aggstrategy != AGG_PLAIN)
    2377         104 :                             return NULL;
    2378             :                     }
    2379             :                 }
    2380             :             }
    2381             : 
    2382             :             /*
    2383             :              * Initialize working state for a new input tuple group.
    2384             :              */
    2385     1287602 :             initialize_aggregates(aggstate, pergroups, numReset);
    2386             : 
    2387     1287602 :             if (aggstate->grp_firstTuple != NULL)
    2388             :             {
    2389             :                 /*
    2390             :                  * Store the copied first input tuple in the tuple table slot
    2391             :                  * reserved for it.  The tuple will be deleted when it is
    2392             :                  * cleared from the slot.
    2393             :                  */
    2394      203794 :                 ExecForceStoreHeapTuple(aggstate->grp_firstTuple,
    2395             :                                         firstSlot, true);
    2396      203794 :                 aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
    2397             : 
    2398             :                 /* set up for first advance_aggregates call */
    2399      203794 :                 tmpcontext->ecxt_outertuple = firstSlot;
    2400             : 
    2401             :                 /*
    2402             :                  * Process each outer-plan tuple, and then fetch the next one,
    2403             :                  * until we exhaust the outer plan or cross a group boundary.
    2404             :                  */
    2405             :                 for (;;)
    2406             :                 {
    2407             :                     /*
    2408             :                      * During phase 1 only of a mixed agg, we need to update
    2409             :                      * hashtables as well in advance_aggregates.
    2410             :                      */
    2411    11318732 :                     if (aggstate->aggstrategy == AGG_MIXED &&
    2412       25380 :                         aggstate->current_phase == 1)
    2413             :                     {
    2414       25380 :                         lookup_hash_entries(aggstate);
    2415             :                     }
    2416             : 
    2417             :                     /* Advance the aggregates (or combine functions) */
    2418    11318732 :                     advance_aggregates(aggstate);
    2419             : 
    2420             :                     /* Reset per-input-tuple context after each tuple */
    2421    11318704 :                     ResetExprContext(tmpcontext);
    2422             : 
    2423    11318704 :                     outerslot = fetch_input_tuple(aggstate);
    2424    11318704 :                     if (TupIsNull(outerslot))
    2425             :                     {
    2426             :                         /* no more outer-plan tuples available */
    2427             : 
    2428             :                         /* if we built hash tables, finalize any spills */
    2429       70144 :                         if (aggstate->aggstrategy == AGG_MIXED &&
    2430          52 :                             aggstate->current_phase == 1)
    2431          52 :                             hashagg_finish_initial_spills(aggstate);
    2432             : 
    2433       70144 :                         if (hasGroupingSets)
    2434             :                         {
    2435         404 :                             aggstate->input_done = true;
    2436         404 :                             break;
    2437             :                         }
    2438             :                         else
    2439             :                         {
    2440       69740 :                             aggstate->agg_done = true;
    2441       69740 :                             break;
    2442             :                         }
    2443             :                     }
    2444             :                     /* set up for next advance_aggregates call */
    2445    11248560 :                     tmpcontext->ecxt_outertuple = outerslot;
    2446             : 
    2447             :                     /*
    2448             :                      * If we are grouping, check whether we've crossed a group
    2449             :                      * boundary.
    2450             :                      */
    2451    11248560 :                     if (node->aggstrategy != AGG_PLAIN)
    2452             :                     {
    2453     1176644 :                         tmpcontext->ecxt_innertuple = firstSlot;
    2454     1176644 :                         if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
    2455             :                                       tmpcontext))
    2456             :                         {
    2457      133622 :                             aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
    2458      133622 :                             break;
    2459             :                         }
    2460             :                     }
    2461             :                 }
    2462             :             }
    2463             : 
    2464             :             /*
    2465             :              * Use the representative input tuple for any references to
    2466             :              * non-aggregated input columns in aggregate direct args, the node
    2467             :              * qual, and the tlist.  (If we are not grouping, and there are no
    2468             :              * input rows at all, we will come here with an empty firstSlot
    2469             :              * ... but if not grouping, there can't be any references to
    2470             :              * non-aggregated input columns, so no problem.)
    2471             :              */
    2472     1287574 :             econtext->ecxt_outertuple = firstSlot;
    2473             :         }
    2474             : 
    2475             :         Assert(aggstate->projected_set >= 0);
    2476             : 
    2477     1296786 :         currentSet = aggstate->projected_set;
    2478             : 
    2479     1296786 :         prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
    2480             : 
    2481     1296786 :         select_current_set(aggstate, currentSet, false);
    2482             : 
    2483     1296786 :         finalize_aggregates(aggstate,
    2484             :                             peragg,
    2485     1296786 :                             pergroups[currentSet]);
    2486             : 
    2487             :         /*
    2488             :          * If there's no row to project right now, we must continue rather
    2489             :          * than returning a null since there might be more groups.
    2490             :          */
    2491     1296786 :         result = project_aggregates(aggstate);
    2492     1296786 :         if (result)
    2493     1289500 :             return result;
    2494             :     }
    2495             : 
    2496             :     /* No more groups */
    2497         346 :     return NULL;
    2498             : }
    2499             : 
    2500             : /*
    2501             :  * ExecAgg for hashed case: read input and build hash table
    2502             :  */
    2503             : static void
    2504       43586 : agg_fill_hash_table(AggState *aggstate)
    2505             : {
    2506             :     TupleTableSlot *outerslot;
    2507       43586 :     ExprContext *tmpcontext = aggstate->tmpcontext;
    2508             : 
    2509             :     /*
    2510             :      * Process each outer-plan tuple, and then fetch the next one, until we
    2511             :      * exhaust the outer plan.
    2512             :      */
    2513             :     for (;;)
    2514             :     {
    2515     3183982 :         outerslot = fetch_input_tuple(aggstate);
    2516     3183982 :         if (TupIsNull(outerslot))
    2517             :             break;
    2518             : 
    2519             :         /* set up for lookup_hash_entries and advance_aggregates */
    2520     3140396 :         tmpcontext->ecxt_outertuple = outerslot;
    2521             : 
    2522             :         /* Find or build hashtable entries */
    2523     3140396 :         lookup_hash_entries(aggstate);
    2524             : 
    2525             :         /* Advance the aggregates (or combine functions) */
    2526     3140396 :         advance_aggregates(aggstate);
    2527             : 
    2528             :         /*
    2529             :          * Reset per-input-tuple context after each tuple, but note that the
    2530             :          * hash lookups do this too
    2531             :          */
    2532     3140396 :         ResetExprContext(aggstate->tmpcontext);
    2533             :     }
    2534             : 
    2535             :     /* finalize spills, if any */
    2536       43586 :     hashagg_finish_initial_spills(aggstate);
    2537             : 
    2538       43586 :     aggstate->table_filled = true;
    2539             :     /* Initialize to walk the first hash table */
    2540       43586 :     select_current_set(aggstate, 0, true);
    2541       43586 :     ResetTupleHashIterator(aggstate->perhash[0].hashtable,
    2542             :                            &aggstate->perhash[0].hashiter);
    2543       43586 : }
    2544             : 
    2545             : /*
    2546             :  * If any data was spilled during hash aggregation, reset the hash table and
    2547             :  * reprocess one batch of spilled data. After reprocessing a batch, the hash
    2548             :  * table will again contain data, ready to be consumed by
    2549             :  * agg_retrieve_hash_table_in_memory().
    2550             :  *
    2551             :  * Should only be called after all in memory hash table entries have been
    2552             :  * finalized and emitted.
    2553             :  *
    2554             :  * Return false when input is exhausted and there's no more work to be done;
    2555             :  * otherwise return true.
    2556             :  */
    2557             : static bool
    2558       67128 : agg_refill_hash_table(AggState *aggstate)
    2559             : {
    2560             :     HashAggBatch *batch;
    2561             :     HashAggSpill spill;
    2562       67128 :     HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
    2563             :     uint64      ngroups_estimate;
    2564       67128 :     bool        spill_initialized = false;
    2565             : 
    2566       67128 :     if (aggstate->hash_batches == NIL)
    2567       44140 :         return false;
    2568             : 
    2569       22988 :     batch = linitial(aggstate->hash_batches);
    2570       22988 :     aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
    2571             : 
    2572             :     /*
    2573             :      * Estimate the number of groups for this batch as the total number of
    2574             :      * tuples in its input file. Although that's a worst case, it's not bad
    2575             :      * here for two reasons: (1) overestimating is better than
    2576             :      * underestimating; and (2) we've already scanned the relation once, so
    2577             :      * it's likely that we've already finalized many of the common values.
    2578             :      */
    2579       22988 :     ngroups_estimate = batch->input_tuples;
    2580             : 
    2581       22988 :     hash_agg_set_limits(aggstate->hashentrysize, ngroups_estimate,
    2582             :                         batch->used_bits, &aggstate->hash_mem_limit,
    2583             :                         &aggstate->hash_ngroups_limit, NULL);
    2584             : 
    2585             :     /* there could be residual pergroup pointers; clear them */
    2586       22988 :     for (int setoff = 0;
    2587      169980 :          setoff < aggstate->maxsets + aggstate->num_hashes;
    2588      146992 :          setoff++)
    2589      146992 :         aggstate->all_pergroups[setoff] = NULL;
    2590             : 
    2591             :     /* free memory and reset hash tables */
    2592       22988 :     ReScanExprContext(aggstate->hashcontext);
    2593      146992 :     for (int setno = 0; setno < aggstate->num_hashes; setno++)
    2594      124004 :         ResetTupleHashTable(aggstate->perhash[setno].hashtable);
    2595             : 
    2596       22988 :     aggstate->hash_ngroups_current = 0;
    2597             : 
    2598             :     /*
    2599             :      * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
    2600             :      * happens in phase 0. So, we switch to phase 1 when processing a batch,
    2601             :      * and back to phase 0 after the batch is done.
    2602             :      */
    2603             :     Assert(aggstate->current_phase == 0);
    2604       22988 :     if (aggstate->phase->aggstrategy == AGG_MIXED)
    2605             :     {
    2606       16836 :         aggstate->current_phase = 1;
    2607       16836 :         aggstate->phase = &aggstate->phases[aggstate->current_phase];
    2608             :     }
    2609             : 
    2610       22988 :     select_current_set(aggstate, batch->setno, true);
    2611             : 
    2612             :     /*
    2613             :      * Spilled tuples are always read back as MinimalTuples, which may be
    2614             :      * different from the outer plan, so recompile the aggregate expressions.
    2615             :      *
    2616             :      * We still need the NULL check, because we are only processing one
    2617             :      * grouping set at a time and the rest will be NULL.
    2618             :      */
    2619       22988 :     hashagg_recompile_expressions(aggstate, true, true);
    2620             : 
    2621       22988 :     LogicalTapeRewindForRead(tapeinfo->tapeset, batch->input_tapenum,
    2622             :                              HASHAGG_READ_BUFFER_SIZE);
    2623             :     for (;;)
    2624      800532 :     {
    2625      823520 :         TupleTableSlot *slot = aggstate->hash_spill_slot;
    2626             :         MinimalTuple tuple;
    2627             :         uint32      hash;
    2628             :         bool        in_hash_table;
    2629             : 
    2630      823520 :         CHECK_FOR_INTERRUPTS();
    2631             : 
    2632      823520 :         tuple = hashagg_batch_read(batch, &hash);
    2633      823520 :         if (tuple == NULL)
    2634       22988 :             break;
    2635             : 
    2636      800532 :         ExecStoreMinimalTuple(tuple, slot, true);
    2637      800532 :         aggstate->tmpcontext->ecxt_outertuple = slot;
    2638             : 
    2639      800532 :         prepare_hash_slot(aggstate);
    2640     1601064 :         aggstate->hash_pergroup[batch->setno] =
    2641      800532 :             lookup_hash_entry(aggstate, hash, &in_hash_table);
    2642             : 
    2643      800532 :         if (in_hash_table)
    2644             :         {
    2645             :             /* Advance the aggregates (or combine functions) */
    2646      233084 :             advance_aggregates(aggstate);
    2647             :         }
    2648             :         else
    2649             :         {
    2650      567448 :             if (!spill_initialized)
    2651             :             {
    2652             :                 /*
    2653             :                  * Avoid initializing the spill until we actually need it so
    2654             :                  * that we don't assign tapes that will never be used.
    2655             :                  */
    2656       10432 :                 spill_initialized = true;
    2657       10432 :                 hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
    2658             :                                    ngroups_estimate, aggstate->hashentrysize);
    2659             :             }
    2660             :             /* no memory for a new group, spill */
    2661      567448 :             hashagg_spill_tuple(&spill, slot, hash);
    2662             :         }
    2663             : 
    2664             :         /*
    2665             :          * Reset per-input-tuple context after each tuple, but note that the
    2666             :          * hash lookups do this too
    2667             :          */
    2668      800532 :         ResetExprContext(aggstate->tmpcontext);
    2669             :     }
    2670             : 
    2671       22988 :     hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
    2672             : 
    2673             :     /* change back to phase 0 */
    2674       22988 :     aggstate->current_phase = 0;
    2675       22988 :     aggstate->phase = &aggstate->phases[aggstate->current_phase];
    2676             : 
    2677       22988 :     if (spill_initialized)
    2678             :     {
    2679       10432 :         hash_agg_update_metrics(aggstate, true, spill.npartitions);
    2680       10432 :         hashagg_spill_finish(aggstate, &spill, batch->setno);
    2681             :     }
    2682             :     else
    2683       12556 :         hash_agg_update_metrics(aggstate, true, 0);
    2684             : 
    2685       22988 :     aggstate->hash_spill_mode = false;
    2686             : 
    2687             :     /* prepare to walk the first hash table */
    2688       22988 :     select_current_set(aggstate, batch->setno, true);
    2689       22988 :     ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
    2690             :                            &aggstate->perhash[batch->setno].hashiter);
    2691             : 
    2692       22988 :     pfree(batch);
    2693             : 
    2694       22988 :     return true;
    2695             : }
    2696             : 
    2697             : /*
    2698             :  * ExecAgg for hashed case: retrieving groups from hash table
    2699             :  *
    2700             :  * After exhausting in-memory tuples, also try refilling the hash table using
    2701             :  * previously-spilled tuples. Only returns NULL after all in-memory and
    2702             :  * spilled tuples are exhausted.
    2703             :  */
    2704             : static TupleTableSlot *
    2705      350860 : agg_retrieve_hash_table(AggState *aggstate)
    2706             : {
    2707      350860 :     TupleTableSlot *result = NULL;
    2708             : 
    2709      680568 :     while (result == NULL)
    2710             :     {
    2711      373848 :         result = agg_retrieve_hash_table_in_memory(aggstate);
    2712      373848 :         if (result == NULL)
    2713             :         {
    2714       67128 :             if (!agg_refill_hash_table(aggstate))
    2715             :             {
    2716       44140 :                 aggstate->agg_done = true;
    2717       44140 :                 break;
    2718             :             }
    2719             :         }
    2720             :     }
    2721             : 
    2722      350860 :     return result;
    2723             : }
    2724             : 
    2725             : /*
    2726             :  * Retrieve the groups from the in-memory hash tables without considering any
    2727             :  * spilled tuples.
    2728             :  */
    2729             : static TupleTableSlot *
    2730      373848 : agg_retrieve_hash_table_in_memory(AggState *aggstate)
    2731             : {
    2732             :     ExprContext *econtext;
    2733             :     AggStatePerAgg peragg;
    2734             :     AggStatePerGroup pergroup;
    2735             :     TupleHashEntryData *entry;
    2736             :     TupleTableSlot *firstSlot;
    2737             :     TupleTableSlot *result;
    2738             :     AggStatePerHash perhash;
    2739             : 
    2740             :     /*
    2741             :      * get state info from node.
    2742             :      *
    2743             :      * econtext is the per-output-tuple expression context.
    2744             :      */
    2745      373848 :     econtext = aggstate->ss.ps.ps_ExprContext;
    2746      373848 :     peragg = aggstate->peragg;
    2747      373848 :     firstSlot = aggstate->ss.ss_ScanTupleSlot;
    2748             : 
    2749             :     /*
    2750             :      * Note that perhash (and therefore anything accessed through it) can
    2751             :      * change inside the loop, as we change between grouping sets.
    2752             :      */
    2753      373848 :     perhash = &aggstate->perhash[aggstate->current_set];
    2754             : 
    2755             :     /*
    2756             :      * We loop retrieving groups until we find one satisfying
    2757             :      * aggstate->ss.ps.qual
    2758             :      */
    2759             :     for (;;)
    2760      109760 :     {
    2761      483608 :         TupleTableSlot *hashslot = perhash->hashslot;
    2762             :         int         i;
    2763             : 
    2764      483608 :         CHECK_FOR_INTERRUPTS();
    2765             : 
    2766             :         /*
    2767             :          * Find the next entry in the hash table
    2768             :          */
    2769      483608 :         entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
    2770      483608 :         if (entry == NULL)
    2771             :         {
    2772      133238 :             int         nextset = aggstate->current_set + 1;
    2773             : 
    2774      133238 :             if (nextset < aggstate->num_hashes)
    2775             :             {
    2776             :                 /*
    2777             :                  * Switch to next grouping set, reinitialize, and restart the
    2778             :                  * loop.
    2779             :                  */
    2780       66110 :                 select_current_set(aggstate, nextset, true);
    2781             : 
    2782       66110 :                 perhash = &aggstate->perhash[aggstate->current_set];
    2783             : 
    2784       66110 :                 ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
    2785             : 
    2786       66110 :                 continue;
    2787             :             }
    2788             :             else
    2789             :             {
    2790       67128 :                 return NULL;
    2791             :             }
    2792             :         }
    2793             : 
    2794             :         /*
    2795             :          * Clear the per-output-tuple context for each group
    2796             :          *
    2797             :          * We intentionally don't use ReScanExprContext here; if any aggs have
    2798             :          * registered shutdown callbacks, they mustn't be called yet, since we
    2799             :          * might not be done with that agg.
    2800             :          */
    2801      350370 :         ResetExprContext(econtext);
    2802             : 
    2803             :         /*
    2804             :          * Transform representative tuple back into one with the right
    2805             :          * columns.
    2806             :          */
    2807      350370 :         ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
    2808      350370 :         slot_getallattrs(hashslot);
    2809             : 
    2810      350370 :         ExecClearTuple(firstSlot);
    2811      350370 :         memset(firstSlot->tts_isnull, true,
    2812      350370 :                firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
    2813             : 
    2814      983894 :         for (i = 0; i < perhash->numhashGrpCols; i++)
    2815             :         {
    2816      633524 :             int         varNumber = perhash->hashGrpColIdxInput[i] - 1;
    2817             : 
    2818      633524 :             firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
    2819      633524 :             firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
    2820             :         }
    2821      350370 :         ExecStoreVirtualTuple(firstSlot);
    2822             : 
    2823      350370 :         pergroup = (AggStatePerGroup) entry->additional;
    2824             : 
    2825             :         /*
    2826             :          * Use the representative input tuple for any references to
    2827             :          * non-aggregated input columns in the qual and tlist.
    2828             :          */
    2829      350370 :         econtext->ecxt_outertuple = firstSlot;
    2830             : 
    2831      350370 :         prepare_projection_slot(aggstate,
    2832             :                                 econtext->ecxt_outertuple,
    2833             :                                 aggstate->current_set);
    2834             : 
    2835      350370 :         finalize_aggregates(aggstate, peragg, pergroup);
    2836             : 
    2837      350370 :         result = project_aggregates(aggstate);
    2838      350370 :         if (result)
    2839      306720 :             return result;
    2840             :     }
    2841             : 
    2842             :     /* No more groups */
    2843             :     return NULL;
    2844             : }
    2845             : 
    2846             : /*
    2847             :  * Initialize HashTapeInfo
    2848             :  */
    2849             : static void
    2850          40 : hashagg_tapeinfo_init(AggState *aggstate)
    2851             : {
    2852          40 :     HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
    2853          40 :     int         init_tapes = 16;    /* expanded dynamically */
    2854             : 
    2855          40 :     tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, NULL, NULL, -1);
    2856          40 :     tapeinfo->ntapes = init_tapes;
    2857          40 :     tapeinfo->nfreetapes = init_tapes;
    2858          40 :     tapeinfo->freetapes_alloc = init_tapes;
    2859          40 :     tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
    2860         680 :     for (int i = 0; i < init_tapes; i++)
    2861         640 :         tapeinfo->freetapes[i] = i;
    2862             : 
    2863          40 :     aggstate->hash_tapeinfo = tapeinfo;
    2864          40 : }
    2865             : 
    2866             : /*
    2867             :  * Assign unused tapes to spill partitions, extending the tape set if
    2868             :  * necessary.
    2869             :  */
    2870             : static void
    2871       10496 : hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
    2872             :                         int npartitions)
    2873             : {
    2874       10496 :     int         partidx = 0;
    2875             : 
    2876             :     /* use free tapes if available */
    2877       33636 :     while (partidx < npartitions && tapeinfo->nfreetapes > 0)
    2878       23140 :         partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
    2879             : 
    2880       10496 :     if (partidx < npartitions)
    2881             :     {
    2882        8948 :         LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
    2883             : 
    2884       27792 :         while (partidx < npartitions)
    2885       18844 :             partitions[partidx++] = tapeinfo->ntapes++;
    2886             :     }
    2887       10496 : }
    2888             : 
    2889             : /*
    2890             :  * After a tape has already been written to and then read, this function
    2891             :  * rewinds it for writing and adds it to the free list.
    2892             :  */
    2893             : static void
    2894       22988 : hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
    2895             : {
    2896       22988 :     LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
    2897       22988 :     if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
    2898             :     {
    2899           0 :         tapeinfo->freetapes_alloc <<= 1;
    2900           0 :         tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
    2901           0 :                                        tapeinfo->freetapes_alloc * sizeof(int));
    2902             :     }
    2903       22988 :     tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
    2904       22988 : }
    2905             : 
    2906             : /*
    2907             :  * hashagg_spill_init
    2908             :  *
    2909             :  * Called after we determined that spilling is necessary. Chooses the number
    2910             :  * of partitions to create, and initializes them.
    2911             :  */
    2912             : static void
    2913       10496 : hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
    2914             :                    uint64 input_groups, double hashentrysize)
    2915             : {
    2916             :     int         npartitions;
    2917             :     int         partition_bits;
    2918             : 
    2919       10496 :     npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
    2920             :                                              used_bits, &partition_bits);
    2921             : 
    2922       10496 :     spill->partitions = palloc0(sizeof(int) * npartitions);
    2923       10496 :     spill->ntuples = palloc0(sizeof(int64) * npartitions);
    2924             : 
    2925       10496 :     hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
    2926             : 
    2927       10496 :     spill->tapeset = tapeinfo->tapeset;
    2928       10496 :     spill->shift = 32 - used_bits - partition_bits;
    2929       10496 :     spill->mask = (npartitions - 1) << spill->shift;
    2930       10496 :     spill->npartitions = npartitions;
    2931       10496 : }
    2932             : 
    2933             : /*
    2934             :  * hashagg_spill_tuple
    2935             :  *
    2936             :  * No room for new groups in the hash table. Save for later in the appropriate
    2937             :  * partition.
    2938             :  */
    2939             : static Size
    2940      800532 : hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash)
    2941             : {
    2942      800532 :     LogicalTapeSet *tapeset = spill->tapeset;
    2943             :     int         partition;
    2944             :     MinimalTuple tuple;
    2945             :     int         tapenum;
    2946      800532 :     int         total_written = 0;
    2947             :     bool        shouldFree;
    2948             : 
    2949             :     Assert(spill->partitions != NULL);
    2950             : 
    2951             :     /* XXX: may contain unnecessary attributes, should project */
    2952      800532 :     tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
    2953             : 
    2954      800532 :     partition = (hash & spill->mask) >> spill->shift;
    2955      800532 :     spill->ntuples[partition]++;
    2956             : 
    2957      800532 :     tapenum = spill->partitions[partition];
    2958             : 
    2959      800532 :     LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
    2960      800532 :     total_written += sizeof(uint32);
    2961             : 
    2962      800532 :     LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
    2963      800532 :     total_written += tuple->t_len;
    2964             : 
    2965      800532 :     if (shouldFree)
    2966      233084 :         pfree(tuple);
    2967             : 
    2968      800532 :     return total_written;
    2969             : }
    2970             : 
    2971             : /*
    2972             :  * hashagg_batch_new
    2973             :  *
    2974             :  * Construct a HashAggBatch item, which represents one iteration of HashAgg to
    2975             :  * be done.
    2976             :  */
    2977             : static HashAggBatch *
    2978       22988 : hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
    2979             :                   int64 input_tuples, int used_bits)
    2980             : {
    2981       22988 :     HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
    2982             : 
    2983       22988 :     batch->setno = setno;
    2984       22988 :     batch->used_bits = used_bits;
    2985       22988 :     batch->tapeset = tapeset;
    2986       22988 :     batch->input_tapenum = tapenum;
    2987       22988 :     batch->input_tuples = input_tuples;
    2988             : 
    2989       22988 :     return batch;
    2990             : }
    2991             : 
    2992             : /*
    2993             :  * read_spilled_tuple
    2994             :  *      read the next tuple from a batch's tape.  Return NULL if no more.
    2995             :  */
    2996             : static MinimalTuple
    2997      823520 : hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
    2998             : {
    2999      823520 :     LogicalTapeSet *tapeset = batch->tapeset;
    3000      823520 :     int         tapenum = batch->input_tapenum;
    3001             :     MinimalTuple tuple;
    3002             :     uint32      t_len;
    3003             :     size_t      nread;
    3004             :     uint32      hash;
    3005             : 
    3006      823520 :     nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
    3007      823520 :     if (nread == 0)
    3008       22988 :         return NULL;
    3009      800532 :     if (nread != sizeof(uint32))
    3010           0 :         ereport(ERROR,
    3011             :                 (errcode_for_file_access(),
    3012             :                  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
    3013             :                         tapenum, sizeof(uint32), nread)));
    3014      800532 :     if (hashp != NULL)
    3015      800532 :         *hashp = hash;
    3016             : 
    3017      800532 :     nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
    3018      800532 :     if (nread != sizeof(uint32))
    3019           0 :         ereport(ERROR,
    3020             :                 (errcode_for_file_access(),
    3021             :                  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
    3022             :                         tapenum, sizeof(uint32), nread)));
    3023             : 
    3024      800532 :     tuple = (MinimalTuple) palloc(t_len);
    3025      800532 :     tuple->t_len = t_len;
    3026             : 
    3027      800532 :     nread = LogicalTapeRead(tapeset, tapenum,
    3028             :                             (void *) ((char *) tuple + sizeof(uint32)),
    3029             :                             t_len - sizeof(uint32));
    3030      800532 :     if (nread != t_len - sizeof(uint32))
    3031           0 :         ereport(ERROR,
    3032             :                 (errcode_for_file_access(),
    3033             :                  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
    3034             :                         tapenum, t_len - sizeof(uint32), nread)));
    3035             : 
    3036      800532 :     return tuple;
    3037             : }
    3038             : 
    3039             : /*
    3040             :  * hashagg_finish_initial_spills
    3041             :  *
    3042             :  * After a HashAggBatch has been processed, it may have spilled tuples to
    3043             :  * disk. If so, turn the spilled partitions into new batches that must later
    3044             :  * be executed.
    3045             :  */
    3046             : static void
    3047       43638 : hashagg_finish_initial_spills(AggState *aggstate)
    3048             : {
    3049             :     int         setno;
    3050       43638 :     int         total_npartitions = 0;
    3051             : 
    3052       43638 :     if (aggstate->hash_spills != NULL)
    3053             :     {
    3054         104 :         for (setno = 0; setno < aggstate->num_hashes; setno++)
    3055             :         {
    3056          64 :             HashAggSpill *spill = &aggstate->hash_spills[setno];
    3057             : 
    3058          64 :             total_npartitions += spill->npartitions;
    3059          64 :             hashagg_spill_finish(aggstate, spill, setno);
    3060             :         }
    3061             : 
    3062             :         /*
    3063             :          * We're not processing tuples from outer plan any more; only
    3064             :          * processing batches of spilled tuples. The initial spill structures
    3065             :          * are no longer needed.
    3066             :          */
    3067          40 :         pfree(aggstate->hash_spills);
    3068          40 :         aggstate->hash_spills = NULL;
    3069             :     }
    3070             : 
    3071       43638 :     hash_agg_update_metrics(aggstate, false, total_npartitions);
    3072       43638 :     aggstate->hash_spill_mode = false;
    3073       43638 : }
    3074             : 
    3075             : /*
    3076             :  * hashagg_spill_finish
    3077             :  *
    3078             :  * Transform spill partitions into new batches.
    3079             :  */
    3080             : static void
    3081       10496 : hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
    3082             : {
    3083             :     int         i;
    3084       10496 :     int         used_bits = 32 - spill->shift;
    3085             : 
    3086       10496 :     if (spill->npartitions == 0)
    3087           0 :         return;                 /* didn't spill */
    3088             : 
    3089       52480 :     for (i = 0; i < spill->npartitions; i++)
    3090             :     {
    3091       41984 :         int         tapenum = spill->partitions[i];
    3092             :         HashAggBatch *new_batch;
    3093             : 
    3094             :         /* if the partition is empty, don't create a new batch of work */
    3095       41984 :         if (spill->ntuples[i] == 0)
    3096       18996 :             continue;
    3097             : 
    3098       22988 :         new_batch = hashagg_batch_new(aggstate->hash_tapeinfo->tapeset,
    3099       22988 :                                       tapenum, setno, spill->ntuples[i],
    3100             :                                       used_bits);
    3101       22988 :         aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
    3102       22988 :         aggstate->hash_batches_used++;
    3103             :     }
    3104             : 
    3105       10496 :     pfree(spill->ntuples);
    3106       10496 :     pfree(spill->partitions);
    3107             : }
    3108             : 
    3109             : /*
    3110             :  * Free resources related to a spilled HashAgg.
    3111             :  */
    3112             : static void
    3113       71660 : hashagg_reset_spill_state(AggState *aggstate)
    3114             : {
    3115             :     ListCell   *lc;
    3116             : 
    3117             :     /* free spills from initial pass */
    3118       71660 :     if (aggstate->hash_spills != NULL)
    3119             :     {
    3120             :         int         setno;
    3121             : 
    3122           0 :         for (setno = 0; setno < aggstate->num_hashes; setno++)
    3123             :         {
    3124           0 :             HashAggSpill *spill = &aggstate->hash_spills[setno];
    3125             : 
    3126           0 :             pfree(spill->ntuples);
    3127           0 :             pfree(spill->partitions);
    3128             :         }
    3129           0 :         pfree(aggstate->hash_spills);
    3130           0 :         aggstate->hash_spills = NULL;
    3131             :     }
    3132             : 
    3133             :     /* free batches */
    3134       71660 :     foreach(lc, aggstate->hash_batches)
    3135             :     {
    3136           0 :         HashAggBatch *batch = (HashAggBatch *) lfirst(lc);
    3137             : 
    3138           0 :         pfree(batch);
    3139             :     }
    3140       71660 :     list_free(aggstate->hash_batches);
    3141       71660 :     aggstate->hash_batches = NIL;
    3142             : 
    3143             :     /* close tape set */
    3144       71660 :     if (aggstate->hash_tapeinfo != NULL)
    3145             :     {
    3146          40 :         HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
    3147             : 
    3148          40 :         LogicalTapeSetClose(tapeinfo->tapeset);
    3149          40 :         pfree(tapeinfo->freetapes);
    3150          40 :         pfree(tapeinfo);
    3151          40 :         aggstate->hash_tapeinfo = NULL;
    3152             :     }
    3153       71660 : }
    3154             : 
    3155             : 
    3156             : /* -----------------
    3157             :  * ExecInitAgg
    3158             :  *
    3159             :  *  Creates the run-time information for the agg node produced by the
    3160             :  *  planner and initializes its outer subtree.
    3161             :  *
    3162             :  * -----------------
    3163             :  */
    3164             : AggState *
    3165       30144 : ExecInitAgg(Agg *node, EState *estate, int eflags)
    3166             : {
    3167             :     AggState   *aggstate;
    3168             :     AggStatePerAgg peraggs;
    3169             :     AggStatePerTrans pertransstates;
    3170             :     AggStatePerGroup *pergroups;
    3171             :     Plan       *outerPlan;
    3172             :     ExprContext *econtext;
    3173             :     TupleDesc   scanDesc;
    3174             :     int         numaggs,
    3175             :                 transno,
    3176             :                 aggno;
    3177             :     int         phase;
    3178             :     int         phaseidx;
    3179             :     ListCell   *l;
    3180       30144 :     Bitmapset  *all_grouped_cols = NULL;
    3181       30144 :     int         numGroupingSets = 1;
    3182             :     int         numPhases;
    3183             :     int         numHashes;
    3184       30144 :     int         i = 0;
    3185       30144 :     int         j = 0;
    3186       57432 :     bool        use_hashing = (node->aggstrategy == AGG_HASHED ||
    3187       27288 :                                node->aggstrategy == AGG_MIXED);
    3188             : 
    3189             :     /* check for unsupported flags */
    3190             :     Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
    3191             : 
    3192             :     /*
    3193             :      * create state structure
    3194             :      */
    3195       30144 :     aggstate = makeNode(AggState);
    3196       30144 :     aggstate->ss.ps.plan = (Plan *) node;
    3197       30144 :     aggstate->ss.ps.state = estate;
    3198       30144 :     aggstate->ss.ps.ExecProcNode = ExecAgg;
    3199             : 
    3200       30144 :     aggstate->aggs = NIL;
    3201       30144 :     aggstate->numaggs = 0;
    3202       30144 :     aggstate->numtrans = 0;
    3203       30144 :     aggstate->aggstrategy = node->aggstrategy;
    3204       30144 :     aggstate->aggsplit = node->aggsplit;
    3205       30144 :     aggstate->maxsets = 0;
    3206       30144 :     aggstate->projected_set = -1;
    3207       30144 :     aggstate->current_set = 0;
    3208       30144 :     aggstate->peragg = NULL;
    3209       30144 :     aggstate->pertrans = NULL;
    3210       30144 :     aggstate->curperagg = NULL;
    3211       30144 :     aggstate->curpertrans = NULL;
    3212       30144 :     aggstate->input_done = false;
    3213       30144 :     aggstate->agg_done = false;
    3214       30144 :     aggstate->pergroups = NULL;
    3215       30144 :     aggstate->grp_firstTuple = NULL;
    3216       30144 :     aggstate->sort_in = NULL;
    3217       30144 :     aggstate->sort_out = NULL;
    3218             : 
    3219             :     /*
    3220             :      * phases[0] always exists, but is dummy in sorted/plain mode
    3221             :      */
    3222       30144 :     numPhases = (use_hashing ? 1 : 2);
    3223       30144 :     numHashes = (use_hashing ? 1 : 0);
    3224             : 
    3225             :     /*
    3226             :      * Calculate the maximum number of grouping sets in any phase; this
    3227             :      * determines the size of some allocations.  Also calculate the number of
    3228             :      * phases, since all hashed/mixed nodes contribute to only a single phase.
    3229             :      */
    3230       30144 :     if (node->groupingSets)
    3231             :     {
    3232         440 :         numGroupingSets = list_length(node->groupingSets);
    3233             : 
    3234         908 :         foreach(l, node->chain)
    3235             :         {
    3236         468 :             Agg        *agg = lfirst(l);
    3237             : 
    3238         468 :             numGroupingSets = Max(numGroupingSets,
    3239             :                                   list_length(agg->groupingSets));
    3240             : 
    3241             :             /*
    3242             :              * additional AGG_HASHED aggs become part of phase 0, but all
    3243             :              * others add an extra phase.
    3244             :              */
    3245         468 :             if (agg->aggstrategy != AGG_HASHED)
    3246         244 :                 ++numPhases;
    3247             :             else
    3248         224 :                 ++numHashes;
    3249             :         }
    3250             :     }
    3251             : 
    3252       30144 :     aggstate->maxsets = numGroupingSets;
    3253       30144 :     aggstate->numphases = numPhases;
    3254             : 
    3255       30144 :     aggstate->aggcontexts = (ExprContext **)
    3256       30144 :         palloc0(sizeof(ExprContext *) * numGroupingSets);
    3257             : 
    3258             :     /*
    3259             :      * Create expression contexts.  We need three or more, one for
    3260             :      * per-input-tuple processing, one for per-output-tuple processing, one
    3261             :      * for all the hashtables, and one for each grouping set.  The per-tuple
    3262             :      * memory context of the per-grouping-set ExprContexts (aggcontexts)
    3263             :      * replaces the standalone memory context formerly used to hold transition
    3264             :      * values.  We cheat a little by using ExecAssignExprContext() to build
    3265             :      * all of them.
    3266             :      *
    3267             :      * NOTE: the details of what is stored in aggcontexts and what is stored
    3268             :      * in the regular per-query memory context are driven by a simple
    3269             :      * decision: we want to reset the aggcontext at group boundaries (if not
    3270             :      * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
    3271             :      */
    3272       30144 :     ExecAssignExprContext(estate, &aggstate->ss.ps);
    3273       30144 :     aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
    3274             : 
    3275       60760 :     for (i = 0; i < numGroupingSets; ++i)
    3276             :     {
    3277       30616 :         ExecAssignExprContext(estate, &aggstate->ss.ps);
    3278       30616 :         aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
    3279             :     }
    3280             : 
    3281       30144 :     if (use_hashing)
    3282        2956 :         aggstate->hashcontext = CreateWorkExprContext(estate);
    3283             : 
    3284       30144 :     ExecAssignExprContext(estate, &aggstate->ss.ps);
    3285             : 
    3286             :     /*
    3287             :      * Initialize child nodes.
    3288             :      *
    3289             :      * If we are doing a hashed aggregation then the child plan does not need
    3290             :      * to handle REWIND efficiently; see ExecReScanAgg.
    3291             :      */
    3292       30144 :     if (node->aggstrategy == AGG_HASHED)
    3293        2856 :         eflags &= ~EXEC_FLAG_REWIND;
    3294       30144 :     outerPlan = outerPlan(node);
    3295       30144 :     outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
    3296             : 
    3297             :     /*
    3298             :      * initialize source tuple type.
    3299             :      */
    3300       30144 :     aggstate->ss.ps.outerops =
    3301       30144 :         ExecGetResultSlotOps(outerPlanState(&aggstate->ss),
    3302             :                              &aggstate->ss.ps.outeropsfixed);
    3303       30144 :     aggstate->ss.ps.outeropsset = true;
    3304             : 
    3305       30144 :     ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
    3306             :                                     aggstate->ss.ps.outerops);
    3307       30144 :     scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
    3308             : 
    3309             :     /*
    3310             :      * If there are more than two phases (including a potential dummy phase
    3311             :      * 0), input will be resorted using tuplesort. Need a slot for that.
    3312             :      */
    3313       30144 :     if (numPhases > 2)
    3314             :     {
    3315         116 :         aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
    3316             :                                                      &TTSOpsMinimalTuple);
    3317             : 
    3318             :         /*
    3319             :          * The output of the tuplesort, and the output from the outer child
    3320             :          * might not use the same type of slot. In most cases the child will
    3321             :          * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
    3322             :          * input can also be presorted due an index, in which case it could be
    3323             :          * a different type of slot.
    3324             :          *
    3325             :          * XXX: For efficiency it would be good to instead/additionally
    3326             :          * generate expressions with corresponding settings of outerops* for
    3327             :          * the individual phases - deforming is often a bottleneck for
    3328             :          * aggregations with lots of rows per group. If there's multiple
    3329             :          * sorts, we know that all but the first use TTSOpsMinimalTuple (via
    3330             :          * the nodeAgg.c internal tuplesort).
    3331             :          */
    3332         116 :         if (aggstate->ss.ps.outeropsfixed &&
    3333         116 :             aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
    3334           8 :             aggstate->ss.ps.outeropsfixed = false;
    3335             :     }
    3336             : 
    3337             :     /*
    3338             :      * Initialize result type, slot and projection.
    3339             :      */
    3340       30144 :     ExecInitResultTupleSlotTL(&aggstate->ss.ps, &TTSOpsVirtual);
    3341       30144 :     ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
    3342             : 
    3343             :     /*
    3344             :      * initialize child expressions
    3345             :      *
    3346             :      * We expect the parser to have checked that no aggs contain other agg
    3347             :      * calls in their arguments (and just to be sure, we verify it again while
    3348             :      * initializing the plan node).  This would make no sense under SQL
    3349             :      * semantics, and it's forbidden by the spec.  Because it is true, we
    3350             :      * don't need to worry about evaluating the aggs in any particular order.
    3351             :      *
    3352             :      * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
    3353             :      * nodes to aggstate->aggs.  Aggrefs in the qual are found here; Aggrefs
    3354             :      * in the targetlist are found during ExecAssignProjectionInfo, below.
    3355             :      */
    3356       30144 :     aggstate->ss.ps.qual =
    3357       30144 :         ExecInitQual(node->plan.qual, (PlanState *) aggstate);
    3358             : 
    3359             :     /*
    3360             :      * We should now have found all Aggrefs in the targetlist and quals.
    3361             :      */
    3362       30144 :     numaggs = aggstate->numaggs;
    3363             :     Assert(numaggs == list_length(aggstate->aggs));
    3364             : 
    3365             :     /*
    3366             :      * For each phase, prepare grouping set data and fmgr lookup data for
    3367             :      * compare functions.  Accumulate all_grouped_cols in passing.
    3368             :      */
    3369       30144 :     aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
    3370             : 
    3371       30144 :     aggstate->num_hashes = numHashes;
    3372       30144 :     if (numHashes)
    3373             :     {
    3374        2956 :         aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
    3375        2956 :         aggstate->phases[0].numsets = 0;
    3376        2956 :         aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
    3377        2956 :         aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
    3378             :     }
    3379             : 
    3380       30144 :     phase = 0;
    3381       60756 :     for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
    3382             :     {
    3383             :         Agg        *aggnode;
    3384             :         Sort       *sortnode;
    3385             : 
    3386       30612 :         if (phaseidx > 0)
    3387             :         {
    3388         468 :             aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
    3389         468 :             sortnode = castNode(Sort, aggnode->plan.lefttree);
    3390             :         }
    3391             :         else
    3392             :         {
    3393       30144 :             aggnode = node;
    3394       30144 :             sortnode = NULL;
    3395             :         }
    3396             : 
    3397             :         Assert(phase <= 1 || sortnode);
    3398             : 
    3399       30612 :         if (aggnode->aggstrategy == AGG_HASHED
    3400       27532 :             || aggnode->aggstrategy == AGG_MIXED)
    3401             :         {
    3402        3180 :             AggStatePerPhase phasedata = &aggstate->phases[0];
    3403             :             AggStatePerHash perhash;
    3404        3180 :             Bitmapset  *cols = NULL;
    3405             : 
    3406             :             Assert(phase == 0);
    3407        3180 :             i = phasedata->numsets++;
    3408        3180 :             perhash = &aggstate->perhash[i];
    3409             : 
    3410             :             /* phase 0 always points to the "real" Agg in the hash case */
    3411        3180 :             phasedata->aggnode = node;
    3412        3180 :             phasedata->aggstrategy = node->aggstrategy;
    3413             : 
    3414             :             /* but the actual Agg node representing this hash is saved here */
    3415        3180 :             perhash->aggnode = aggnode;
    3416             : 
    3417        3180 :             phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
    3418             : 
    3419        7400 :             for (j = 0; j < aggnode->numCols; ++j)
    3420        4220 :                 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
    3421             : 
    3422        3180 :             phasedata->grouped_cols[i] = cols;
    3423             : 
    3424        3180 :             all_grouped_cols = bms_add_members(all_grouped_cols, cols);
    3425        3180 :             continue;
    3426             :         }
    3427             :         else
    3428             :         {
    3429       27432 :             AggStatePerPhase phasedata = &aggstate->phases[++phase];
    3430             :             int         num_sets;
    3431             : 
    3432       27432 :             phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
    3433             : 
    3434       27432 :             if (num_sets)
    3435             :             {
    3436         528 :                 phasedata->gset_lengths = palloc(num_sets * sizeof(int));
    3437         528 :                 phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
    3438             : 
    3439         528 :                 i = 0;
    3440        1576 :                 foreach(l, aggnode->groupingSets)
    3441             :                 {
    3442        1048 :                     int         current_length = list_length(lfirst(l));
    3443        1048 :                     Bitmapset  *cols = NULL;
    3444             : 
    3445             :                     /* planner forces this to be correct */
    3446        2056 :                     for (j = 0; j < current_length; ++j)
    3447        1008 :                         cols = bms_add_member(cols, aggnode->grpColIdx[j]);
    3448             : 
    3449        1048 :                     phasedata->grouped_cols[i] = cols;
    3450        1048 :                     phasedata->gset_lengths[i] = current_length;
    3451             : 
    3452        1048 :                     ++i;
    3453             :                 }
    3454             : 
    3455         528 :                 all_grouped_cols = bms_add_members(all_grouped_cols,
    3456         528 :                                                    phasedata->grouped_cols[0]);
    3457             :             }
    3458             :             else
    3459             :             {
    3460             :                 Assert(phaseidx == 0);
    3461             : 
    3462       26904 :                 phasedata->gset_lengths = NULL;
    3463       26904 :                 phasedata->grouped_cols = NULL;
    3464             :             }
    3465             : 
    3466             :             /*
    3467             :              * If we are grouping, precompute fmgr lookup data for inner loop.
    3468             :              */
    3469       27432 :             if (aggnode->aggstrategy == AGG_SORTED)
    3470             :             {
    3471        1372 :                 int         i = 0;
    3472             : 
    3473             :                 Assert(aggnode->numCols > 0);
    3474             : 
    3475             :                 /*
    3476             :                  * Build a separate function for each subset of columns that
    3477             :                  * need to be compared.
    3478             :                  */
    3479        1372 :                 phasedata->eqfunctions =
    3480        1372 :                     (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
    3481             : 
    3482             :                 /* for each grouping set */
    3483        2276 :                 for (i = 0; i < phasedata->numsets; i++)
    3484             :                 {
    3485         904 :                     int         length = phasedata->gset_lengths[i];
    3486             : 
    3487         904 :                     if (phasedata->eqfunctions[length - 1] != NULL)
    3488         256 :                         continue;
    3489             : 
    3490         648 :                     phasedata->eqfunctions[length - 1] =
    3491         648 :                         execTuplesMatchPrepare(scanDesc,
    3492             :                                                length,
    3493         648 :                                                aggnode->grpColIdx,
    3494         648 :                                                aggnode->grpOperators,
    3495         648 :                                                aggnode->grpCollations,
    3496             :                                                (PlanState *) aggstate);
    3497             :                 }
    3498             : 
    3499             :                 /* and for all grouped columns, unless already computed */
    3500        1372 :                 if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
    3501             :                 {
    3502         940 :                     phasedata->eqfunctions[aggnode->numCols - 1] =
    3503        1880 :                         execTuplesMatchPrepare(scanDesc,
    3504             :                                                aggnode->numCols,
    3505         940 :                                                aggnode->grpColIdx,
    3506         940 :                                                aggnode->grpOperators,
    3507         940 :                                                aggnode->grpCollations,
    3508             :                                                (PlanState *) aggstate);
    3509             :                 }
    3510             :             }
    3511             : 
    3512       27432 :             phasedata->aggnode = aggnode;
    3513       27432 :             phasedata->aggstrategy = aggnode->aggstrategy;
    3514       27432 :             phasedata->sortnode = sortnode;
    3515             :         }
    3516             :     }
    3517             : 
    3518             :     /*
    3519             :      * Convert all_grouped_cols to a descending-order list.
    3520             :      */
    3521       30144 :     i = -1;
    3522       34768 :     while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
    3523        4624 :         aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
    3524             : 
    3525             :     /*
    3526             :      * Set up aggregate-result storage in the output expr context, and also
    3527             :      * allocate my private per-agg working storage
    3528             :      */
    3529       30144 :     econtext = aggstate->ss.ps.ps_ExprContext;
    3530       30144 :     econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
    3531       30144 :     econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
    3532             : 
    3533       30144 :     peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
    3534       30144 :     pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);
    3535             : 
    3536       30144 :     aggstate->peragg = peraggs;
    3537       30144 :     aggstate->pertrans = pertransstates;
    3538             : 
    3539             : 
    3540       30144 :     aggstate->all_pergroups =
    3541       30144 :         (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)
    3542       30144 :                                      * (numGroupingSets + numHashes));
    3543       30144 :     pergroups = aggstate->all_pergroups;
    3544             : 
    3545       30144 :     if (node->aggstrategy != AGG_HASHED)
    3546             :     {
    3547       55048 :         for (i = 0; i < numGroupingSets; i++)
    3548             :         {
    3549       27760 :             pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
    3550             :                                                       * numaggs);
    3551             :         }
    3552             : 
    3553       27288 :         aggstate->pergroups = pergroups;
    3554       27288 :         pergroups += numGroupingSets;
    3555             :     }
    3556             : 
    3557             :     /*
    3558             :      * Hashing can only appear in the initial phase.
    3559             :      */
    3560       30144 :     if (use_hashing)
    3561             :     {
    3562        2956 :         Plan       *outerplan = outerPlan(node);
    3563        2956 :         uint64      totalGroups = 0;
    3564             :         int         i;
    3565             : 
    3566        2956 :         aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
    3567             :                                                        "HashAgg meta context",
    3568             :                                                        ALLOCSET_DEFAULT_SIZES);
    3569        2956 :         aggstate->hash_spill_slot = ExecInitExtraTupleSlot(estate, scanDesc,
    3570             :                                                            &TTSOpsMinimalTuple);
    3571             : 
    3572             :         /* this is an array of pointers, not structures */
    3573        2956 :         aggstate->hash_pergroup = pergroups;
    3574             : 
    3575        8868 :         aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans,
    3576        2956 :                                                       outerplan->plan_width,
    3577             :                                                       node->transitionSpace);
    3578             : 
    3579             :         /*
    3580             :          * Consider all of the grouping sets together when setting the limits
    3581             :          * and estimating the number of partitions. This can be inaccurate
    3582             :          * when there is more than one grouping set, but should still be
    3583             :          * reasonable.
    3584             :          */
    3585        6136 :         for (i = 0; i < aggstate->num_hashes; i++)
    3586        3180 :             totalGroups += aggstate->perhash[i].aggnode->numGroups;
    3587             : 
    3588        2956 :         hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
    3589             :                             &aggstate->hash_mem_limit,
    3590             :                             &aggstate->hash_ngroups_limit,
    3591             :                             &aggstate->hash_planned_partitions);
    3592        2956 :         find_hash_columns(aggstate);
    3593        2956 :         build_hash_tables(aggstate);
    3594        2956 :         aggstate->table_filled = false;
    3595             :     }
    3596             : 
    3597             :     /*
    3598             :      * Initialize current phase-dependent values to initial phase. The initial
    3599             :      * phase is 1 (first sort pass) for all strategies that use sorting (if
    3600             :      * hashing is being done too, then phase 0 is processed last); but if only
    3601             :      * hashing is being done, then phase 0 is all there is.
    3602             :      */
    3603       30144 :     if (node->aggstrategy == AGG_HASHED)
    3604             :     {
    3605        2856 :         aggstate->current_phase = 0;
    3606        2856 :         initialize_phase(aggstate, 0);
    3607        2856 :         select_current_set(aggstate, 0, true);
    3608             :     }
    3609             :     else
    3610             :     {
    3611       27288 :         aggstate->current_phase = 1;
    3612       27288 :         initialize_phase(aggstate, 1);
    3613       27288 :         select_current_set(aggstate, 0, false);
    3614             :     }
    3615             : 
    3616             :     /* -----------------
    3617             :      * Perform lookups of aggregate function info, and initialize the
    3618             :      * unchanging fields of the per-agg and per-trans data.
    3619             :      *
    3620             :      * We try to optimize by detecting duplicate aggregate functions so that
    3621             :      * their state and final values are re-used, rather than needlessly being
    3622             :      * re-calculated independently. We also detect aggregates that are not
    3623             :      * the same, but which can share the same transition state.
    3624             :      *
    3625             :      * Scenarios:
    3626             :      *
    3627             :      * 1. Identical aggregate function calls appear in the query:
    3628             :      *
    3629             :      *    SELECT SUM(x) FROM ... HAVING SUM(x) > 0
    3630             :      *
    3631             :      *    Since these aggregates are identical, we only need to calculate
    3632             :      *    the value once. Both aggregates will share the same 'aggno' value.
    3633             :      *
    3634             :      * 2. Two different aggregate functions appear in the query, but the
    3635             :      *    aggregates have the same arguments, transition functions and
    3636             :      *    initial values (and, presumably, different final functions):
    3637             :      *
    3638             :      *    SELECT AVG(x), STDDEV(x) FROM ...
    3639             :      *
    3640             :      *    In this case we must create a new peragg for the varying aggregate,
    3641             :      *    and we need to call the final functions separately, but we need
    3642             :      *    only run the transition function once.  (This requires that the
    3643             :      *    final functions be nondestructive of the transition state, but
    3644             :      *    that's required anyway for other reasons.)
    3645             :      *
    3646             :      * For either of these optimizations to be valid, all aggregate properties
    3647             :      * used in the transition phase must be the same, including any modifiers
    3648             :      * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
    3649             :      * contain any volatile functions.
    3650             :      * -----------------
    3651             :      */
    3652       30144 :     aggno = -1;
    3653       30144 :     transno = -1;
    3654       63106 :     foreach(l, aggstate->aggs)
    3655             :     {
    3656       32966 :         AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
    3657       32966 :         Aggref     *aggref = aggrefstate->aggref;
    3658             :         AggStatePerAgg peragg;
    3659             :         AggStatePerTrans pertrans;
    3660             :         int         existing_aggno;
    3661             :         int         existing_transno;
    3662             :         List       *same_input_transnos;
    3663             :         Oid         inputTypes[FUNC_MAX_ARGS];
    3664             :         int         numArguments;
    3665             :         int         numDirectArgs;
    3666             :         HeapTuple   aggTuple;
    3667             :         Form_pg_aggregate aggform;
    3668             :         AclResult   aclresult;
    3669             :         Oid         transfn_oid,
    3670             :                     finalfn_oid;
    3671             :         bool        shareable;
    3672             :         Oid         serialfn_oid,
    3673             :                     deserialfn_oid;
    3674             :         Expr       *finalfnexpr;
    3675             :         Oid         aggtranstype;
    3676             :         Datum       textInitVal;
    3677             :         Datum       initValue;
    3678             :         bool        initValueIsNull;
    3679             : 
    3680             :         /* Planner should have assigned aggregate to correct level */
    3681             :         Assert(aggref->agglevelsup == 0);
    3682             :         /* ... and the split mode should match */
    3683             :         Assert(aggref->aggsplit == aggstate->aggsplit);
    3684             : 
    3685             :         /* 1. Check for already processed aggs which can be re-used */
    3686       32966 :         existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
    3687             :                                                 &same_input_transnos);
    3688       32966 :         if (existing_aggno != -1)
    3689             :         {
    3690             :             /*
    3691             :              * Existing compatible agg found. so just point the Aggref to the
    3692             :              * same per-agg struct.
    3693             :              */
    3694         316 :             aggrefstate->aggno = existing_aggno;
    3695         316 :             continue;
    3696             :         }
    3697             : 
    3698             :         /* Mark Aggref state node with assigned index in the result array */
    3699       32650 :         peragg = &peraggs[++aggno];
    3700       32650 :         peragg->aggref = aggref;
    3701       32650 :         aggrefstate->aggno = aggno;
    3702             : 
    3703             :         /* Fetch the pg_aggregate row */
    3704       32650 :         aggTuple = SearchSysCache1(AGGFNOID,
    3705       32650 :                                    ObjectIdGetDatum(aggref->aggfnoid));
    3706       32650 :         if (!HeapTupleIsValid(aggTuple))
    3707           0 :             elog(ERROR, "cache lookup failed for aggregate %u",
    3708             :                  aggref->aggfnoid);
    3709       32650 :         aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
    3710             : 
    3711             :         /* Check permission to call aggregate function */
    3712       32650 :         aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
    3713             :                                      ACL_EXECUTE);
    3714       32650 :         if (aclresult != ACLCHECK_OK)
    3715           4 :             aclcheck_error(aclresult, OBJECT_AGGREGATE,
    3716           4 :                            get_func_name(aggref->aggfnoid));
    3717       32646 :         InvokeFunctionExecuteHook(aggref->aggfnoid);
    3718             : 
    3719             :         /* planner recorded transition state type in the Aggref itself */
    3720       32646 :         aggtranstype = aggref->aggtranstype;
    3721             :         Assert(OidIsValid(aggtranstype));
    3722             : 
    3723             :         /*
    3724             :          * If this aggregation is performing state combines, then instead of
    3725             :          * using the transition function, we'll use the combine function
    3726             :          */
    3727       32646 :         if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
    3728             :         {
    3729         780 :             transfn_oid = aggform->aggcombinefn;
    3730             : 
    3731             :             /* If not set then the planner messed up */
    3732         780 :             if (!OidIsValid(transfn_oid))
    3733           0 :                 elog(ERROR, "combinefn not set for aggregate function");
    3734             :         }
    3735             :         else
    3736       31866 :             transfn_oid = aggform->aggtransfn;
    3737             : 
    3738             :         /* Final function only required if we're finalizing the aggregates */
    3739       32646 :         if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
    3740        2538 :             peragg->finalfn_oid = finalfn_oid = InvalidOid;
    3741             :         else
    3742       30108 :             peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
    3743             : 
    3744             :         /*
    3745             :          * If finalfn is marked read-write, we can't share transition states;
    3746             :          * but it is okay to share states for AGGMODIFY_SHAREABLE aggs.  Also,
    3747             :          * if we're not executing the finalfn here, we can share regardless.
    3748             :          */
    3749       32646 :         shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) ||
    3750             :             (finalfn_oid == InvalidOid);
    3751       32646 :         peragg->shareable = shareable;
    3752             : 
    3753       32646 :         serialfn_oid = InvalidOid;
    3754       32646 :         deserialfn_oid = InvalidOid;
    3755             : 
    3756             :         /*
    3757             :          * Check if serialization/deserialization is required.  We only do it
    3758             :          * for aggregates that have transtype INTERNAL.
    3759             :          */
    3760       32646 :         if (aggtranstype == INTERNALOID)
    3761             :         {
    3762             :             /*
    3763             :              * The planner should only have generated a serialize agg node if
    3764             :              * every aggregate with an INTERNAL state has a serialization
    3765             :              * function.  Verify that.
    3766             :              */
    3767       16300 :             if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
    3768             :             {
    3769             :                 /* serialization only valid when not running finalfn */
    3770             :                 Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
    3771             : 
    3772          72 :                 if (!OidIsValid(aggform->aggserialfn))
    3773           0 :                     elog(ERROR, "serialfunc not provided for serialization aggregation");
    3774          72 :                 serialfn_oid = aggform->aggserialfn;
    3775             :             }
    3776             : 
    3777             :             /* Likewise for deserialization functions */
    3778       16300 :             if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
    3779             :             {
    3780             :                 /* deserialization only valid when combining states */
    3781             :                 Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
    3782             : 
    3783          24 :                 if (!OidIsValid(aggform->aggdeserialfn))
    3784           0 :                     elog(ERROR, "deserialfunc not provided for deserialization aggregation");
    3785          24 :                 deserialfn_oid = aggform->aggdeserialfn;
    3786             :             }
    3787             :         }
    3788             : 
    3789             :         /* Check that aggregate owner has permission to call component fns */
    3790             :         {
    3791             :             HeapTuple   procTuple;
    3792             :             Oid         aggOwner;
    3793             : 
    3794       32646 :             procTuple = SearchSysCache1(PROCOID,
    3795       32646 :                                         ObjectIdGetDatum(aggref->aggfnoid));
    3796       32646 :             if (!HeapTupleIsValid(procTuple))
    3797           0 :                 elog(ERROR, "cache lookup failed for function %u",
    3798             :                      aggref->aggfnoid);
    3799       32646 :             aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
    3800       32646 :             ReleaseSysCache(procTuple);
    3801             : 
    3802       32646 :             aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
    3803             :                                          ACL_EXECUTE);
    3804       32646 :             if (aclresult != ACLCHECK_OK)
    3805           0 :                 aclcheck_error(aclresult, OBJECT_FUNCTION,
    3806           0 :                                get_func_name(transfn_oid));
    3807       32646 :             InvokeFunctionExecuteHook(transfn_oid);
    3808       32646 :             if (OidIsValid(finalfn_oid))
    3809             :             {
    3810       17246 :                 aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
    3811             :                                              ACL_EXECUTE);
    3812       17246 :                 if (aclresult != ACLCHECK_OK)
    3813           0 :                     aclcheck_error(aclresult, OBJECT_FUNCTION,
    3814           0 :                                    get_func_name(finalfn_oid));
    3815       17246 :                 InvokeFunctionExecuteHook(finalfn_oid);
    3816             :             }
    3817       32646 :             if (OidIsValid(serialfn_oid))
    3818             :             {
    3819          72 :                 aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
    3820             :                                              ACL_EXECUTE);
    3821          72 :                 if (aclresult != ACLCHECK_OK)
    3822           0 :                     aclcheck_error(aclresult, OBJECT_FUNCTION,
    3823           0 :                                    get_func_name(serialfn_oid));
    3824          72 :                 InvokeFunctionExecuteHook(serialfn_oid);
    3825             :             }
    3826       32646 :             if (OidIsValid(deserialfn_oid))
    3827             :             {
    3828          24 :                 aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
    3829             :                                              ACL_EXECUTE);
    3830          24 :                 if (aclresult != ACLCHECK_OK)
    3831           0 :                     aclcheck_error(aclresult, OBJECT_FUNCTION,
    3832           0 :                                    get_func_name(deserialfn_oid));
    3833          24 :                 InvokeFunctionExecuteHook(deserialfn_oid);
    3834             :             }
    3835             :         }
    3836             : 
    3837             :         /*
    3838             :          * Get actual datatypes of the (nominal) aggregate inputs.  These
    3839             :          * could be different from the agg's declared input types, when the
    3840             :          * agg accepts ANY or a polymorphic type.
    3841             :          */
    3842       32646 :         numArguments = get_aggregate_argtypes(aggref, inputTypes);
    3843             : 
    3844             :         /* Count the "direct" arguments, if any */
    3845       32646 :         numDirectArgs = list_length(aggref->aggdirectargs);
    3846             : 
    3847             :         /* Detect how many arguments to pass to the finalfn */
    3848       32646 :         if (aggform->aggfinalextra)
    3849       14064 :             peragg->numFinalArgs = numArguments + 1;
    3850             :         else
    3851       18582 :             peragg->numFinalArgs = numDirectArgs + 1;
    3852             : 
    3853             :         /* Initialize any direct-argument expressions */
    3854       32646 :         peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
    3855             :                                                  (PlanState *) aggstate);
    3856             : 
    3857             :         /*
    3858             :          * build expression trees using actual argument & result types for the
    3859             :          * finalfn, if it exists and is required.
    3860             :          */
    3861       32646 :         if (OidIsValid(finalfn_oid))
    3862             :         {
    3863       17246 :             build_aggregate_finalfn_expr(inputTypes,
    3864             :                                          peragg->numFinalArgs,
    3865             :                                          aggtranstype,
    3866             :                                          aggref->aggtype,
    3867             :                                          aggref->inputcollid,
    3868             :                                          finalfn_oid,
    3869             :                                          &finalfnexpr);
    3870       17246 :             fmgr_info(finalfn_oid, &peragg->finalfn);
    3871       17246 :             fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
    3872             :         }
    3873             : 
    3874             :         /* get info about the output value's datatype */
    3875       32646 :         get_typlenbyval(aggref->aggtype,
    3876             :                         &peragg->resulttypeLen,
    3877             :                         &peragg->resulttypeByVal);
    3878             : 
    3879             :         /*
    3880             :          * initval is potentially null, so don't try to access it as a struct
    3881             :          * field. Must do it the hard way with SysCacheGetAttr.
    3882             :          */
    3883       32646 :         textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
    3884             :                                       Anum_pg_aggregate_agginitval,
    3885             :                                       &initValueIsNull);
    3886       32646 :         if (initValueIsNull)
    3887       21982 :             initValue = (Datum) 0;
    3888             :         else
    3889       10664 :             initValue = GetAggInitVal(textInitVal, aggtranstype);
    3890             : 
    3891             :         /*
    3892             :          * 2. Build working state for invoking the transition function, or
    3893             :          * look up previously initialized working state, if we can share it.
    3894             :          *
    3895             :          * find_compatible_peragg() already collected a list of shareable
    3896             :          * per-Trans's with the same inputs. Check if any of them have the
    3897             :          * same transition function and initial value.
    3898             :          */
    3899       32646 :         existing_transno = find_compatible_pertrans(aggstate, aggref,
    3900             :                                                     shareable,
    3901             :                                                     transfn_oid, aggtranstype,
    3902             :                                                     serialfn_oid, deserialfn_oid,
    3903             :                                                     initValue, initValueIsNull,
    3904             :                                                     same_input_transnos);
    3905       32646 :         if (existing_transno != -1)
    3906             :         {
    3907             :             /*
    3908             :              * Existing compatible trans found, so just point the 'peragg' to
    3909             :              * the same per-trans struct, and mark the trans state as shared.
    3910             :              */
    3911         100 :             pertrans = &pertransstates[existing_transno];
    3912         100 :             pertrans->aggshared = true;
    3913         100 :             peragg->transno = existing_transno;
    3914             :         }
    3915             :         else
    3916             :         {
    3917       32546 :             pertrans = &pertransstates[++transno];
    3918       32546 :             build_pertrans_for_aggref(pertrans, aggstate, estate,
    3919             :                                       aggref, transfn_oid, aggtranstype,
    3920             :                                       serialfn_oid, deserialfn_oid,
    3921             :                                       initValue, initValueIsNull,
    3922             :                                       inputTypes, numArguments);
    3923       32546 :             peragg->transno = transno;
    3924             :         }
    3925       32646 :         ReleaseSysCache(aggTuple);
    3926             :     }
    3927             : 
    3928             :     /*
    3929             :      * Update aggstate->numaggs to be the number of unique aggregates found.
    3930             :      * Also set numstates to the number of unique transition states found.
    3931             :      */
    3932       30140 :     aggstate->numaggs = aggno + 1;
    3933       30140 :     aggstate->numtrans = transno + 1;
    3934             : 
    3935             :     /*
    3936             :      * Last, check whether any more aggregates got added onto the node while
    3937             :      * we processed the expressions for the aggregate arguments (including not
    3938             :      * only the regular arguments and FILTER expressions handled immediately
    3939             :      * above, but any direct arguments we might've handled earlier).  If so,
    3940             :      * we have nested aggregate functions, which is semantically nonsensical,
    3941             :      * so complain.  (This should have been caught by the parser, so we don't
    3942             :      * need to work hard on a helpful error message; but we defend against it
    3943             :      * here anyway, just to be sure.)
    3944             :      */
    3945       30140 :     if (numaggs != list_length(aggstate->aggs))
    3946           0 :         ereport(ERROR,
    3947             :                 (errcode(ERRCODE_GROUPING_ERROR),
    3948             :                  errmsg("aggregate function calls cannot be nested")));
    3949             : 
    3950             :     /*
    3951             :      * Build expressions doing all the transition work at once. We build a
    3952             :      * different one for each phase, as the number of transition function
    3953             :      * invocation can differ between phases. Note this'll work both for
    3954             :      * transition and combination functions (although there'll only be one
    3955             :      * phase in the latter case).
    3956             :      */
    3957       87708 :     for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
    3958             :     {
    3959       57568 :         AggStatePerPhase phase = &aggstate->phases[phaseidx];
    3960       57568 :         bool        dohash = false;
    3961       57568 :         bool        dosort = false;
    3962             : 
    3963             :         /* phase 0 doesn't necessarily exist */
    3964       57568 :         if (!phase->aggnode)
    3965       27184 :             continue;
    3966             : 
    3967       30384 :         if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
    3968             :         {
    3969             :             /*
    3970             :              * Phase one, and only phase one, in a mixed agg performs both
    3971             :              * sorting and aggregation.
    3972             :              */
    3973         100 :             dohash = true;
    3974         100 :             dosort = true;
    3975             :         }
    3976       30284 :         else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
    3977             :         {
    3978             :             /*
    3979             :              * No need to compute a transition function for an AGG_MIXED phase
    3980             :              * 0 - the contents of the hashtables will have been computed
    3981             :              * during phase 1.
    3982             :              */
    3983         100 :             continue;
    3984             :         }
    3985       30184 :         else if (phase->aggstrategy == AGG_PLAIN ||
    3986        4200 :                  phase->aggstrategy == AGG_SORTED)
    3987             :         {
    3988       27328 :             dohash = false;
    3989       27328 :             dosort = true;
    3990             :         }
    3991        2856 :         else if (phase->aggstrategy == AGG_HASHED)
    3992             :         {
    3993        2856 :             dohash = true;
    3994        2856 :             dosort = false;
    3995             :         }
    3996             :         else
    3997             :             Assert(false);
    3998             : 
    3999       30284 :         phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
    4000             :                                              false);
    4001             : 
    4002             :         /* cache compiled expression for outer slot without NULL check */
    4003       30284 :         phase->evaltrans_cache[0][0] = phase->evaltrans;
    4004             :     }
    4005             : 
    4006       30140 :     return aggstate;
    4007             : }
    4008             : 
    4009             : /*
    4010             :  * Build the state needed to calculate a state value for an aggregate.
    4011             :  *
    4012             :  * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
    4013             :  * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
    4014             :  * of the arguments could be calculated from 'aggref', but the caller has
    4015             :  * calculated them already, so might as well pass them.
    4016             :  */
    4017             : static void
    4018       32546 : build_pertrans_for_aggref(AggStatePerTrans pertrans,
    4019             :                           AggState *aggstate, EState *estate,
    4020             :                           Aggref *aggref,
    4021             :                           Oid aggtransfn, Oid aggtranstype,
    4022             :                           Oid aggserialfn, Oid aggdeserialfn,
    4023             :                           Datum initValue, bool initValueIsNull,
    4024             :                           Oid *inputTypes, int numArguments)
    4025             : {
    4026       32546 :     int         numGroupingSets = Max(aggstate->maxsets, 1);
    4027       32546 :     Expr       *serialfnexpr = NULL;
    4028       32546 :     Expr       *deserialfnexpr = NULL;
    4029             :     ListCell   *lc;
    4030             :     int         numInputs;
    4031             :     int         numDirectArgs;
    4032             :     List       *sortlist;
    4033             :     int         numSortCols;
    4034             :     int         numDistinctCols;
    4035             :     int         i;
    4036             : 
    4037             :     /* Begin filling in the pertrans data */
    4038       32546 :     pertrans->aggref = aggref;
    4039       32546 :     pertrans->aggshared = false;
    4040       32546 :     pertrans->aggCollation = aggref->inputcollid;
    4041       32546 :     pertrans->transfn_oid = aggtransfn;
    4042       32546 :     pertrans->serialfn_oid = aggserialfn;
    4043       32546 :     pertrans->deserialfn_oid = aggdeserialfn;
    4044       32546 :     pertrans->initValue = initValue;
    4045       32546 :     pertrans->initValueIsNull = initValueIsNull;
    4046             : 
    4047             :     /* Count the "direct" arguments, if any */
    4048       32546 :     numDirectArgs = list_length(aggref->aggdirectargs);
    4049             : 
    4050             :     /* Count the number of aggregated input columns */
    4051       32546 :     pertrans->numInputs = numInputs = list_length(aggref->args);
    4052             : 
    4053       32546 :     pertrans->aggtranstype = aggtranstype;
    4054             : 
    4055             :     /*
    4056             :      * When combining states, we have no use at all for the aggregate
    4057             :      * function's transfn. Instead we use the combinefn.  In this case, the
    4058             :      * transfn and transfn_oid fields of pertrans refer to the combine
    4059             :      * function rather than the transition function.
    4060             :      */
    4061       32546 :     if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
    4062             :     {
    4063             :         Expr       *combinefnexpr;
    4064             :         size_t      numTransArgs;
    4065             : 
    4066             :         /*
    4067             :          * When combining there's only one input, the to-be-combined added
    4068             :          * transition value from below (this node's transition value is
    4069             :          * counted separately).
    4070             :          */
    4071         780 :         pertrans->numTransInputs = 1;
    4072             : 
    4073             :         /* account for the current transition state */
    4074         780 :         numTransArgs = pertrans->numTransInputs + 1;
    4075             : 
    4076         780 :         build_aggregate_combinefn_expr(aggtranstype,
    4077             :                                        aggref->inputcollid,
    4078             :                                        aggtransfn,
    4079             :                                        &combinefnexpr);
    4080         780 :         fmgr_info(aggtransfn, &pertrans->transfn);
    4081         780 :         fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);
    4082             : 
    4083         780 :         pertrans->transfn_fcinfo =
    4084         780 :             (FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
    4085         780 :         InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
    4086             :                                  &pertrans->transfn,
    4087             :                                  numTransArgs,
    4088             :                                  pertrans->aggCollation,
    4089             :                                  (void *) aggstate, NULL);
    4090             : 
    4091             :         /*
    4092             :          * Ensure that a combine function to combine INTERNAL states is not
    4093             :          * strict. This should have been checked during CREATE AGGREGATE, but
    4094             :          * the strict property could have been changed since then.
    4095             :          */
    4096         780 :         if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
    4097           0 :             ereport(ERROR,
    4098             :                     (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
    4099             :                      errmsg("combine function with transition type %s must not be declared STRICT",
    4100             :                             format_type_be(aggtranstype))));
    4101             :     }
    4102             :     else
    4103             :     {
    4104             :         Expr       *transfnexpr;
    4105             :         size_t      numTransArgs;
    4106             : 
    4107             :         /* Detect how many arguments to pass to the transfn */
    4108       31766 :         if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
    4109         170 :             pertrans->numTransInputs = numInputs;
    4110             :         else
    4111       31596 :             pertrans->numTransInputs = numArguments;
    4112             : 
    4113             :         /* account for the current transition state */
    4114       31766 :         numTransArgs = pertrans->numTransInputs + 1;
    4115             : 
    4116             :         /*
    4117             :          * Set up infrastructure for calling the transfn.  Note that
    4118             :          * invtransfn is not needed here.
    4119             :          */
    4120       63532 :         build_aggregate_transfn_expr(inputTypes,
    4121             :                                      numArguments,
    4122             :                                      numDirectArgs,
    4123       31766 :                                      aggref->aggvariadic,
    4124             :                                      aggtranstype,
    4125             :                                      aggref->inputcollid,
    4126             :                                      aggtransfn,
    4127             :                                      InvalidOid,
    4128             :                                      &transfnexpr,
    4129             :                                      NULL);
    4130       31766 :         fmgr_info(aggtransfn, &pertrans->transfn);
    4131       31766 :         fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
    4132             : 
    4133       31766 :         pertrans->transfn_fcinfo =
    4134       31766 :             (FunctionCallInfo) palloc(SizeForFunctionCallInfo(numTransArgs));
    4135       31766 :         InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
    4136             :                                  &pertrans->transfn,
    4137             :                                  numTransArgs,
    4138             :                                  pertrans->aggCollation,
    4139             :                                  (void *) aggstate, NULL);
    4140             : 
    4141             :         /*
    4142             :          * If the transfn is strict and the initval is NULL, make sure input
    4143             :          * type and transtype are the same (or at least binary-compatible), so
    4144             :          * that it's OK to use the first aggregated input value as the initial
    4145             :          * transValue.  This should have been checked at agg definition time,
    4146             :          * but we must check again in case the transfn's strictness property
    4147             :          * has been changed.
    4148             :          */
    4149       31766 :         if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
    4150             :         {
    4151        3494 :             if (numArguments <= numDirectArgs ||
    4152        3494 :                 !IsBinaryCoercible(inputTypes[numDirectArgs],
    4153             :                                    aggtranstype))
    4154           0 :                 ereport(ERROR,
    4155             :                         (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
    4156             :                          errmsg("aggregate %u needs to have compatible input type and transition type",
    4157             :                                 aggref->aggfnoid)));
    4158             :         }
    4159             :     }
    4160             : 
    4161             :     /* get info about the state value's datatype */
    4162       32546 :     get_typlenbyval(aggtranstype,
    4163             :                     &pertrans->transtypeLen,
    4164             :                     &pertrans->transtypeByVal);
    4165             : 
    4166       32546 :     if (OidIsValid(aggserialfn))
    4167             :     {
    4168          72 :         build_aggregate_serialfn_expr(aggserialfn,
    4169             :                                       &serialfnexpr);
    4170          72 :         fmgr_info(aggserialfn, &pertrans->serialfn);
    4171          72 :         fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
    4172             : 
    4173          72 :         pertrans->serialfn_fcinfo =
    4174          72 :             (FunctionCallInfo) palloc(SizeForFunctionCallInfo(1));
    4175          72 :         InitFunctionCallInfoData(*pertrans->serialfn_fcinfo,
    4176             :                                  &pertrans->serialfn,
    4177             :                                  1,
    4178             :                                  InvalidOid,
    4179             :                                  (void *) aggstate, NULL);
    4180             :     }
    4181             : 
    4182       32546 :     if (OidIsValid(aggdeserialfn))
    4183             :     {
    4184          24 :         build_aggregate_deserialfn_expr(aggdeserialfn,
    4185             :                                         &deserialfnexpr);
    4186          24 :         fmgr_info(aggdeserialfn, &pertrans->deserialfn);
    4187          24 :         fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
    4188             : 
    4189          24 :         pertrans->deserialfn_fcinfo =
    4190          24 :             (FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
    4191          24 :         InitFunctionCallInfoData(*pertrans->deserialfn_fcinfo,
    4192             :                                  &pertrans->deserialfn,
    4193             :                                  2,
    4194             :                                  InvalidOid,
    4195             :                                  (void *) aggstate, NULL);
    4196             : 
    4197             :     }
    4198             : 
    4199             :     /*
    4200             :      * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
    4201             :      * have a list of SortGroupClause nodes; fish out the data in them and
    4202             :      * stick them into arrays.  We ignore ORDER BY for an ordered-set agg,
    4203             :      * however; the agg's transfn and finalfn are responsible for that.
    4204             :      *
    4205             :      * Note that by construction, if there is a DISTINCT clause then the ORDER
    4206             :      * BY clause is a prefix of it (see transformDistinctClause).
    4207             :      */
    4208       32546 :     if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
    4209             :     {
    4210         170 :         sortlist = NIL;
    4211         170 :         numSortCols = numDistinctCols = 0;
    4212             :     }
    4213       32376 :     else if (aggref->aggdistinct)
    4214             :     {
    4215         290 :         sortlist = aggref->aggdistinct;
    4216         290 :         numSortCols = numDistinctCols = list_length(sortlist);
    4217             :         Assert(numSortCols >= list_length(aggref->aggorder));
    4218             :     }
    4219             :     else
    4220             :     {
    4221       32086 :         sortlist = aggref->aggorder;
    4222       32086 :         numSortCols = list_length(sortlist);
    4223       32086 :         numDistinctCols = 0;
    4224             :     }
    4225             : 
    4226       32546 :     pertrans->numSortCols = numSortCols;
    4227       32546 :     pertrans->numDistinctCols = numDistinctCols;
    4228             : 
    4229             :     /*
    4230             :      * If we have either sorting or filtering to do, create a tupledesc and
    4231             :      * slot corresponding to the aggregated inputs (including sort
    4232             :      * expressions) of the agg.
    4233             :      */
    4234       32546 :     if (numSortCols > 0 || aggref->aggfilter)
    4235             :     {
    4236        9904 :         pertrans->sortdesc = ExecTypeFromTL(aggref->args);
    4237        9904 :         pertrans->sortslot =
    4238        9904 :             ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
    4239             :                                    &TTSOpsMinimalTuple);
    4240             :     }
    4241             : 
    4242       32546 :     if (numSortCols > 0)
    4243             :     {
    4244             :         /*
    4245             :          * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
    4246             :          * (yet)
    4247             :          */
    4248             :         Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
    4249             : 
    4250             :         /* If we have only one input, we need its len/byval info. */
    4251        9452 :         if (numInputs == 1)
    4252             :         {
    4253        1792 :             get_typlenbyval(inputTypes[numDirectArgs],
    4254             :                             &pertrans->inputtypeLen,
    4255             :                             &pertrans->inputtypeByVal);
    4256             :         }
    4257        7660 :         else if (numDistinctCols > 0)
    4258             :         {
    4259             :             /* we will need an extra slot to store prior values */
    4260          56 :             pertrans->uniqslot =
    4261          56 :                 ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
    4262             :                                        &TTSOpsMinimalTuple);
    4263             :         }
    4264             : 
    4265             :         /* Extract the sort information for use later */
    4266        9452 :         pertrans->sortColIdx =
    4267        9452 :             (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
    4268        9452 :         pertrans->sortOperators =
    4269        9452 :             (Oid *) palloc(numSortCols * sizeof(Oid));
    4270        9452 :         pertrans->sortCollations =
    4271        9452 :             (Oid *) palloc(numSortCols * sizeof(Oid));
    4272        9452 :         pertrans->sortNullsFirst =
    4273        9452 :             (bool *) palloc(numSortCols * sizeof(bool));
    4274             : 
    4275        9452 :         i = 0;
    4276       19028 :         foreach(lc, sortlist)
    4277             :         {
    4278        9576 :             SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
    4279        9576 :             TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
    4280             : 
    4281             :             /* the parser should have made sure of this */
    4282             :             Assert(OidIsValid(sortcl->sortop));
    4283             : 
    4284        9576 :             pertrans->sortColIdx[i] = tle->resno;
    4285        9576 :             pertrans->sortOperators[i] = sortcl->sortop;
    4286        9576 :             pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
    4287        9576 :             pertrans->sortNullsFirst[i] = sortcl->nulls_first;
    4288        9576 :             i++;
    4289             :         }
    4290             :         Assert(i == numSortCols);
    4291             :     }
    4292             : 
    4293       32546 :     if (aggref->aggdistinct)
    4294             :     {
    4295             :         Oid        *ops;
    4296             : 
    4297             :         Assert(numArguments > 0);
    4298             :         Assert(list_length(aggref->aggdistinct) == numDistinctCols);
    4299             : 
    4300         290 :         ops = palloc(numDistinctCols * sizeof(Oid));
    4301             : 
    4302         290 :         i = 0;
    4303         684 :         foreach(lc, aggref->aggdistinct)
    4304         394 :             ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
    4305             : 
    4306             :         /* lookup / build the necessary comparators */
    4307         290 :         if (numDistinctCols == 1)
    4308         234 :             fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
    4309             :         else
    4310          56 :             pertrans->equalfnMulti =
    4311         112 :                 execTuplesMatchPrepare(pertrans->sortdesc,
    4312             :                                        numDistinctCols,
    4313          56 :                                        pertrans->sortColIdx,
    4314             :                                        ops,
    4315          56 :                                        pertrans->sortCollations,
    4316             :                                        &aggstate->ss.ps);
    4317         290 :         pfree(ops);
    4318             :     }
    4319             : 
    4320       32546 :     pertrans->sortstates = (Tuplesortstate **)
    4321       32546 :         palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
    4322       32546 : }
    4323             : 
    4324             : 
    4325             : static Datum
    4326       10664 : GetAggInitVal(Datum textInitVal, Oid transtype)
    4327             : {
    4328             :     Oid         typinput,
    4329             :                 typioparam;
    4330             :     char       *strInitVal;
    4331             :     Datum       initVal;
    4332             : 
    4333       10664 :     getTypeInputInfo(transtype, &typinput, &typioparam);
    4334       10664 :     strInitVal = TextDatumGetCString(textInitVal);
    4335       10664 :     initVal = OidInputFunctionCall(typinput, strInitVal,
    4336             :                                    typioparam, -1);
    4337       10664 :     pfree(strInitVal);
    4338       10664 :     return initVal;
    4339             : }
    4340             : 
    4341             : /*
    4342             :  * find_compatible_peragg - search for a previously initialized per-Agg struct
    4343             :  *
    4344             :  * Searches the previously looked at aggregates to find one which is compatible
    4345             :  * with this one, with the same input parameters. If no compatible aggregate
    4346             :  * can be found, returns -1.
    4347             :  *
    4348             :  * As a side-effect, this also collects a list of existing, shareable per-Trans
    4349             :  * structs with matching inputs. If no identical Aggref is found, the list is
    4350             :  * passed later to find_compatible_pertrans, to see if we can at least reuse
    4351             :  * the state value of another aggregate.
    4352             :  */
    4353             : static int
    4354       32966 : find_compatible_peragg(Aggref *newagg, AggState *aggstate,
    4355             :                        int lastaggno, List **same_input_transnos)
    4356             : {
    4357             :     int         aggno;
    4358             :     AggStatePerAgg peraggs;
    4359             : 
    4360       32966 :     *same_input_transnos = NIL;
    4361             : 
    4362             :     /* we mustn't reuse the aggref if it contains volatile function calls */
    4363       32966 :     if (contain_volatile_functions((Node *) newagg))
    4364          32 :         return -1;
    4365             : 
    4366       32934 :     peraggs = aggstate->peragg;
    4367             : 
    4368             :     /*
    4369             :      * Search through the list of already seen aggregates. If we find an
    4370             :      * existing identical aggregate call, then we can re-use that one. While
    4371             :      * searching, we'll also collect a list of Aggrefs with the same input
    4372             :      * parameters. If no matching Aggref is found, the caller can potentially
    4373             :      * still re-use the transition state of one of them.  (At this stage we
    4374             :      * just compare the parsetrees; whether different aggregates share the
    4375             :      * same transition function will be checked later.)
    4376             :      */
    4377       39996 :     for (aggno = 0; aggno <= lastaggno; aggno++)
    4378             :     {
    4379             :         AggStatePerAgg peragg;
    4380             :         Aggref     *existingRef;
    4381             : 
    4382        7378 :         peragg = &peraggs[aggno];
    4383        7378 :         existingRef = peragg->aggref;
    4384             : 
    4385             :         /* all of the following must be the same or it's no match */
    4386        7378 :         if (newagg->inputcollid != existingRef->inputcollid ||
    4387        6794 :             newagg->aggtranstype != existingRef->aggtranstype ||
    4388        3194 :             newagg->aggstar != existingRef->aggstar ||
    4389        1892 :             newagg->aggvariadic != existingRef->aggvariadic ||
    4390        1892 :             newagg->aggkind != existingRef->aggkind ||
    4391        1764 :             !equal(newagg->args, existingRef->args) ||
    4392         690 :             !equal(newagg->aggorder, existingRef->aggorder) ||
    4393         686 :             !equal(newagg->aggdistinct, existingRef->aggdistinct) ||
    4394         686 :             !equal(newagg->aggfilter, existingRef->aggfilter))
    4395        6716 :             continue;
    4396             : 
    4397             :         /* if it's the same aggregate function then report exact match */
    4398         662 :         if (newagg->aggfnoid == existingRef->aggfnoid &&
    4399         324 :             newagg->aggtype == existingRef->aggtype &&
    4400         648 :             newagg->aggcollid == existingRef->aggcollid &&
    4401         324 :             equal(newagg->aggdirectargs, existingRef->aggdirectargs))
    4402             :         {
    4403         316 :             list_free(*same_input_transnos);
    4404         316 :             *same_input_transnos = NIL;
    4405         316 :             return aggno;
    4406             :         }
    4407             : 
    4408             :         /*
    4409             :          * Not identical, but it had the same inputs.  If the final function
    4410             :          * permits sharing, return its transno to the caller, in case we can
    4411             :          * re-use its per-trans state.  (If there's already sharing going on,
    4412             :          * we might report a transno more than once.  find_compatible_pertrans
    4413             :          * is cheap enough that it's not worth spending cycles to avoid that.)
    4414             :          */
    4415         346 :         if (peragg->shareable)
    4416         342 :             *same_input_transnos = lappend_int(*same_input_transnos,
    4417             :                                                peragg->transno);
    4418             :     }
    4419             : 
    4420       32618 :     return -1;
    4421             : }
    4422             : 
    4423             : /*
    4424             :  * find_compatible_pertrans - search for a previously initialized per-Trans
    4425             :  * struct
    4426             :  *
    4427             :  * Searches the list of transnos for a per-Trans struct with the same
    4428             :  * transition function and initial condition. (The inputs have already been
    4429             :  * verified to match.)
    4430             :  */
    4431             : static int
    4432       32646 : find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable,
    4433             :                          Oid aggtransfn, Oid aggtranstype,
    4434             :                          Oid aggserialfn, Oid aggdeserialfn,
    4435             :                          Datum initValue, bool initValueIsNull,
    4436             :                          List *transnos)
    4437             : {
    4438             :     ListCell   *lc;
    4439             : 
    4440             :     /* If this aggregate can't share transition states, give up */
    4441       32646 :     if (!shareable)
    4442          76 :         return -1;
    4443             : 
    4444       32788 :     foreach(lc, transnos)
    4445             :     {
    4446         318 :         int         transno = lfirst_int(lc);
    4447         318 :         AggStatePerTrans pertrans = &aggstate->pertrans[transno];
    4448             : 
    4449             :         /*
    4450             :          * if the transfns or transition state types are not the same then the
    4451             :          * state can't be shared.
    4452             :          */
    4453         318 :         if (aggtransfn != pertrans->transfn_oid ||
    4454         104 :             aggtranstype != pertrans->aggtranstype)
    4455         214 :             continue;
    4456             : 
    4457             :         /*
    4458             :          * The serialization and deserialization functions must match, if
    4459             :          * present, as we're unable to share the trans state for aggregates
    4460             :          * which will serialize or deserialize into different formats.
    4461             :          * Remember that these will be InvalidOid if they're not required for
    4462             :          * this agg node.
    4463             :          */
    4464         104 :         if (aggserialfn != pertrans->serialfn_oid ||
    4465         104 :             aggdeserialfn != pertrans->deserialfn_oid)
    4466           0 :             continue;
    4467             : 
    4468             :         /*
    4469             :          * Check that the initial condition matches, too.
    4470             :          */
    4471         104 :         if (initValueIsNull && pertrans->initValueIsNull)
    4472         100 :             return transno;
    4473             : 
    4474         152 :         if (!initValueIsNull && !pertrans->initValueIsNull &&
    4475         152 :             datumIsEqual(initValue, pertrans->initValue,
    4476          76 :                          pertrans->transtypeByVal, pertrans->transtypeLen))
    4477          72 :             return transno;
    4478             :     }
    4479       32470 :     return -1;
    4480             : }
    4481             : 
    4482             : void
    4483       30092 : ExecEndAgg(AggState *node)
    4484             : {
    4485             :     PlanState  *outerPlan;
    4486             :     int         transno;
    4487       30092 :     int         numGroupingSets = Max(node->maxsets, 1);
    4488             :     int         setno;
    4489             : 
    4490             :     /* Make sure we have closed any open tuplesorts */
    4491             : 
    4492       30092 :     if (node->sort_in)
    4493          92 :         tuplesort_end(node->sort_in);
    4494       30092 :     if (node->sort_out)
    4495          24 :         tuplesort_end(node->sort_out);
    4496             : 
    4497       30092 :     hashagg_reset_spill_state(node);
    4498             : 
    4499       30092 :     if (node->hash_metacxt != NULL)
    4500             :     {
    4501        2950 :         MemoryContextDelete(node->hash_metacxt);
    4502        2950 :         node->hash_metacxt = NULL;
    4503             :     }
    4504             : 
    4505       62588 :     for (transno = 0; transno < node->numtrans; transno++)
    4506             :     {
    4507       32496 :         AggStatePerTrans pertrans = &node->pertrans[transno];
    4508             : 
    4509       65660 :         for (setno = 0; setno < numGroupingSets; setno++)
    4510             :         {
    4511       33164 :             if (pertrans->sortstates[setno])
    4512           0 :                 tuplesort_end(pertrans->sortstates[setno]);
    4513             :         }
    4514             :     }
    4515             : 
    4516             :     /* And ensure any agg shutdown callbacks have been called */
    4517       60656 :     for (setno = 0; setno < numGroupingSets; setno++)
    4518       30564 :         ReScanExprContext(node->aggcontexts[setno]);
    4519       30092 :     if (node->hashcontext)
    4520        2950 :         ReScanExprContext(node->hashcontext);
    4521             : 
    4522             :     /*
    4523             :      * We don't actually free any ExprContexts here (see comment in
    4524             :      * ExecFreeExprContext), just unlinking the output one from the plan node
    4525             :      * suffices.
    4526             :      */
    4527       30092 :     ExecFreeExprContext(&node->ss.ps);
    4528             : 
    4529             :     /* clean up tuple table */
    4530       30092 :     ExecClearTuple(node->ss.ss_ScanTupleSlot);
    4531             : 
    4532       30092 :     outerPlan = outerPlanState(node);
    4533       30092 :     ExecEndNode(outerPlan);
    4534       30092 : }
    4535             : 
    4536             : void
    4537     1183066 : ExecReScanAgg(AggState *node)
    4538             : {
    4539     1183066 :     ExprContext *econtext = node->ss.ps.ps_ExprContext;
    4540     1183066 :     PlanState  *outerPlan = outerPlanState(node);
    4541     1183066 :     Agg        *aggnode = (Agg *) node->ss.ps.plan;
    4542             :     int         transno;
    4543     1183066 :     int         numGroupingSets = Max(node->maxsets, 1);
    4544             :     int         setno;
    4545             : 
    4546     1183066 :     node->agg_done = false;
    4547             : 
    4548     1183066 :     if (node->aggstrategy == AGG_HASHED)
    4549             :     {
    4550             :         /*
    4551             :          * In the hashed case, if we haven't yet built the hash table then we
    4552             :          * can just return; nothing done yet, so nothing to undo. If subnode's
    4553             :          * chgParam is not NULL then it will be re-scanned by ExecProcNode,
    4554             :          * else no reason to re-scan it at all.
    4555             :          */
    4556       42484 :         if (!node->table_filled)
    4557         380 :             return;
    4558             : 
    4559             :         /*
    4560             :          * If we do have the hash table, and it never spilled, and the subplan
    4561             :          * does not have any parameter changes, and none of our own parameter
    4562             :          * changes affect input expressions of the aggregated functions, then
    4563             :          * we can just rescan the existing hash table; no need to build it
    4564             :          * again.
    4565             :          */
    4566       42104 :         if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
    4567         564 :             !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
    4568             :         {
    4569         548 :             ResetTupleHashIterator(node->perhash[0].hashtable,
    4570             :                                    &node->perhash[0].hashiter);
    4571         548 :             select_current_set(node, 0, true);
    4572         548 :             return;
    4573             :         }
    4574             :     }
    4575             : 
    4576             :     /* Make sure we have closed any open tuplesorts */
    4577     2322768 :     for (transno = 0; transno < node->numtrans; transno++)
    4578             :     {
    4579     2281284 :         for (setno = 0; setno < numGroupingSets; setno++)
    4580             :         {
    4581     1140654 :             AggStatePerTrans pertrans = &node->pertrans[transno];
    4582             : 
    4583     1140654 :             if (pertrans->sortstates[setno])
    4584             :             {
    4585           0 :                 tuplesort_end(pertrans->sortstates[setno]);
    4586           0 :                 pertrans->sortstates[setno] = NULL;
    4587             :             }
    4588             :         }
    4589             :     }
    4590             : 
    4591             :     /*
    4592             :      * We don't need to ReScanExprContext the output tuple context here;
    4593             :      * ExecReScan already did it. But we do need to reset our per-grouping-set
    4594             :      * contexts, which may have transvalues stored in them. (We use rescan
    4595             :      * rather than just reset because transfns may have registered callbacks
    4596             :      * that need to be run now.) For the AGG_HASHED case, see below.
    4597             :      */
    4598             : 
    4599     2364300 :     for (setno = 0; setno < numGroupingSets; setno++)
    4600             :     {
    4601     1182162 :         ReScanExprContext(node->aggcontexts[setno]);
    4602             :     }
    4603             : 
    4604             :     /* Release first tuple of group, if we have made a copy */
    4605     1182138 :     if (node->grp_firstTuple != NULL)
    4606             :     {
    4607           0 :         heap_freetuple(node->grp_firstTuple);
    4608           0 :         node->grp_firstTuple = NULL;
    4609             :     }
    4610     1182138 :     ExecClearTuple(node->ss.ss_ScanTupleSlot);
    4611             : 
    4612             :     /* Forget current agg values */
    4613     2322768 :     MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
    4614     1182138 :     MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
    4615             : 
    4616             :     /*
    4617             :      * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
    4618             :      * the hashcontext. This used to be an issue, but now, resetting a context
    4619             :      * automatically deletes sub-contexts too.
    4620             :      */
    4621     1182138 :     if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
    4622             :     {
    4623       41568 :         hashagg_reset_spill_state(node);
    4624             : 
    4625       41568 :         node->hash_ever_spilled = false;
    4626       41568 :         node->hash_spill_mode = false;
    4627       41568 :         node->hash_ngroups_current = 0;
    4628             : 
    4629       41568 :         ReScanExprContext(node->hashcontext);
    4630             :         /* Rebuild an empty hash table */
    4631       41568 :         build_hash_tables(node);
    4632       41568 :         node->table_filled = false;
    4633             :         /* iterator will be reset when the table is filled */
    4634             : 
    4635       41568 :         hashagg_recompile_expressions(node, false, false);
    4636             :     }
    4637             : 
    4638     1182138 :     if (node->aggstrategy != AGG_HASHED)
    4639             :     {
    4640             :         /*
    4641             :          * Reset the per-group state (in particular, mark transvalues null)
    4642             :          */
    4643     2281188 :         for (setno = 0; setno < numGroupingSets; setno++)
    4644             :         {
    4645     3421850 :             MemSet(node->pergroups[setno], 0,
    4646             :                    sizeof(AggStatePerGroupData) * node->numaggs);
    4647             :         }
    4648             : 
    4649             :         /* reset to phase 1 */
    4650     1140582 :         initialize_phase(node, 1);
    4651             : 
    4652     1140582 :         node->input_done = false;
    4653     1140582 :         node->projected_set = -1;
    4654             :     }
    4655             : 
    4656     1182138 :     if (outerPlan->chgParam == NULL)
    4657          96 :         ExecReScan(outerPlan);
    4658             : }
    4659             : 
    4660             : 
    4661             : /***********************************************************************
    4662             :  * API exposed to aggregate functions
    4663             :  ***********************************************************************/
    4664             : 
    4665             : 
    4666             : /*
    4667             :  * AggCheckCallContext - test if a SQL function is being called as an aggregate
    4668             :  *
    4669             :  * The transition and/or final functions of an aggregate may want to verify
    4670             :  * that they are being called as aggregates, rather than as plain SQL
    4671             :  * functions.  They should use this function to do so.  The return value
    4672             :  * is nonzero if being called as an aggregate, or zero if not.  (Specific
    4673             :  * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
    4674             :  * values could conceivably appear in future.)
    4675             :  *
    4676             :  * If aggcontext isn't NULL, the function also stores at *aggcontext the
    4677             :  * identity of the memory context that aggregate transition values are being
    4678             :  * stored in.  Note that the same aggregate call site (flinfo) may be called
    4679             :  * interleaved on different transition values in different contexts, so it's
    4680             :  * not kosher to cache aggcontext under fn_extra.  It is, however, kosher to
    4681             :  * cache it in the transvalue itself (for internal-type transvalues).
    4682             :  */
    4683             : int
    4684     2840764 : AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
    4685             : {
    4686     2840764 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4687             :     {
    4688     2791884 :         if (aggcontext)
    4689             :         {
    4690     1077876 :             AggState   *aggstate = ((AggState *) fcinfo->context);
    4691     1077876 :             ExprContext *cxt = aggstate->curaggcontext;
    4692             : 
    4693     1077876 :             *aggcontext = cxt->ecxt_per_tuple_memory;
    4694             :         }
    4695     2791884 :         return AGG_CONTEXT_AGGREGATE;
    4696             :     }
    4697       48880 :     if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
    4698             :     {
    4699       47776 :         if (aggcontext)
    4700         440 :             *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
    4701       47776 :         return AGG_CONTEXT_WINDOW;
    4702             :     }
    4703             : 
    4704             :     /* this is just to prevent "uninitialized variable" warnings */
    4705        1104 :     if (aggcontext)
    4706        1072 :         *aggcontext = NULL;
    4707        1104 :     return 0;
    4708             : }
    4709             : 
    4710             : /*
    4711             :  * AggGetAggref - allow an aggregate support function to get its Aggref
    4712             :  *
    4713             :  * If the function is being called as an aggregate support function,
    4714             :  * return the Aggref node for the aggregate call.  Otherwise, return NULL.
    4715             :  *
    4716             :  * Aggregates sharing the same inputs and transition functions can get
    4717             :  * merged into a single transition calculation.  If the transition function
    4718             :  * calls AggGetAggref, it will get some one of the Aggrefs for which it is
    4719             :  * executing.  It must therefore not pay attention to the Aggref fields that
    4720             :  * relate to the final function, as those are indeterminate.  But if a final
    4721             :  * function calls AggGetAggref, it will get a precise result.
    4722             :  *
    4723             :  * Note that if an aggregate is being used as a window function, this will
    4724             :  * return NULL.  We could provide a similar function to return the relevant
    4725             :  * WindowFunc node in such cases, but it's not needed yet.
    4726             :  */
    4727             : Aggref *
    4728         166 : AggGetAggref(FunctionCallInfo fcinfo)
    4729             : {
    4730         166 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4731             :     {
    4732         166 :         AggState   *aggstate = (AggState *) fcinfo->context;
    4733             :         AggStatePerAgg curperagg;
    4734             :         AggStatePerTrans curpertrans;
    4735             : 
    4736             :         /* check curperagg (valid when in a final function) */
    4737         166 :         curperagg = aggstate->curperagg;
    4738             : 
    4739         166 :         if (curperagg)
    4740           0 :             return curperagg->aggref;
    4741             : 
    4742             :         /* check curpertrans (valid when in a transition function) */
    4743         166 :         curpertrans = aggstate->curpertrans;
    4744             : 
    4745         166 :         if (curpertrans)
    4746         166 :             return curpertrans->aggref;
    4747             :     }
    4748           0 :     return NULL;
    4749             : }
    4750             : 
    4751             : /*
    4752             :  * AggGetTempMemoryContext - fetch short-term memory context for aggregates
    4753             :  *
    4754             :  * This is useful in agg final functions; the context returned is one that
    4755             :  * the final function can safely reset as desired.  This isn't useful for
    4756             :  * transition functions, since the context returned MAY (we don't promise)
    4757             :  * be the same as the context those are called in.
    4758             :  *
    4759             :  * As above, this is currently not useful for aggs called as window functions.
    4760             :  */
    4761             : MemoryContext
    4762           0 : AggGetTempMemoryContext(FunctionCallInfo fcinfo)
    4763             : {
    4764           0 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4765             :     {
    4766           0 :         AggState   *aggstate = (AggState *) fcinfo->context;
    4767             : 
    4768           0 :         return aggstate->tmpcontext->ecxt_per_tuple_memory;
    4769             :     }
    4770           0 :     return NULL;
    4771             : }
    4772             : 
    4773             : /*
    4774             :  * AggStateIsShared - find out whether transition state is shared
    4775             :  *
    4776             :  * If the function is being called as an aggregate support function,
    4777             :  * return true if the aggregate's transition state is shared across
    4778             :  * multiple aggregates, false if it is not.
    4779             :  *
    4780             :  * Returns true if not called as an aggregate support function.
    4781             :  * This is intended as a conservative answer, ie "no you'd better not
    4782             :  * scribble on your input".  In particular, will return true if the
    4783             :  * aggregate is being used as a window function, which is a scenario
    4784             :  * in which changing the transition state is a bad idea.  We might
    4785             :  * want to refine the behavior for the window case in future.
    4786             :  */
    4787             : bool
    4788         166 : AggStateIsShared(FunctionCallInfo fcinfo)
    4789             : {
    4790         166 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4791             :     {
    4792         166 :         AggState   *aggstate = (AggState *) fcinfo->context;
    4793             :         AggStatePerAgg curperagg;
    4794             :         AggStatePerTrans curpertrans;
    4795             : 
    4796             :         /* check curperagg (valid when in a final function) */
    4797         166 :         curperagg = aggstate->curperagg;
    4798             : 
    4799         166 :         if (curperagg)
    4800           0 :             return aggstate->pertrans[curperagg->transno].aggshared;
    4801             : 
    4802             :         /* check curpertrans (valid when in a transition function) */
    4803         166 :         curpertrans = aggstate->curpertrans;
    4804             : 
    4805         166 :         if (curpertrans)
    4806         166 :             return curpertrans->aggshared;
    4807             :     }
    4808           0 :     return true;
    4809             : }
    4810             : 
    4811             : /*
    4812             :  * AggRegisterCallback - register a cleanup callback for an aggregate
    4813             :  *
    4814             :  * This is useful for aggs to register shutdown callbacks, which will ensure
    4815             :  * that non-memory resources are freed.  The callback will occur just before
    4816             :  * the associated aggcontext (as returned by AggCheckCallContext) is reset,
    4817             :  * either between groups or as a result of rescanning the query.  The callback
    4818             :  * will NOT be called on error paths.  The typical use-case is for freeing of
    4819             :  * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
    4820             :  * created by the agg functions.  (The callback will not be called until after
    4821             :  * the result of the finalfn is no longer needed, so it's safe for the finalfn
    4822             :  * to return data that will be freed by the callback.)
    4823             :  *
    4824             :  * As above, this is currently not useful for aggs called as window functions.
    4825             :  */
    4826             : void
    4827         454 : AggRegisterCallback(FunctionCallInfo fcinfo,
    4828             :                     ExprContextCallbackFunction func,
    4829             :                     Datum arg)
    4830             : {
    4831         454 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4832             :     {
    4833         454 :         AggState   *aggstate = (AggState *) fcinfo->context;
    4834         454 :         ExprContext *cxt = aggstate->curaggcontext;
    4835             : 
    4836         454 :         RegisterExprContextCallback(cxt, func, arg);
    4837             : 
    4838         454 :         return;
    4839             :     }
    4840           0 :     elog(ERROR, "aggregate function cannot register a callback in this context");
    4841             : }
    4842             : 
    4843             : 
    4844             : /*
    4845             :  * aggregate_dummy - dummy execution routine for aggregate functions
    4846             :  *
    4847             :  * This function is listed as the implementation (prosrc field) of pg_proc
    4848             :  * entries for aggregate functions.  Its only purpose is to throw an error
    4849             :  * if someone mistakenly executes such a function in the normal way.
    4850             :  *
    4851             :  * Perhaps someday we could assign real meaning to the prosrc field of
    4852             :  * an aggregate?
    4853             :  */
    4854             : Datum
    4855           0 : aggregate_dummy(PG_FUNCTION_ARGS)
    4856             : {
    4857           0 :     elog(ERROR, "aggregate function %u called as normal function",
    4858             :          fcinfo->flinfo->fn_oid);
    4859             :     return (Datum) 0;           /* keep compiler quiet */
    4860             : }

Generated by: LCOV version 1.13