LCOV - code coverage report
Current view: top level - src/backend/executor - nodeAgg.c (source / functions)
Test: PostgreSQL 18devel
Date: 2025-04-01 14:15:22
Coverage:   Lines: 1426 of 1506 hit (94.7 %)   Functions: 57 of 58 hit (98.3 %)

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * nodeAgg.c
       4             :  *    Routines to handle aggregate nodes.
       5             :  *
       6             :  *    ExecAgg normally evaluates each aggregate in the following steps:
       7             :  *
       8             :  *       transvalue = initcond
       9             :  *       foreach input_tuple do
      10             :  *          transvalue = transfunc(transvalue, input_value(s))
      11             :  *       result = finalfunc(transvalue, direct_argument(s))
      12             :  *
      13             :  *    If a finalfunc is not supplied then the result is just the ending
      14             :  *    value of transvalue.
      15             :  *
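To make the steps above concrete, here is a minimal standalone C sketch (not part of nodeAgg.c; all names are invented for illustration) of an avg()-style aggregate, where the transition state carries a running sum and count and the final function divides them:

    #include <stddef.h>

    typedef struct AvgState
    {
        double  sum;
        long    count;
    } AvgState;

    /* transfunc: fold one input value into the transition state */
    static AvgState
    avg_transfn(AvgState trans, double input)
    {
        trans.sum += input;
        trans.count += 1;
        return trans;
    }

    /* finalfunc: derive the result from the ending transition state */
    static double
    avg_finalfn(AvgState trans)
    {
        /* SQL avg() would return NULL for zero rows; 0.0 stands in here */
        return trans.count > 0 ? trans.sum / trans.count : 0.0;
    }

    static double
    eval_avg(const double *values, size_t n)
    {
        AvgState    trans = {0.0, 0};               /* transvalue = initcond */

        for (size_t i = 0; i < n; i++)              /* foreach input_tuple */
            trans = avg_transfn(trans, values[i]);

        return avg_finalfn(trans);                  /* result = finalfunc(transvalue) */
    }
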
      16             :  *    Other behaviors can be selected by the "aggsplit" mode, which exists
      17             :  *    to support partial aggregation.  It is possible to:
      18             :  *    * Skip running the finalfunc, so that the output is always the
      19             :  *    final transvalue state.
      20             :  *    * Substitute the combinefunc for the transfunc, so that transvalue
      21             :  *    states (propagated up from a child partial-aggregation step) are merged
      22             :  *    rather than processing raw input rows.  (The statements below about
      23             :  *    the transfunc apply equally to the combinefunc, when it's selected.)
      24             :  *    * Apply the serializefunc to the output values (this only makes sense
      25             :  *    when skipping the finalfunc, since the serializefunc works on the
      26             :  *    transvalue data type).
      27             :  *    * Apply the deserializefunc to the input values (this only makes sense
      28             :  *    when using the combinefunc, for similar reasons).
      29             :  *    It is the planner's responsibility to connect up Agg nodes using these
      30             :  *    alternate behaviors in a way that makes sense, with partial aggregation
      31             :  *    results being fed to nodes that expect them.
      32             :  *
      33             :  *    If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
      34             :  *    input tuples and eliminate duplicates (if required) before performing
      35             :  *    the above-depicted process.  (However, we don't do that for ordered-set
      36             :  *    aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
      37             :  *    so far as this module is concerned.)  Note that partial aggregation
      38             :  *    is not supported in these cases, since we couldn't ensure global
      39             :  *    ordering or distinctness of the inputs.
      40             :  *
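As a sketch of the sort-then-deduplicate approach (illustrative only; process_ordered_aggregate_single(), later in this file, is the real single-column implementation), count(DISTINCT x) over an already-sorted array reduces to skipping adjacent duplicates:

    /* Assumes vals[] has already been sorted, so duplicates are adjacent. */
    static long
    count_distinct_sorted(const long *vals, int n)
    {
        long    transvalue = 0;         /* initcond for count() is 0 */

        for (int i = 0; i < n; i++)
        {
            if (i > 0 && vals[i] == vals[i - 1])
                continue;               /* DISTINCT: skip duplicate inputs */
            transvalue += 1;            /* stand-in for the transfunc */
        }

        return transvalue;              /* count() has no finalfunc */
    }
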
      41             :  *    If transfunc is marked "strict" in pg_proc and initcond is NULL,
      42             :  *    then the first non-NULL input_value is assigned directly to transvalue,
      43             :  *    and transfunc isn't applied until the second non-NULL input_value.
      44             :  *    The agg's first input type and transtype must be the same in this case!
      45             :  *
      46             :  *    If transfunc is marked "strict" then NULL input_values are skipped,
      47             :  *    keeping the previous transvalue.  If transfunc is not strict then it
      48             :  *    is called for every input tuple and must deal with NULL initcond
      49             :  *    or NULL input_values for itself.
      50             :  *
      51             :  *    If finalfunc is marked "strict" then it is not called when the
      52             :  *    ending transvalue is NULL, instead a NULL result is created
      53             :  *    automatically (this is just the usual handling of strict functions,
      54             :  *    of course).  A non-strict finalfunc can make its own choice of
      55             :  *    what to return for a NULL ending transvalue.
      56             :  *
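The strictness rules above can be sketched as follows (illustrative C only; the struct and function names are invented, and the real logic is in advance_transition_function() later in this file):

    typedef struct StrictTransState
    {
        long    value;
        int     value_is_null;      /* current transvalue is NULL */
        int     no_trans_value;     /* strict transfn + NULL initcond: unseeded */
    } StrictTransState;

    static void
    advance_strict_transfn(StrictTransState *state, long input, int input_is_null)
    {
        if (input_is_null)
            return;                 /* strict transfn: skip NULL inputs */

        if (state->no_trans_value)
        {
            /* first non-NULL input becomes the transition value directly */
            state->value = input;
            state->value_is_null = 0;
            state->no_trans_value = 0;
            return;
        }

        if (state->value_is_null)
            return;                 /* a strict transfn is never called with a
                                     * NULL transvalue; the NULL propagates */

        state->value += input;      /* stand-in for the actual transfunc */
    }
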
      57             :  *    Ordered-set aggregates are treated specially in one other way: we
      58             :  *    evaluate any "direct" arguments and pass them to the finalfunc along
      59             :  *    with the transition value.
      60             :  *
      61             :  *    A finalfunc can have additional arguments beyond the transvalue and
      62             :  *    any "direct" arguments, corresponding to the input arguments of the
      63             :  *    aggregate.  These are always just passed as NULL.  Such arguments may be
      64             :  *    needed to allow resolution of a polymorphic aggregate's result type.
      65             :  *
      66             :  *    We compute aggregate input expressions and run the transition functions
      67             :  *    in a temporary econtext (aggstate->tmpcontext).  This is reset at least
      68             :  *    once per input tuple, so when the transvalue datatype is
      69             :  *    pass-by-reference, we have to be careful to copy it into a longer-lived
      70             :  *    memory context, and free the prior value to avoid memory leakage.  We
      71             :  *    store transvalues in another set of econtexts, aggstate->aggcontexts
      72             :  *    (one per grouping set, see below), which are also used for the hashtable
      73             :  *    structures in AGG_HASHED mode.  These econtexts are rescanned, not just
      74             :  *    reset, at group boundaries so that aggregate transition functions can
      75             :  *    register shutdown callbacks via AggRegisterCallback.
      76             :  *
      77             :  *    The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
      78             :  *    run finalize functions and compute the output tuple; this context can be
      79             :  *    reset once per output tuple.
      80             :  *
      81             :  *    The executor's AggState node is passed as the fmgr "context" value in
      82             :  *    all transfunc and finalfunc calls.  It is not recommended that the
      83             :  *    transition functions look at the AggState node directly, but they can
      84             :  *    use AggCheckCallContext() to verify that they are being called by
      85             :  *    nodeAgg.c (and not as ordinary SQL functions).  The main reason a
      86             :  *    transition function might want to know this is so that it can avoid
      87             :  *    palloc'ing a fixed-size pass-by-ref transition value on every call:
      88             :  *    it can instead just scribble on and return its left input.  Ordinarily
      89             :  *    it is completely forbidden for functions to modify pass-by-ref inputs,
      90             :  *    but in the aggregate case we know the left input is either the initial
      91             :  *    transition value or a previous function result, and in either case its
      92             :  *    value need not be preserved.  See int8inc() for an example.  Notice that
      93             :  *    the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
      94             :  *    the previous transition value pointer is returned.  It is also possible
      95             :  *    to avoid repeated data copying when the transition value is an expanded
      96             :  *    object: to do that, the transition function must take care to return
      97             :  *    an expanded object that is in a child context of the memory context
      98             :  *    returned by AggCheckCallContext().  Also, some transition functions want
      99             :  *    to store working state in addition to the nominal transition value; they
     100             :  *    can use the memory context returned by AggCheckCallContext() to do that.
     101             :  *
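For illustration, a hedged sketch of a user-defined transition function following that convention; MySumState and my_sum_transfn are invented for the example, while AggCheckCallContext() and the fmgr macros are the real API:

    #include "postgres.h"
    #include "fmgr.h"

    typedef struct MySumState
    {
        int64   sum;
        int64   count;
    } MySumState;

    PG_FUNCTION_INFO_V1(my_sum_transfn);

    Datum
    my_sum_transfn(PG_FUNCTION_ARGS)
    {
        MemoryContext aggcontext;
        MySumState *state;

        if (!AggCheckCallContext(fcinfo, &aggcontext))
            elog(ERROR, "my_sum_transfn called in non-aggregate context");

        if (PG_ARGISNULL(0))
        {
            /* first call for this group: allocate the state in the
             * long-lived aggregate context so it survives per-tuple resets */
            state = (MySumState *) MemoryContextAllocZero(aggcontext,
                                                          sizeof(MySumState));
        }
        else
        {
            /* we know nodeAgg.c is calling us, so scribbling on the
             * pass-by-ref input is safe */
            state = (MySumState *) PG_GETARG_POINTER(0);
        }

        if (!PG_ARGISNULL(1))
        {
            state->sum += PG_GETARG_INT64(1);
            state->count++;
        }

        PG_RETURN_POINTER(state);
    }
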
     102             :  *    Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
     103             :  *    AggState is available as context in earlier releases (back to 8.1),
     104             :  *    but direct examination of the node is needed to use it before 9.0.
     105             :  *
     106             :  *    As of 9.4, aggregate transition functions can also use AggGetAggref()
     107             :  *    to get hold of the Aggref expression node for their aggregate call.
     108             :  *    This is mainly intended for ordered-set aggregates, which are not
     109             :  *    supported as window functions.  (A regular aggregate function would
     110             :  *    need some fallback logic to use this, since there's no Aggref node
     111             :  *    for a window function.)
     112             :  *
     113             :  *    Grouping sets:
     114             :  *
     115             :  *    A list of grouping sets which is structurally equivalent to a ROLLUP
     116             :  *    clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
     117             :  *    ordered data.  We do this by keeping a separate set of transition values
     118             :  *    for each grouping set being concurrently processed; for each input tuple
     119             :  *    we update them all, and on group boundaries we reset those states
     120             :  *    (starting at the front of the list) whose grouping values have changed
     121             :  *    (the list of grouping sets is ordered from most specific to least
     122             :  *    specific).
     123             :  *
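To sketch the boundary handling (illustrative only; the key types are invented, and the real executor logic is in agg_retrieve_direct() later in this file): with the sets ordered most specific first and numcols[setno] giving how many leading sort columns set setno groups on, a change within the first numcols[setno] columns means sets 0 .. setno all hit a group boundary:

    /* Returns how many sets, counting from the most specific one (setno 0),
     * must be finalized and reset for the incoming tuple. */
    static int
    sets_to_reset(const long *prev_keys, const long *curr_keys,
                  const int *numcols, int numGroupingSets)
    {
        for (int setno = numGroupingSets - 1; setno >= 0; setno--)
        {
            for (int col = 0; col < numcols[setno]; col++)
            {
                if (prev_keys[col] != curr_keys[col])
                    return setno + 1;   /* reset sets 0 .. setno */
            }
        }

        return 0;                       /* no boundary crossed */
    }
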
     124             :  *    Where more complex grouping sets are used, we break them down into
     125             :  *    "phases", where each phase has a different sort order (except phase 0
     126             :  *    which is reserved for hashing).  During each phase but the last, the
     127             :  *    input tuples are additionally stored in a tuplesort which is keyed to the
     128             :  *    next phase's sort order; during each phase but the first, the input
     129             :  *    tuples are drawn from the previously sorted data.  (The sorting of the
     130             :  *    data for the first phase is handled by the planner, as it might be
     131             :  *    satisfied by underlying nodes.)
     132             :  *
     133             :  *    Hashing can be mixed with sorted grouping.  To do this, we have an
     134             :  *    AGG_MIXED strategy that populates the hashtables during the first sorted
     135             :  *    phase, and switches to reading them out after completing all sort phases.
     136             :  *    We can also support AGG_HASHED with multiple hash tables and no sorting
     137             :  *    at all.
     138             :  *
     139             :  *    From the perspective of aggregate transition and final functions, the
     140             :  *    only issue regarding grouping sets is this: a single call site (flinfo)
     141             :  *    of an aggregate function may be used for updating several different
     142             :  *    transition values in turn. So the function must not cache in the flinfo
     143             :  *    anything which logically belongs as part of the transition value (most
     144             :  *    importantly, the memory context in which the transition value exists).
     145             :  *    The support API functions (AggCheckCallContext, AggRegisterCallback) are
     146             :  *    sensitive to the grouping set for which the aggregate function is
     147             :  *    currently being called.
     148             :  *
     149             :  *    Plan structure:
     150             :  *
     151             :  *    What we get from the planner is actually one "real" Agg node which is
     152             :  *    part of the plan tree proper, but which optionally has an additional list
     153             :  *    of Agg nodes hung off the side via the "chain" field.  This is because an
     154             :  *    Agg node happens to be a convenient representation of all the data we
     155             :  *    need for grouping sets.
     156             :  *
     157             :  *    For many purposes, we treat the "real" node as if it were just the first
     158             :  *    node in the chain.  The chain must be ordered such that hashed entries
     159             :  *    come before sorted/plain entries; the real node is marked AGG_MIXED if
     160             :  *    there are both types present (in which case the real node describes one
      161             :  *    of the hashed groupings; other AGG_HASHED nodes may optionally follow in
      162             :  *    the chain, followed in turn by AGG_SORTED nodes or one AGG_PLAIN node).  If
     163             :  *    the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
     164             :  *    nodes must be of the same type; if it is AGG_PLAIN, there can be no
     165             :  *    chained nodes.
     166             :  *
     167             :  *    We collect all hashed nodes into a single "phase", numbered 0, and create
     168             :  *    a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
     169             :  *    Phase 0 is allocated even if there are no hashes, but remains unused in
     170             :  *    that case.
     171             :  *
     172             :  *    AGG_HASHED nodes actually refer to only a single grouping set each,
     173             :  *    because for each hashed grouping we need a separate grpColIdx and
     174             :  *    numGroups estimate.  AGG_SORTED nodes represent a "rollup", a list of
     175             :  *    grouping sets that share a sort order.  Each AGG_SORTED node other than
     176             :  *    the first one has an associated Sort node which describes the sort order
     177             :  *    to be used; the first sorted node takes its input from the outer subtree,
     178             :  *    which the planner has already arranged to provide ordered data.
     179             :  *
     180             :  *    Memory and ExprContext usage:
     181             :  *
     182             :  *    Because we're accumulating aggregate values across input rows, we need to
     183             :  *    use more memory contexts than just simple input/output tuple contexts.
     184             :  *    In fact, for a rollup, we need a separate context for each grouping set
     185             :  *    so that we can reset the inner (finer-grained) aggregates on their group
     186             :  *    boundaries while continuing to accumulate values for outer
     187             :  *    (coarser-grained) groupings.  On top of this, we might be simultaneously
     188             :  *    populating hashtables; however, we only need one context for all the
     189             :  *    hashtables.
     190             :  *
     191             :  *    So we create an array, aggcontexts, with an ExprContext for each grouping
     192             :  *    set in the largest rollup that we're going to process, and use the
     193             :  *    per-tuple memory context of those ExprContexts to store the aggregate
     194             :  *    transition values.  hashcontext is the single context created to support
     195             :  *    all hash tables.
     196             :  *
     197             :  *    Spilling To Disk
     198             :  *
     199             :  *    When performing hash aggregation, if the hash table memory exceeds the
     200             :  *    limit (see hash_agg_check_limits()), we enter "spill mode". In spill
     201             :  *    mode, we advance the transition states only for groups already in the
      202             :  * hash table. For tuples that would need to create new hash table
     203             :  *    entries (and initialize new transition states), we instead spill them to
     204             :  *    disk to be processed later. The tuples are spilled in a partitioned
     205             :  *    manner, so that subsequent batches are smaller and less likely to exceed
     206             :  *    hash_mem (if a batch does exceed hash_mem, it must be spilled
     207             :  *    recursively).
     208             :  *
     209             :  *    Spilled data is written to logical tapes. These provide better control
     210             :  *    over memory usage, disk space, and the number of files than if we were
     211             :  *    to use a BufFile for each spill.  We don't know the number of tapes needed
     212             :  *    at the start of the algorithm (because it can recurse), so a tape set is
     213             :  *    allocated at the beginning, and individual tapes are created as needed.
     214             :  *    As a particular tape is read, logtape.c recycles its disk space. When a
     215             :  *    tape is read to completion, it is destroyed entirely.
     216             :  *
     217             :  *    Tapes' buffers can take up substantial memory when many tapes are open at
     218             :  *    once. We only need one tape open at a time in read mode (using a buffer
      219             :  * that's a multiple of BLCKSZ); but we need one tape open in write mode for
      220             :  * each partition, each requiring a buffer of size BLCKSZ.
     221             :  *
     222             :  *    Note that it's possible for transition states to start small but then
     223             :  *    grow very large; for instance in the case of ARRAY_AGG. In such cases,
     224             :  *    it's still possible to significantly exceed hash_mem. We try to avoid
     225             :  *    this situation by estimating what will fit in the available memory, and
     226             :  *    imposing a limit on the number of groups separately from the amount of
     227             :  *    memory consumed.
     228             :  *
     229             :  *    Transition / Combine function invocation:
     230             :  *
     231             :  *    For performance reasons transition functions, including combine
     232             :  *    functions, aren't invoked one-by-one from nodeAgg.c after computing
     233             :  *    arguments using the expression evaluation engine. Instead
     234             :  *    ExecBuildAggTrans() builds one large expression that does both argument
     235             :  *    evaluation and transition function invocation. That avoids performance
     236             :  *    issues due to repeated uses of expression evaluation, complications due
      237             :  * to filter expressions having to be evaluated early, and allows the
      238             :  * entire expression to be JIT compiled into one native function.
     239             :  *
     240             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
     241             :  * Portions Copyright (c) 1994, Regents of the University of California
     242             :  *
     243             :  * IDENTIFICATION
     244             :  *    src/backend/executor/nodeAgg.c
     245             :  *
     246             :  *-------------------------------------------------------------------------
     247             :  */
     248             : 
     249             : #include "postgres.h"
     250             : 
     251             : #include "access/htup_details.h"
     252             : #include "access/parallel.h"
     253             : #include "catalog/objectaccess.h"
     254             : #include "catalog/pg_aggregate.h"
     255             : #include "catalog/pg_proc.h"
     256             : #include "catalog/pg_type.h"
     257             : #include "common/hashfn.h"
     258             : #include "executor/execExpr.h"
     259             : #include "executor/executor.h"
     260             : #include "executor/nodeAgg.h"
     261             : #include "lib/hyperloglog.h"
     262             : #include "miscadmin.h"
     263             : #include "nodes/nodeFuncs.h"
     264             : #include "optimizer/optimizer.h"
     265             : #include "parser/parse_agg.h"
     266             : #include "parser/parse_coerce.h"
     267             : #include "utils/acl.h"
     268             : #include "utils/builtins.h"
     269             : #include "utils/datum.h"
     270             : #include "utils/dynahash.h"
     271             : #include "utils/expandeddatum.h"
     272             : #include "utils/injection_point.h"
     273             : #include "utils/logtape.h"
     274             : #include "utils/lsyscache.h"
     275             : #include "utils/memutils.h"
     276             : #include "utils/memutils_memorychunk.h"
     277             : #include "utils/syscache.h"
     278             : #include "utils/tuplesort.h"
     279             : 
     280             : /*
     281             :  * Control how many partitions are created when spilling HashAgg to
     282             :  * disk.
     283             :  *
     284             :  * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of
     285             :  * partitions needed such that each partition will fit in memory. The factor
     286             :  * is set higher than one because there's not a high cost to having a few too
     287             :  * many partitions, and it makes it less likely that a partition will need to
     288             :  * be spilled recursively. Another benefit of having more, smaller partitions
     289             :  * is that small hash tables may perform better than large ones due to memory
     290             :  * caching effects.
     291             :  *
     292             :  * We also specify a min and max number of partitions per spill. Too few might
     293             :  * mean a lot of wasted I/O from repeated spilling of the same tuples. Too
     294             :  * many will result in lots of memory wasted buffering the spill files (which
     295             :  * could instead be spent on a larger hash table).
     296             :  */
     297             : #define HASHAGG_PARTITION_FACTOR 1.50
     298             : #define HASHAGG_MIN_PARTITIONS 4
     299             : #define HASHAGG_MAX_PARTITIONS 1024
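Roughly, these constants combine as in the sketch below (hedged; hash_choose_num_partitions(), later in this file, is the real implementation and additionally works in powers of two, as its log2_npartitions output parameter suggests, so that partitions map onto bits of the hash value):

    static int
    choose_num_partitions_sketch(double input_groups, double hashentrysize,
                                 double memory_limit)
    {
        double  partitions_needed;
        int     npartitions;

        /* enough partitions that each one is expected to fit in memory,
         * padded by HASHAGG_PARTITION_FACTOR */
        partitions_needed = (input_groups * hashentrysize) / memory_limit;
        npartitions = (int) (partitions_needed * HASHAGG_PARTITION_FACTOR);

        if (npartitions < HASHAGG_MIN_PARTITIONS)
            npartitions = HASHAGG_MIN_PARTITIONS;
        if (npartitions > HASHAGG_MAX_PARTITIONS)
            npartitions = HASHAGG_MAX_PARTITIONS;

        return npartitions;
    }
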
     300             : 
     301             : /*
     302             :  * For reading from tapes, the buffer size must be a multiple of
     303             :  * BLCKSZ. Larger values help when reading from multiple tapes concurrently,
     304             :  * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a
     305             :  * tape always uses a buffer of size BLCKSZ.
     306             :  */
     307             : #define HASHAGG_READ_BUFFER_SIZE BLCKSZ
     308             : #define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ
     309             : 
     310             : /*
     311             :  * HyperLogLog is used for estimating the cardinality of the spilled tuples in
     312             :  * a given partition. 5 bits corresponds to a size of about 32 bytes and a
     313             :  * worst-case error of around 18%. That's effective enough to choose a
     314             :  * reasonable number of partitions when recursing.
     315             :  */
     316             : #define HASHAGG_HLL_BIT_WIDTH 5
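The lib/hyperloglog.h API involved is small; a hedged usage sketch follows (the function below is invented, and relies on the includes already listed above; the real calls live in the hashagg_spill_* routines later in this file):

    static double
    estimate_spill_cardinality_sketch(const uint32 *tuple_hashes, int ntuples)
    {
        hyperLogLogState hll;
        double      cardinality;

        initHyperLogLog(&hll, HASHAGG_HLL_BIT_WIDTH);

        for (int i = 0; i < ntuples; i++)
            addHyperLogLog(&hll, tuple_hashes[i]);

        cardinality = estimateHyperLogLog(&hll);
        freeHyperLogLog(&hll);

        return cardinality;
    }
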
     317             : 
     318             : /*
     319             :  * Assume the palloc overhead always uses sizeof(MemoryChunk) bytes.
     320             :  */
     321             : #define CHUNKHDRSZ sizeof(MemoryChunk)
     322             : 
     323             : /*
     324             :  * Represents partitioned spill data for a single hashtable. Contains the
     325             :  * necessary information to route tuples to the correct partition, and to
     326             :  * transform the spilled data into new batches.
     327             :  *
     328             :  * The high bits are used for partition selection (when recursing, we ignore
     329             :  * the bits that have already been used for partition selection at an earlier
     330             :  * level).
     331             :  */
     332             : typedef struct HashAggSpill
     333             : {
     334             :     int         npartitions;    /* number of partitions */
     335             :     LogicalTape **partitions;   /* spill partition tapes */
     336             :     int64      *ntuples;        /* number of tuples in each partition */
     337             :     uint32      mask;           /* mask to find partition from hash value */
     338             :     int         shift;          /* after masking, shift by this amount */
     339             :     hyperLogLogState *hll_card; /* cardinality estimate for contents */
     340             : } HashAggSpill;
     341             : 
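For illustration, a hedged sketch of how the mask and shift fields pick a partition for a spilled tuple's hash value (the real routing is done in hashagg_spill_tuple()):

    static int
    spill_partition_sketch(const HashAggSpill *spill, uint32 hash)
    {
        /* with a single partition there are no hash bits left to consume */
        if (spill->npartitions == 1)
            return 0;

        return (int) ((hash & spill->mask) >> spill->shift);
    }
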
     342             : /*
     343             :  * Represents work to be done for one pass of hash aggregation (with only one
     344             :  * grouping set).
     345             :  *
     346             :  * Also tracks the bits of the hash already used for partition selection by
     347             :  * earlier iterations, so that this batch can use new bits. If all bits have
     348             :  * already been used, no partitioning will be done (any spilled data will go
     349             :  * to a single output tape).
     350             :  */
     351             : typedef struct HashAggBatch
     352             : {
     353             :     int         setno;          /* grouping set */
     354             :     int         used_bits;      /* number of bits of hash already used */
     355             :     LogicalTape *input_tape;    /* input partition tape */
     356             :     int64       input_tuples;   /* number of tuples in this batch */
     357             :     double      input_card;     /* estimated group cardinality */
     358             : } HashAggBatch;
     359             : 
     360             : /* used to find referenced colnos */
     361             : typedef struct FindColsContext
     362             : {
     363             :     bool        is_aggref;      /* is under an aggref */
     364             :     Bitmapset  *aggregated;     /* column references under an aggref */
     365             :     Bitmapset  *unaggregated;   /* other column references */
     366             : } FindColsContext;
     367             : 
     368             : static void select_current_set(AggState *aggstate, int setno, bool is_hash);
     369             : static void initialize_phase(AggState *aggstate, int newphase);
     370             : static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
     371             : static void initialize_aggregates(AggState *aggstate,
     372             :                                   AggStatePerGroup *pergroups,
     373             :                                   int numReset);
     374             : static void advance_transition_function(AggState *aggstate,
     375             :                                         AggStatePerTrans pertrans,
     376             :                                         AggStatePerGroup pergroupstate);
     377             : static void advance_aggregates(AggState *aggstate);
     378             : static void process_ordered_aggregate_single(AggState *aggstate,
     379             :                                              AggStatePerTrans pertrans,
     380             :                                              AggStatePerGroup pergroupstate);
     381             : static void process_ordered_aggregate_multi(AggState *aggstate,
     382             :                                             AggStatePerTrans pertrans,
     383             :                                             AggStatePerGroup pergroupstate);
     384             : static void finalize_aggregate(AggState *aggstate,
     385             :                                AggStatePerAgg peragg,
     386             :                                AggStatePerGroup pergroupstate,
     387             :                                Datum *resultVal, bool *resultIsNull);
     388             : static void finalize_partialaggregate(AggState *aggstate,
     389             :                                       AggStatePerAgg peragg,
     390             :                                       AggStatePerGroup pergroupstate,
     391             :                                       Datum *resultVal, bool *resultIsNull);
     392             : static inline void prepare_hash_slot(AggStatePerHash perhash,
     393             :                                      TupleTableSlot *inputslot,
     394             :                                      TupleTableSlot *hashslot);
     395             : static void prepare_projection_slot(AggState *aggstate,
     396             :                                     TupleTableSlot *slot,
     397             :                                     int currentSet);
     398             : static void finalize_aggregates(AggState *aggstate,
     399             :                                 AggStatePerAgg peraggs,
     400             :                                 AggStatePerGroup pergroup);
     401             : static TupleTableSlot *project_aggregates(AggState *aggstate);
     402             : static void find_cols(AggState *aggstate, Bitmapset **aggregated,
     403             :                       Bitmapset **unaggregated);
     404             : static bool find_cols_walker(Node *node, FindColsContext *context);
     405             : static void build_hash_tables(AggState *aggstate);
     406             : static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
     407             : static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
     408             :                                           bool nullcheck);
     409             : static void hash_create_memory(AggState *aggstate);
     410             : static long hash_choose_num_buckets(double hashentrysize,
     411             :                                     long ngroups, Size memory);
     412             : static int  hash_choose_num_partitions(double input_groups,
     413             :                                        double hashentrysize,
     414             :                                        int used_bits,
     415             :                                        int *log2_npartitions);
     416             : static void initialize_hash_entry(AggState *aggstate,
     417             :                                   TupleHashTable hashtable,
     418             :                                   TupleHashEntry entry);
     419             : static void lookup_hash_entries(AggState *aggstate);
     420             : static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
     421             : static void agg_fill_hash_table(AggState *aggstate);
     422             : static bool agg_refill_hash_table(AggState *aggstate);
     423             : static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
     424             : static TupleTableSlot *agg_retrieve_hash_table_in_memory(AggState *aggstate);
     425             : static void hash_agg_check_limits(AggState *aggstate);
     426             : static void hash_agg_enter_spill_mode(AggState *aggstate);
     427             : static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
     428             :                                     int npartitions);
     429             : static void hashagg_finish_initial_spills(AggState *aggstate);
     430             : static void hashagg_reset_spill_state(AggState *aggstate);
     431             : static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno,
     432             :                                        int64 input_tuples, double input_card,
     433             :                                        int used_bits);
     434             : static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
     435             : static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset,
     436             :                                int used_bits, double input_groups,
     437             :                                double hashentrysize);
     438             : static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
     439             :                                 TupleTableSlot *inputslot, uint32 hash);
     440             : static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
     441             :                                  int setno);
     442             : static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
     443             : static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
     444             :                                       AggState *aggstate, EState *estate,
     445             :                                       Aggref *aggref, Oid transfn_oid,
     446             :                                       Oid aggtranstype, Oid aggserialfn,
     447             :                                       Oid aggdeserialfn, Datum initValue,
     448             :                                       bool initValueIsNull, Oid *inputTypes,
     449             :                                       int numArguments);
     450             : 
     451             : 
     452             : /*
     453             :  * Select the current grouping set; affects current_set and
     454             :  * curaggcontext.
     455             :  */
     456             : static void
     457     7128130 : select_current_set(AggState *aggstate, int setno, bool is_hash)
     458             : {
     459             :     /*
     460             :      * When changing this, also adapt ExecAggPlainTransByVal() and
     461             :      * ExecAggPlainTransByRef().
     462             :      */
     463     7128130 :     if (is_hash)
     464     6455334 :         aggstate->curaggcontext = aggstate->hashcontext;
     465             :     else
     466      672796 :         aggstate->curaggcontext = aggstate->aggcontexts[setno];
     467             : 
     468     7128130 :     aggstate->current_set = setno;
     469     7128130 : }
     470             : 
     471             : /*
     472             :  * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
     473             :  * current_phase + 1. Juggle the tuplesorts accordingly.
     474             :  *
     475             :  * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
     476             :  * case, so when entering phase 0, all we need to do is drop open sorts.
     477             :  */
     478             : static void
     479       88002 : initialize_phase(AggState *aggstate, int newphase)
     480             : {
     481             :     Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
     482             : 
     483             :     /*
     484             :      * Whatever the previous state, we're now done with whatever input
     485             :      * tuplesort was in use.
     486             :      */
     487       88002 :     if (aggstate->sort_in)
     488             :     {
     489          42 :         tuplesort_end(aggstate->sort_in);
     490          42 :         aggstate->sort_in = NULL;
     491             :     }
     492             : 
     493       88002 :     if (newphase <= 1)
     494             :     {
     495             :         /*
     496             :          * Discard any existing output tuplesort.
     497             :          */
     498       87804 :         if (aggstate->sort_out)
     499             :         {
     500           6 :             tuplesort_end(aggstate->sort_out);
     501           6 :             aggstate->sort_out = NULL;
     502             :         }
     503             :     }
     504             :     else
     505             :     {
     506             :         /*
     507             :          * The old output tuplesort becomes the new input one, and this is the
     508             :          * right time to actually sort it.
     509             :          */
     510         198 :         aggstate->sort_in = aggstate->sort_out;
     511         198 :         aggstate->sort_out = NULL;
     512             :         Assert(aggstate->sort_in);
     513         198 :         tuplesort_performsort(aggstate->sort_in);
     514             :     }
     515             : 
     516             :     /*
     517             :      * If this isn't the last phase, we need to sort appropriately for the
     518             :      * next phase in sequence.
     519             :      */
     520       88002 :     if (newphase > 0 && newphase < aggstate->numphases - 1)
     521             :     {
     522         246 :         Sort       *sortnode = aggstate->phases[newphase + 1].sortnode;
     523         246 :         PlanState  *outerNode = outerPlanState(aggstate);
     524         246 :         TupleDesc   tupDesc = ExecGetResultType(outerNode);
     525             : 
     526         246 :         aggstate->sort_out = tuplesort_begin_heap(tupDesc,
     527             :                                                   sortnode->numCols,
     528             :                                                   sortnode->sortColIdx,
     529             :                                                   sortnode->sortOperators,
     530             :                                                   sortnode->collations,
     531             :                                                   sortnode->nullsFirst,
     532             :                                                   work_mem,
     533             :                                                   NULL, TUPLESORT_NONE);
     534             :     }
     535             : 
     536       88002 :     aggstate->current_phase = newphase;
     537       88002 :     aggstate->phase = &aggstate->phases[newphase];
     538       88002 : }
     539             : 
     540             : /*
     541             :  * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
     542             :  * populated by the previous phase.  Copy it to the sorter for the next phase
     543             :  * if any.
     544             :  *
      545             :  * Callers cannot rely on the memory for the tuple in the returned slot
      546             :  * remaining valid past any subsequently fetched tuple.
     547             :  */
     548             : static TupleTableSlot *
     549    26994672 : fetch_input_tuple(AggState *aggstate)
     550             : {
     551             :     TupleTableSlot *slot;
     552             : 
     553    26994672 :     if (aggstate->sort_in)
     554             :     {
     555             :         /* make sure we check for interrupts in either path through here */
     556      294894 :         CHECK_FOR_INTERRUPTS();
     557      294894 :         if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
     558             :                                     aggstate->sort_slot, NULL))
     559         198 :             return NULL;
     560      294696 :         slot = aggstate->sort_slot;
     561             :     }
     562             :     else
     563    26699778 :         slot = ExecProcNode(outerPlanState(aggstate));
     564             : 
     565    26994456 :     if (!TupIsNull(slot) && aggstate->sort_out)
     566      294696 :         tuplesort_puttupleslot(aggstate->sort_out, slot);
     567             : 
     568    26994456 :     return slot;
     569             : }
     570             : 
     571             : /*
     572             :  * (Re)Initialize an individual aggregate.
     573             :  *
     574             :  * This function handles only one grouping set, already set in
     575             :  * aggstate->current_set.
     576             :  *
     577             :  * When called, CurrentMemoryContext should be the per-query context.
     578             :  */
     579             : static void
     580     1124416 : initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
     581             :                      AggStatePerGroup pergroupstate)
     582             : {
     583             :     /*
     584             :      * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
     585             :      */
     586     1124416 :     if (pertrans->aggsortrequired)
     587             :     {
     588             :         /*
      589             :          * In case of rescan, there might be an uncompleted sort
      590             :          * operation left over; clean it up if so.
     591             :          */
     592       53828 :         if (pertrans->sortstates[aggstate->current_set])
     593           0 :             tuplesort_end(pertrans->sortstates[aggstate->current_set]);
     594             : 
     595             : 
     596             :         /*
     597             :          * We use a plain Datum sorter when there's a single input column;
     598             :          * otherwise sort the full tuple.  (See comments for
     599             :          * process_ordered_aggregate_single.)
     600             :          */
     601       53828 :         if (pertrans->numInputs == 1)
     602             :         {
     603       53756 :             Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
     604             : 
     605       53756 :             pertrans->sortstates[aggstate->current_set] =
     606       53756 :                 tuplesort_begin_datum(attr->atttypid,
     607       53756 :                                       pertrans->sortOperators[0],
     608       53756 :                                       pertrans->sortCollations[0],
     609       53756 :                                       pertrans->sortNullsFirst[0],
     610             :                                       work_mem, NULL, TUPLESORT_NONE);
     611             :         }
     612             :         else
     613          72 :             pertrans->sortstates[aggstate->current_set] =
     614          72 :                 tuplesort_begin_heap(pertrans->sortdesc,
     615             :                                      pertrans->numSortCols,
     616             :                                      pertrans->sortColIdx,
     617             :                                      pertrans->sortOperators,
     618             :                                      pertrans->sortCollations,
     619             :                                      pertrans->sortNullsFirst,
     620             :                                      work_mem, NULL, TUPLESORT_NONE);
     621             :     }
     622             : 
     623             :     /*
     624             :      * (Re)set transValue to the initial value.
     625             :      *
     626             :      * Note that when the initial value is pass-by-ref, we must copy it (into
     627             :      * the aggcontext) since we will pfree the transValue later.
     628             :      */
     629     1124416 :     if (pertrans->initValueIsNull)
     630      592216 :         pergroupstate->transValue = pertrans->initValue;
     631             :     else
     632             :     {
     633             :         MemoryContext oldContext;
     634             : 
     635      532200 :         oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
     636     1064400 :         pergroupstate->transValue = datumCopy(pertrans->initValue,
     637      532200 :                                               pertrans->transtypeByVal,
     638      532200 :                                               pertrans->transtypeLen);
     639      532200 :         MemoryContextSwitchTo(oldContext);
     640             :     }
     641     1124416 :     pergroupstate->transValueIsNull = pertrans->initValueIsNull;
     642             : 
     643             :     /*
     644             :      * If the initial value for the transition state doesn't exist in the
     645             :      * pg_aggregate table then we will let the first non-NULL value returned
     646             :      * from the outer procNode become the initial value. (This is useful for
     647             :      * aggregates like max() and min().) The noTransValue flag signals that we
     648             :      * still need to do this.
     649             :      */
     650     1124416 :     pergroupstate->noTransValue = pertrans->initValueIsNull;
     651     1124416 : }
     652             : 
     653             : /*
     654             :  * Initialize all aggregate transition states for a new group of input values.
     655             :  *
     656             :  * If there are multiple grouping sets, we initialize only the first numReset
     657             :  * of them (the grouping sets are ordered so that the most specific one, which
     658             :  * is reset most often, is first). As a convenience, if numReset is 0, we
     659             :  * reinitialize all sets.
     660             :  *
     661             :  * NB: This cannot be used for hash aggregates, as for those the grouping set
     662             :  * number has to be specified from further up.
     663             :  *
     664             :  * When called, CurrentMemoryContext should be the per-query context.
     665             :  */
     666             : static void
     667      300964 : initialize_aggregates(AggState *aggstate,
     668             :                       AggStatePerGroup *pergroups,
     669             :                       int numReset)
     670             : {
     671             :     int         transno;
     672      300964 :     int         numGroupingSets = Max(aggstate->phase->numsets, 1);
     673      300964 :     int         setno = 0;
     674      300964 :     int         numTrans = aggstate->numtrans;
     675      300964 :     AggStatePerTrans transstates = aggstate->pertrans;
     676             : 
     677      300964 :     if (numReset == 0)
     678           0 :         numReset = numGroupingSets;
     679             : 
     680      616090 :     for (setno = 0; setno < numReset; setno++)
     681             :     {
     682      315126 :         AggStatePerGroup pergroup = pergroups[setno];
     683             : 
     684      315126 :         select_current_set(aggstate, setno, false);
     685             : 
     686      982244 :         for (transno = 0; transno < numTrans; transno++)
     687             :         {
     688      667118 :             AggStatePerTrans pertrans = &transstates[transno];
     689      667118 :             AggStatePerGroup pergroupstate = &pergroup[transno];
     690             : 
     691      667118 :             initialize_aggregate(aggstate, pertrans, pergroupstate);
     692             :         }
     693             :     }
     694      300964 : }
     695             : 
     696             : /*
     697             :  * Given new input value(s), advance the transition function of one aggregate
     698             :  * state within one grouping set only (already set in aggstate->current_set)
     699             :  *
     700             :  * The new values (and null flags) have been preloaded into argument positions
     701             :  * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
     702             :  * pass to the transition function.  We also expect that the static fields of
     703             :  * the fcinfo are already initialized; that was done by ExecInitAgg().
     704             :  *
     705             :  * It doesn't matter which memory context this is called in.
     706             :  */
     707             : static void
     708      724222 : advance_transition_function(AggState *aggstate,
     709             :                             AggStatePerTrans pertrans,
     710             :                             AggStatePerGroup pergroupstate)
     711             : {
     712      724222 :     FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
     713             :     MemoryContext oldContext;
     714             :     Datum       newVal;
     715             : 
     716      724222 :     if (pertrans->transfn.fn_strict)
     717             :     {
     718             :         /*
     719             :          * For a strict transfn, nothing happens when there's a NULL input; we
     720             :          * just keep the prior transValue.
     721             :          */
     722      225000 :         int         numTransInputs = pertrans->numTransInputs;
     723             :         int         i;
     724             : 
     725      450000 :         for (i = 1; i <= numTransInputs; i++)
     726             :         {
     727      225000 :             if (fcinfo->args[i].isnull)
     728           0 :                 return;
     729             :         }
     730      225000 :         if (pergroupstate->noTransValue)
     731             :         {
     732             :             /*
     733             :              * transValue has not been initialized. This is the first non-NULL
     734             :              * input value. We use it as the initial value for transValue. (We
     735             :              * already checked that the agg's input type is binary-compatible
     736             :              * with its transtype, so straight copy here is OK.)
     737             :              *
     738             :              * We must copy the datum into aggcontext if it is pass-by-ref. We
     739             :              * do not need to pfree the old transValue, since it's NULL.
     740             :              */
     741           0 :             oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
     742           0 :             pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
     743           0 :                                                   pertrans->transtypeByVal,
     744           0 :                                                   pertrans->transtypeLen);
     745           0 :             pergroupstate->transValueIsNull = false;
     746           0 :             pergroupstate->noTransValue = false;
     747           0 :             MemoryContextSwitchTo(oldContext);
     748           0 :             return;
     749             :         }
     750      225000 :         if (pergroupstate->transValueIsNull)
     751             :         {
     752             :             /*
     753             :              * Don't call a strict function with NULL inputs.  Note it is
     754             :              * possible to get here despite the above tests, if the transfn is
     755             :              * strict *and* returned a NULL on a prior cycle. If that happens
     756             :              * we will propagate the NULL all the way to the end.
     757             :              */
     758           0 :             return;
     759             :         }
     760             :     }
     761             : 
     762             :     /* We run the transition functions in per-input-tuple memory context */
     763      724222 :     oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
     764             : 
     765             :     /* set up aggstate->curpertrans for AggGetAggref() */
     766      724222 :     aggstate->curpertrans = pertrans;
     767             : 
     768             :     /*
     769             :      * OK to call the transition function
     770             :      */
     771      724222 :     fcinfo->args[0].value = pergroupstate->transValue;
     772      724222 :     fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
     773      724222 :     fcinfo->isnull = false;      /* just in case transfn doesn't set it */
     774             : 
     775      724222 :     newVal = FunctionCallInvoke(fcinfo);
     776             : 
     777      724222 :     aggstate->curpertrans = NULL;
     778             : 
     779             :     /*
     780             :      * If pass-by-ref datatype, must copy the new value into aggcontext and
     781             :      * free the prior transValue.  But if transfn returned a pointer to its
     782             :      * first input, we don't need to do anything.
     783             :      *
     784             :      * It's safe to compare newVal with pergroup->transValue without regard
     785             :      * for either being NULL, because ExecAggCopyTransValue takes care to set
     786             :      * transValue to 0 when NULL. Otherwise we could end up accidentally not
     787             :      * reparenting, when the transValue has the same numerical value as
     788             :      * newValue, despite being NULL.  This is a somewhat hot path, making it
     789             :      * undesirable to instead solve this with another branch for the common
     790             :      * case of the transition function returning its (modified) input
     791             :      * argument.
     792             :      */
     793      724222 :     if (!pertrans->transtypeByVal &&
     794           0 :         DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
     795           0 :         newVal = ExecAggCopyTransValue(aggstate, pertrans,
     796           0 :                                        newVal, fcinfo->isnull,
     797             :                                        pergroupstate->transValue,
     798           0 :                                        pergroupstate->transValueIsNull);
     799             : 
     800      724222 :     pergroupstate->transValue = newVal;
     801      724222 :     pergroupstate->transValueIsNull = fcinfo->isnull;
     802             : 
     803      724222 :     MemoryContextSwitchTo(oldContext);
     804             : }
     805             : 
     806             : /*
     807             :  * Advance each aggregate transition state for one input tuple.  The input
     808             :  * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
     809             :  * accessible to ExecEvalExpr.
     810             :  *
     811             :  * We have two sets of transition states to handle: one for sorted aggregation
     812             :  * and one for hashed; we do them both here, to avoid multiple evaluation of
     813             :  * the inputs.
     814             :  *
     815             :  * When called, CurrentMemoryContext should be the per-query context.
     816             :  */
     817             : static void
     818    27677146 : advance_aggregates(AggState *aggstate)
     819             : {
     820    27677146 :     ExecEvalExprNoReturnSwitchContext(aggstate->phase->evaltrans,
     821             :                                       aggstate->tmpcontext);
     822    27677068 : }
     823             : 
     824             : /*
     825             :  * Run the transition function for a DISTINCT or ORDER BY aggregate
     826             :  * with only one input.  This is called after we have completed
     827             :  * entering all the input values into the sort object.  We complete the
     828             :  * sort, read out the values in sorted order, and run the transition
     829             :  * function on each value (applying DISTINCT if appropriate).
     830             :  *
     831             :  * Note that the strictness of the transition function was checked when
     832             :  * entering the values into the sort, so we don't check it again here;
     833             :  * we just apply standard SQL DISTINCT logic.
     834             :  *
     835             :  * The one-input case is handled separately from the multi-input case
     836             :  * for performance reasons: for single by-value inputs, such as the
     837             :  * common case of count(distinct id), the tuplesort_getdatum code path
     838             :  * is around 300% faster.  (The speedup for by-reference types is less
     839             :  * but still noticeable.)
     840             :  *
     841             :  * This function handles only one grouping set (already set in
     842             :  * aggstate->current_set).
     843             :  *
     844             :  * When called, CurrentMemoryContext should be the per-query context.
     845             :  */
     846             : static void
     847       53756 : process_ordered_aggregate_single(AggState *aggstate,
     848             :                                  AggStatePerTrans pertrans,
     849             :                                  AggStatePerGroup pergroupstate)
     850             : {
     851       53756 :     Datum       oldVal = (Datum) 0;
     852       53756 :     bool        oldIsNull = true;
     853       53756 :     bool        haveOldVal = false;
     854       53756 :     MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
     855             :     MemoryContext oldContext;
     856       53756 :     bool        isDistinct = (pertrans->numDistinctCols > 0);
     857       53756 :     Datum       newAbbrevVal = (Datum) 0;
     858       53756 :     Datum       oldAbbrevVal = (Datum) 0;
     859       53756 :     FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
     860             :     Datum      *newVal;
     861             :     bool       *isNull;
     862             : 
     863             :     Assert(pertrans->numDistinctCols < 2);
     864             : 
     865       53756 :     tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
     866             : 
     867             :     /* Load the column into argument 1 (arg 0 will be transition value) */
     868       53756 :     newVal = &fcinfo->args[1].value;
     869       53756 :     isNull = &fcinfo->args[1].isnull;
     870             : 
     871             :     /*
     872             :      * Note: if input type is pass-by-ref, the datums returned by the sort are
     873             :      * freshly palloc'd in the per-query context, so we must be careful to
     874             :      * pfree them when they are no longer needed.
     875             :      */
     876             : 
     877      898140 :     while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
     878             :                               true, false, newVal, isNull, &newAbbrevVal))
     879             :     {
     880             :         /*
     881             :          * Clear and select the working context for evaluation of the equality
     882             :          * function and transition function.
     883             :          */
     884      844384 :         MemoryContextReset(workcontext);
     885      844384 :         oldContext = MemoryContextSwitchTo(workcontext);
     886             : 
     887             :         /*
     888             :          * If DISTINCT mode, and not distinct from prior, skip it.
     889             :          */
     890      844384 :         if (isDistinct &&
     891      310322 :             haveOldVal &&
     892           0 :             ((oldIsNull && *isNull) ||
     893      310322 :              (!oldIsNull && !*isNull &&
     894      605684 :               oldAbbrevVal == newAbbrevVal &&
     895      295362 :               DatumGetBool(FunctionCall2Coll(&pertrans->equalfnOne,
     896             :                                              pertrans->aggCollation,
     897             :                                              oldVal, *newVal)))))
     898             :         {
     899      120342 :             MemoryContextSwitchTo(oldContext);
     900      120342 :             continue;
     901             :         }
     902             :         else
     903             :         {
     904      724042 :             advance_transition_function(aggstate, pertrans, pergroupstate);
     905             : 
     906      724042 :             MemoryContextSwitchTo(oldContext);
     907             : 
     908             :             /*
     909             :              * Forget the old value, if any, and remember the new one for
     910             :              * subsequent equality checks.
     911             :              */
     912      724042 :             if (!pertrans->inputtypeByVal)
     913             :             {
     914      525288 :                 if (!oldIsNull)
     915      525108 :                     pfree(DatumGetPointer(oldVal));
     916      525288 :                 if (!*isNull)
     917      525228 :                     oldVal = datumCopy(*newVal, pertrans->inputtypeByVal,
     918      525228 :                                        pertrans->inputtypeLen);
     919             :             }
     920             :             else
     921      198754 :                 oldVal = *newVal;
     922      724042 :             oldAbbrevVal = newAbbrevVal;
     923      724042 :             oldIsNull = *isNull;
     924      724042 :             haveOldVal = true;
     925             :         }
     926             :     }
     927             : 
     928       53756 :     if (!oldIsNull && !pertrans->inputtypeByVal)
     929         120 :         pfree(DatumGetPointer(oldVal));
     930             : 
     931       53756 :     tuplesort_end(pertrans->sortstates[aggstate->current_set]);
     932       53756 :     pertrans->sortstates[aggstate->current_set] = NULL;
     933       53756 : }
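
A standalone sketch (plain C, not PostgreSQL code) of the DISTINCT logic above: once the inputs have been sorted, duplicates are adjacent, so each value only needs to be compared with the immediately preceding one before the transition step runs. The "transition function" here is just a counter, standing in for count(DISTINCT x).

#include <stdbool.h>
#include <stdio.h>

int main(void)
{
    int  sorted[] = {1, 1, 2, 3, 3, 3, 7};        /* already sorted input */
    long transvalue = 0;                          /* count(DISTINCT x) state */
    bool have_old = false;
    int  old_val = 0;

    for (size_t i = 0; i < sizeof(sorted) / sizeof(sorted[0]); i++)
    {
        if (have_old && sorted[i] == old_val)
            continue;                             /* not distinct from prior value */
        transvalue++;                             /* advance the transition state */
        old_val = sorted[i];
        have_old = true;
    }
    printf("count(DISTINCT x) = %ld\n", transvalue);   /* prints 4 */
    return 0;
}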
     934             : 
     935             : /*
     936             :  * Run the transition function for a DISTINCT or ORDER BY aggregate
     937             :  * with more than one input.  This is called after we have completed
     938             :  * entering all the input values into the sort object.  We complete the
     939             :  * sort, read out the values in sorted order, and run the transition
     940             :  * function on each value (applying DISTINCT if appropriate).
     941             :  *
     942             :  * This function handles only one grouping set (already set in
     943             :  * aggstate->current_set).
     944             :  *
     945             :  * When called, CurrentMemoryContext should be the per-query context.
     946             :  */
     947             : static void
     948          72 : process_ordered_aggregate_multi(AggState *aggstate,
     949             :                                 AggStatePerTrans pertrans,
     950             :                                 AggStatePerGroup pergroupstate)
     951             : {
     952          72 :     ExprContext *tmpcontext = aggstate->tmpcontext;
     953          72 :     FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
     954          72 :     TupleTableSlot *slot1 = pertrans->sortslot;
     955          72 :     TupleTableSlot *slot2 = pertrans->uniqslot;
     956          72 :     int         numTransInputs = pertrans->numTransInputs;
     957          72 :     int         numDistinctCols = pertrans->numDistinctCols;
     958          72 :     Datum       newAbbrevVal = (Datum) 0;
     959          72 :     Datum       oldAbbrevVal = (Datum) 0;
     960          72 :     bool        haveOldValue = false;
     961          72 :     TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
     962             :     int         i;
     963             : 
     964          72 :     tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
     965             : 
     966          72 :     ExecClearTuple(slot1);
     967          72 :     if (slot2)
     968           0 :         ExecClearTuple(slot2);
     969             : 
     970         252 :     while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
     971             :                                   true, true, slot1, &newAbbrevVal))
     972             :     {
     973         180 :         CHECK_FOR_INTERRUPTS();
     974             : 
     975         180 :         tmpcontext->ecxt_outertuple = slot1;
     976         180 :         tmpcontext->ecxt_innertuple = slot2;
     977             : 
     978         180 :         if (numDistinctCols == 0 ||
     979           0 :             !haveOldValue ||
     980           0 :             newAbbrevVal != oldAbbrevVal ||
     981           0 :             !ExecQual(pertrans->equalfnMulti, tmpcontext))
     982             :         {
     983             :             /*
     984             :              * Extract the first numTransInputs columns as datums to pass to
     985             :              * the transfn.
     986             :              */
     987         180 :             slot_getsomeattrs(slot1, numTransInputs);
     988             : 
     989             :             /* Load values into fcinfo */
     990             :             /* Start from 1, since the 0th arg will be the transition value */
     991         540 :             for (i = 0; i < numTransInputs; i++)
     992             :             {
     993         360 :                 fcinfo->args[i + 1].value = slot1->tts_values[i];
     994         360 :                 fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
     995             :             }
     996             : 
     997         180 :             advance_transition_function(aggstate, pertrans, pergroupstate);
     998             : 
     999         180 :             if (numDistinctCols > 0)
    1000             :             {
    1001             :                 /* swap the slot pointers to retain the current tuple */
    1002           0 :                 TupleTableSlot *tmpslot = slot2;
    1003             : 
    1004           0 :                 slot2 = slot1;
    1005           0 :                 slot1 = tmpslot;
    1006             :                 /* avoid ExecQual() calls by reusing abbreviated keys */
    1007           0 :                 oldAbbrevVal = newAbbrevVal;
    1008           0 :                 haveOldValue = true;
    1009             :             }
    1010             :         }
    1011             : 
    1012             :         /* Reset context each time */
    1013         180 :         ResetExprContext(tmpcontext);
    1014             : 
    1015         180 :         ExecClearTuple(slot1);
    1016             :     }
    1017             : 
    1018          72 :     if (slot2)
    1019           0 :         ExecClearTuple(slot2);
    1020             : 
    1021          72 :     tuplesort_end(pertrans->sortstates[aggstate->current_set]);
    1022          72 :     pertrans->sortstates[aggstate->current_set] = NULL;
    1023             : 
    1024             :     /* restore previous slot, potentially in use for grouping sets */
    1025          72 :     tmpcontext->ecxt_outertuple = save;
    1026          72 : }
    1027             : 
    1028             : /*
    1029             :  * Compute the final value of one aggregate.
    1030             :  *
    1031             :  * This function handles only one grouping set (already set in
    1032             :  * aggstate->current_set).
    1033             :  *
    1034             :  * The finalfn will be run, and the result delivered, in the
    1035             :  * output-tuple context; caller's CurrentMemoryContext does not matter.
    1036             :  * (But note that in some cases, such as when there is no finalfn, the
    1037             :  * result might be a pointer to or into the agg's transition value.)
    1038             :  *
     1039             :  * The finalfn uses the state as set in the transno.  That state might also
     1040             :  * be in use by another aggregate function, so it's important that we do
    1041             :  * nothing destructive here.  Moreover, the aggregate's final value might
    1042             :  * get used in multiple places, so we mustn't return a R/W expanded datum.
    1043             :  */
    1044             : static void
    1045     1116360 : finalize_aggregate(AggState *aggstate,
    1046             :                    AggStatePerAgg peragg,
    1047             :                    AggStatePerGroup pergroupstate,
    1048             :                    Datum *resultVal, bool *resultIsNull)
    1049             : {
    1050     1116360 :     LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
    1051     1116360 :     bool        anynull = false;
    1052             :     MemoryContext oldContext;
    1053             :     int         i;
    1054             :     ListCell   *lc;
    1055     1116360 :     AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
    1056             : 
    1057     1116360 :     oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
    1058             : 
    1059             :     /*
    1060             :      * Evaluate any direct arguments.  We do this even if there's no finalfn
    1061             :      * (which is unlikely anyway), so that side-effects happen as expected.
    1062             :      * The direct arguments go into arg positions 1 and up, leaving position 0
    1063             :      * for the transition state value.
    1064             :      */
    1065     1116360 :     i = 1;
    1066     1117334 :     foreach(lc, peragg->aggdirectargs)
    1067             :     {
    1068         974 :         ExprState  *expr = (ExprState *) lfirst(lc);
    1069             : 
    1070         974 :         fcinfo->args[i].value = ExecEvalExpr(expr,
    1071             :                                              aggstate->ss.ps.ps_ExprContext,
    1072             :                                              &fcinfo->args[i].isnull);
    1073         974 :         anynull |= fcinfo->args[i].isnull;
    1074         974 :         i++;
    1075             :     }
    1076             : 
    1077             :     /*
    1078             :      * Apply the agg's finalfn if one is provided, else return transValue.
    1079             :      */
    1080     1116360 :     if (OidIsValid(peragg->finalfn_oid))
    1081             :     {
    1082      337036 :         int         numFinalArgs = peragg->numFinalArgs;
    1083             : 
    1084             :         /* set up aggstate->curperagg for AggGetAggref() */
    1085      337036 :         aggstate->curperagg = peragg;
    1086             : 
    1087      337036 :         InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
    1088             :                                  numFinalArgs,
    1089             :                                  pertrans->aggCollation,
    1090             :                                  (Node *) aggstate, NULL);
    1091             : 
    1092             :         /* Fill in the transition state value */
    1093      337036 :         fcinfo->args[0].value =
    1094      337036 :             MakeExpandedObjectReadOnly(pergroupstate->transValue,
    1095             :                                        pergroupstate->transValueIsNull,
    1096             :                                        pertrans->transtypeLen);
    1097      337036 :         fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
    1098      337036 :         anynull |= pergroupstate->transValueIsNull;
    1099             : 
    1100             :         /* Fill any remaining argument positions with nulls */
    1101      488532 :         for (; i < numFinalArgs; i++)
    1102             :         {
    1103      151496 :             fcinfo->args[i].value = (Datum) 0;
    1104      151496 :             fcinfo->args[i].isnull = true;
    1105      151496 :             anynull = true;
    1106             :         }
    1107             : 
    1108      337036 :         if (fcinfo->flinfo->fn_strict && anynull)
    1109             :         {
    1110             :             /* don't call a strict function with NULL inputs */
    1111           0 :             *resultVal = (Datum) 0;
    1112           0 :             *resultIsNull = true;
    1113             :         }
    1114             :         else
    1115             :         {
    1116             :             Datum       result;
    1117             : 
    1118      337036 :             result = FunctionCallInvoke(fcinfo);
    1119      337024 :             *resultIsNull = fcinfo->isnull;
    1120      337024 :             *resultVal = MakeExpandedObjectReadOnly(result,
    1121             :                                                     fcinfo->isnull,
    1122             :                                                     peragg->resulttypeLen);
    1123             :         }
    1124      337024 :         aggstate->curperagg = NULL;
    1125             :     }
    1126             :     else
    1127             :     {
    1128      779324 :         *resultVal =
    1129      779324 :             MakeExpandedObjectReadOnly(pergroupstate->transValue,
    1130             :                                        pergroupstate->transValueIsNull,
    1131             :                                        pertrans->transtypeLen);
    1132      779324 :         *resultIsNull = pergroupstate->transValueIsNull;
    1133             :     }
    1134             : 
    1135     1116348 :     MemoryContextSwitchTo(oldContext);
    1136     1116348 : }
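
A minimal sketch (plain C, not PostgreSQL code; the argument struct and helper names are invented) of the strict-function convention the finalization step honors: if any argument is NULL, the function is not called at all and the result is simply NULL.

#include <stdbool.h>
#include <stdio.h>

typedef struct Arg { long value; bool isnull; } Arg;

static Arg call_strict(long (*fn) (long), Arg arg)
{
    Arg result = {0, true};

    if (arg.isnull)
        return result;            /* strict: skip the call, result is NULL */
    result.value = fn(arg.value);
    result.isnull = false;
    return result;
}

static long square(long x) { return x * x; }

int main(void)
{
    Arg present = {6, false};
    Arg missing = {0, true};

    printf("%ld\n", call_strict(square, present).value);    /* 36 */
    printf("%d\n", call_strict(square, missing).isnull);    /* 1, i.e. NULL */
    return 0;
}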
    1137             : 
    1138             : /*
    1139             :  * Compute the output value of one partial aggregate.
    1140             :  *
    1141             :  * The serialization function will be run, and the result delivered, in the
    1142             :  * output-tuple context; caller's CurrentMemoryContext does not matter.
    1143             :  */
    1144             : static void
    1145       11660 : finalize_partialaggregate(AggState *aggstate,
    1146             :                           AggStatePerAgg peragg,
    1147             :                           AggStatePerGroup pergroupstate,
    1148             :                           Datum *resultVal, bool *resultIsNull)
    1149             : {
    1150       11660 :     AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
    1151             :     MemoryContext oldContext;
    1152             : 
    1153       11660 :     oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
    1154             : 
    1155             :     /*
    1156             :      * serialfn_oid will be set if we must serialize the transvalue before
    1157             :      * returning it
    1158             :      */
    1159       11660 :     if (OidIsValid(pertrans->serialfn_oid))
    1160             :     {
    1161             :         /* Don't call a strict serialization function with NULL input. */
    1162         426 :         if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
    1163             :         {
    1164         120 :             *resultVal = (Datum) 0;
    1165         120 :             *resultIsNull = true;
    1166             :         }
    1167             :         else
    1168             :         {
    1169         306 :             FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;
    1170             :             Datum       result;
    1171             : 
    1172         306 :             fcinfo->args[0].value =
    1173         306 :                 MakeExpandedObjectReadOnly(pergroupstate->transValue,
    1174             :                                            pergroupstate->transValueIsNull,
    1175             :                                            pertrans->transtypeLen);
    1176         306 :             fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
    1177         306 :             fcinfo->isnull = false;
    1178             : 
    1179         306 :             result = FunctionCallInvoke(fcinfo);
    1180         306 :             *resultIsNull = fcinfo->isnull;
    1181         306 :             *resultVal = MakeExpandedObjectReadOnly(result,
    1182             :                                                     fcinfo->isnull,
    1183             :                                                     peragg->resulttypeLen);
    1184             :         }
    1185             :     }
    1186             :     else
    1187             :     {
    1188       11234 :         *resultVal =
    1189       11234 :             MakeExpandedObjectReadOnly(pergroupstate->transValue,
    1190             :                                        pergroupstate->transValueIsNull,
    1191             :                                        pertrans->transtypeLen);
    1192       11234 :         *resultIsNull = pergroupstate->transValueIsNull;
    1193             :     }
    1194             : 
    1195       11660 :     MemoryContextSwitchTo(oldContext);
    1196       11660 : }
    1197             : 
    1198             : /*
    1199             :  * Extract the attributes that make up the grouping key into the
    1200             :  * hashslot. This is necessary to compute the hash or perform a lookup.
    1201             :  */
    1202             : static inline void
    1203     7496412 : prepare_hash_slot(AggStatePerHash perhash,
    1204             :                   TupleTableSlot *inputslot,
    1205             :                   TupleTableSlot *hashslot)
    1206             : {
    1207             :     int         i;
    1208             : 
    1209             :     /* transfer just the needed columns into hashslot */
    1210     7496412 :     slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
    1211     7496412 :     ExecClearTuple(hashslot);
    1212             : 
    1213    17945414 :     for (i = 0; i < perhash->numhashGrpCols; i++)
    1214             :     {
    1215    10449002 :         int         varNumber = perhash->hashGrpColIdxInput[i] - 1;
    1216             : 
    1217    10449002 :         hashslot->tts_values[i] = inputslot->tts_values[varNumber];
    1218    10449002 :         hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
    1219             :     }
    1220     7496412 :     ExecStoreVirtualTuple(hashslot);
    1221     7496412 : }
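
A standalone sketch (plain C, not PostgreSQL code; the row contents and column indexes are made up) of the projection step above: only the columns named in a small index array are copied from the input row into the hash-key row, so the hash table never stores attributes it does not need.

#include <stdio.h>

int main(void)
{
    const char *input_row[] = {"42", "red", "2025-01-01", "ignored"};
    int         key_cols[]  = {1, 0};       /* 0-based: group by column 2, then column 1 */
    const char *key_row[2];

    for (int i = 0; i < 2; i++)
        key_row[i] = input_row[key_cols[i]];   /* transfer just the needed columns */

    printf("hash key = (%s, %s)\n", key_row[0], key_row[1]);   /* (red, 42) */
    return 0;
}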
    1222             : 
    1223             : /*
    1224             :  * Prepare to finalize and project based on the specified representative tuple
    1225             :  * slot and grouping set.
    1226             :  *
    1227             :  * In the specified tuple slot, force to null all attributes that should be
    1228             :  * read as null in the context of the current grouping set.  Also stash the
    1229             :  * current group bitmap where GroupingExpr can get at it.
    1230             :  *
    1231             :  * This relies on three conditions:
    1232             :  *
     1233             :  * 1) Nothing is ever going to try to extract the whole tuple from this slot,
    1234             :  * only reference it in evaluations, which will only access individual
    1235             :  * attributes.
    1236             :  *
    1237             :  * 2) No system columns are going to need to be nulled. (If a system column is
    1238             :  * referenced in a group clause, it is actually projected in the outer plan
    1239             :  * tlist.)
    1240             :  *
    1241             :  * 3) Within a given phase, we never need to recover the value of an attribute
    1242             :  * once it has been set to null.
    1243             :  *
    1244             :  * Poking into the slot this way is a bit ugly, but the consensus is that the
    1245             :  * alternative was worse.
    1246             :  */
    1247             : static void
    1248      844162 : prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
    1249             : {
    1250      844162 :     if (aggstate->phase->grouped_cols)
    1251             :     {
    1252      558550 :         Bitmapset  *grouped_cols = aggstate->phase->grouped_cols[currentSet];
    1253             : 
    1254      558550 :         aggstate->grouped_cols = grouped_cols;
    1255             : 
    1256      558550 :         if (TTS_EMPTY(slot))
    1257             :         {
    1258             :             /*
    1259             :              * Force all values to be NULL if working on an empty input tuple
    1260             :              * (i.e. an empty grouping set for which no input rows were
    1261             :              * supplied).
    1262             :              */
    1263          48 :             ExecStoreAllNullTuple(slot);
    1264             :         }
    1265      558502 :         else if (aggstate->all_grouped_cols)
    1266             :         {
    1267             :             ListCell   *lc;
    1268             : 
    1269             :             /* all_grouped_cols is arranged in desc order */
    1270      558454 :             slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
    1271             : 
    1272     1527780 :             foreach(lc, aggstate->all_grouped_cols)
    1273             :             {
    1274      969326 :                 int         attnum = lfirst_int(lc);
    1275             : 
    1276      969326 :                 if (!bms_is_member(attnum, grouped_cols))
    1277       57760 :                     slot->tts_isnull[attnum - 1] = true;
    1278             :             }
    1279             :         }
    1280             :     }
    1281      844162 : }
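
A standalone sketch (plain C, not PostgreSQL code; a plain bitmask stands in for the Bitmapset) of the null-forcing step above: a bitmap of the current grouping set's columns decides which attributes stay visible, and every other attribute is read as NULL for this projection.

#include <stdbool.h>
#include <stdio.h>

int main(void)
{
    bool     isnull[4]    = {false, false, false, false};
    unsigned grouped_cols = (1u << 0) | (1u << 2);   /* current set groups by cols 1 and 3 */

    for (int attnum = 1; attnum <= 4; attnum++)
        if (!(grouped_cols & (1u << (attnum - 1))))
            isnull[attnum - 1] = true;               /* force this attribute to NULL */

    for (int i = 0; i < 4; i++)
        printf("col %d: %s\n", i + 1, isnull[i] ? "NULL" : "kept");
    return 0;
}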
    1282             : 
    1283             : /*
    1284             :  * Compute the final value of all aggregates for one group.
    1285             :  *
    1286             :  * This function handles only one grouping set at a time, which the caller must
    1287             :  * have selected.  It's also the caller's responsibility to adjust the supplied
    1288             :  * pergroup parameter to point to the current set's transvalues.
    1289             :  *
    1290             :  * Results are stored in the output econtext aggvalues/aggnulls.
    1291             :  */
    1292             : static void
    1293      844162 : finalize_aggregates(AggState *aggstate,
    1294             :                     AggStatePerAgg peraggs,
    1295             :                     AggStatePerGroup pergroup)
    1296             : {
    1297      844162 :     ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
    1298      844162 :     Datum      *aggvalues = econtext->ecxt_aggvalues;
    1299      844162 :     bool       *aggnulls = econtext->ecxt_aggnulls;
    1300             :     int         aggno;
    1301             : 
    1302             :     /*
    1303             :      * If there were any DISTINCT and/or ORDER BY aggregates, sort their
    1304             :      * inputs and run the transition functions.
    1305             :      */
    1306     1971924 :     for (int transno = 0; transno < aggstate->numtrans; transno++)
    1307             :     {
    1308     1127762 :         AggStatePerTrans pertrans = &aggstate->pertrans[transno];
    1309             :         AggStatePerGroup pergroupstate;
    1310             : 
    1311     1127762 :         pergroupstate = &pergroup[transno];
    1312             : 
    1313     1127762 :         if (pertrans->aggsortrequired)
    1314             :         {
    1315             :             Assert(aggstate->aggstrategy != AGG_HASHED &&
    1316             :                    aggstate->aggstrategy != AGG_MIXED);
    1317             : 
    1318       53828 :             if (pertrans->numInputs == 1)
    1319       53756 :                 process_ordered_aggregate_single(aggstate,
    1320             :                                                  pertrans,
    1321             :                                                  pergroupstate);
    1322             :             else
    1323          72 :                 process_ordered_aggregate_multi(aggstate,
    1324             :                                                 pertrans,
    1325             :                                                 pergroupstate);
    1326             :         }
    1327     1073934 :         else if (pertrans->numDistinctCols > 0 && pertrans->haslast)
    1328             :         {
    1329       18360 :             pertrans->haslast = false;
    1330             : 
    1331       18360 :             if (pertrans->numDistinctCols == 1)
    1332             :             {
    1333       18264 :                 if (!pertrans->inputtypeByVal && !pertrans->lastisnull)
    1334         262 :                     pfree(DatumGetPointer(pertrans->lastdatum));
    1335             : 
    1336       18264 :                 pertrans->lastisnull = false;
    1337       18264 :                 pertrans->lastdatum = (Datum) 0;
    1338             :             }
    1339             :             else
    1340          96 :                 ExecClearTuple(pertrans->uniqslot);
    1341             :         }
    1342             :     }
    1343             : 
    1344             :     /*
    1345             :      * Run the final functions.
    1346             :      */
    1347     1972170 :     for (aggno = 0; aggno < aggstate->numaggs; aggno++)
    1348             :     {
    1349     1128020 :         AggStatePerAgg peragg = &peraggs[aggno];
    1350     1128020 :         int         transno = peragg->transno;
    1351             :         AggStatePerGroup pergroupstate;
    1352             : 
    1353     1128020 :         pergroupstate = &pergroup[transno];
    1354             : 
    1355     1128020 :         if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
    1356       11660 :             finalize_partialaggregate(aggstate, peragg, pergroupstate,
    1357       11660 :                                       &aggvalues[aggno], &aggnulls[aggno]);
    1358             :         else
    1359     1116360 :             finalize_aggregate(aggstate, peragg, pergroupstate,
    1360     1116360 :                                &aggvalues[aggno], &aggnulls[aggno]);
    1361             :     }
    1362      844150 : }
    1363             : 
    1364             : /*
    1365             :  * Project the result of a group (whose aggs have already been calculated by
    1366             :  * finalize_aggregates). Returns the result slot, or NULL if no row is
    1367             :  * projected (suppressed by qual).
    1368             :  */
    1369             : static TupleTableSlot *
    1370      844150 : project_aggregates(AggState *aggstate)
    1371             : {
    1372      844150 :     ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
    1373             : 
    1374             :     /*
    1375             :      * Check the qual (HAVING clause); if the group does not match, ignore it.
    1376             :      */
    1377      844150 :     if (ExecQual(aggstate->ss.ps.qual, econtext))
    1378             :     {
    1379             :         /*
    1380             :          * Form and return projection tuple using the aggregate results and
    1381             :          * the representative input tuple.
    1382             :          */
    1383      737696 :         return ExecProject(aggstate->ss.ps.ps_ProjInfo);
    1384             :     }
    1385             :     else
    1386      106454 :         InstrCountFiltered1(aggstate, 1);
    1387             : 
    1388      106454 :     return NULL;
    1389             : }
    1390             : 
    1391             : /*
    1392             :  * Find input-tuple columns that are needed, dividing them into
    1393             :  * aggregated and unaggregated sets.
    1394             :  */
    1395             : static void
    1396        5726 : find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
    1397             : {
    1398        5726 :     Agg        *agg = (Agg *) aggstate->ss.ps.plan;
    1399             :     FindColsContext context;
    1400             : 
    1401        5726 :     context.is_aggref = false;
    1402        5726 :     context.aggregated = NULL;
    1403        5726 :     context.unaggregated = NULL;
    1404             : 
    1405             :     /* Examine tlist and quals */
    1406        5726 :     (void) find_cols_walker((Node *) agg->plan.targetlist, &context);
    1407        5726 :     (void) find_cols_walker((Node *) agg->plan.qual, &context);
    1408             : 
    1409             :     /* In some cases, grouping columns will not appear in the tlist */
    1410       14516 :     for (int i = 0; i < agg->numCols; i++)
    1411        8790 :         context.unaggregated = bms_add_member(context.unaggregated,
    1412        8790 :                                               agg->grpColIdx[i]);
    1413             : 
    1414        5726 :     *aggregated = context.aggregated;
    1415        5726 :     *unaggregated = context.unaggregated;
    1416        5726 : }
    1417             : 
    1418             : static bool
    1419       67502 : find_cols_walker(Node *node, FindColsContext *context)
    1420             : {
    1421       67502 :     if (node == NULL)
    1422       12008 :         return false;
    1423       55494 :     if (IsA(node, Var))
    1424             :     {
    1425       14796 :         Var        *var = (Var *) node;
    1426             : 
    1427             :         /* setrefs.c should have set the varno to OUTER_VAR */
    1428             :         Assert(var->varno == OUTER_VAR);
    1429             :         Assert(var->varlevelsup == 0);
    1430       14796 :         if (context->is_aggref)
    1431        4580 :             context->aggregated = bms_add_member(context->aggregated,
    1432        4580 :                                                  var->varattno);
    1433             :         else
    1434       10216 :             context->unaggregated = bms_add_member(context->unaggregated,
    1435       10216 :                                                    var->varattno);
    1436       14796 :         return false;
    1437             :     }
    1438       40698 :     if (IsA(node, Aggref))
    1439             :     {
    1440             :         Assert(!context->is_aggref);
    1441        6770 :         context->is_aggref = true;
    1442        6770 :         expression_tree_walker(node, find_cols_walker, context);
    1443        6770 :         context->is_aggref = false;
    1444        6770 :         return false;
    1445             :     }
    1446       33928 :     return expression_tree_walker(node, find_cols_walker, context);
    1447             : }
    1448             : 
    1449             : /*
    1450             :  * (Re-)initialize the hash table(s) to empty.
    1451             :  *
    1452             :  * To implement hashed aggregation, we need a hashtable that stores a
    1453             :  * representative tuple and an array of AggStatePerGroup structs for each
    1454             :  * distinct set of GROUP BY column values.  We compute the hash key from the
    1455             :  * GROUP BY columns.  The per-group data is allocated in initialize_hash_entry(),
    1456             :  * for each entry.
    1457             :  *
    1458             :  * We have a separate hashtable and associated perhash data structure for each
    1459             :  * grouping set for which we're doing hashing.
    1460             :  *
    1461             :  * The contents of the hash tables always live in the hashcontext's per-tuple
    1462             :  * memory context (there is only one of these for all tables together, since
    1463             :  * they are all reset at the same time).
    1464             :  */
    1465             : static void
    1466       15424 : build_hash_tables(AggState *aggstate)
    1467             : {
    1468             :     int         setno;
    1469             : 
    1470       31186 :     for (setno = 0; setno < aggstate->num_hashes; ++setno)
    1471             :     {
    1472       15762 :         AggStatePerHash perhash = &aggstate->perhash[setno];
    1473             :         long        nbuckets;
    1474             :         Size        memory;
    1475             : 
    1476       15762 :         if (perhash->hashtable != NULL)
    1477             :         {
    1478       11094 :             ResetTupleHashTable(perhash->hashtable);
    1479       11094 :             continue;
    1480             :         }
    1481             : 
    1482             :         Assert(perhash->aggnode->numGroups > 0);
    1483             : 
    1484        4668 :         memory = aggstate->hash_mem_limit / aggstate->num_hashes;
    1485             : 
    1486             :         /* choose reasonable number of buckets per hashtable */
    1487        4668 :         nbuckets = hash_choose_num_buckets(aggstate->hashentrysize,
    1488        4668 :                                            perhash->aggnode->numGroups,
    1489             :                                            memory);
    1490             : 
    1491             : #ifdef USE_INJECTION_POINTS
    1492        4668 :         if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-oversize-table"))
    1493             :         {
    1494           0 :             nbuckets = memory / TupleHashEntrySize();
    1495           0 :             INJECTION_POINT_CACHED("hash-aggregate-oversize-table");
    1496             :         }
    1497             : #endif
    1498             : 
    1499        4668 :         build_hash_table(aggstate, setno, nbuckets);
    1500             :     }
    1501             : 
    1502       15424 :     aggstate->hash_ngroups_current = 0;
    1503       15424 : }
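
A simplified sketch (plain C, not PostgreSQL code; hash_choose_num_buckets itself is not shown in this report and all sizes are assumed) of how a per-table share of the memory limit could translate into a bucket count: the overall budget is split evenly across the grouping-set tables, and the bucket count is capped by what fits in that share.

#include <stdio.h>

int main(void)
{
    size_t hash_mem_limit = 4u * 1024 * 1024;   /* assumed overall budget */
    int    num_hashes     = 4;                  /* grouping sets being hashed */
    size_t entry_size     = 64;                 /* assumed per-entry estimate */
    long   est_groups     = 100000;             /* planner's numGroups estimate */

    size_t per_table = hash_mem_limit / num_hashes;
    long   max_fit   = (long) (per_table / entry_size);
    long   nbuckets  = est_groups < max_fit ? est_groups : max_fit;

    printf("budget per table: %zu bytes, buckets: %ld\n", per_table, nbuckets);
    return 0;
}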
    1504             : 
    1505             : /*
    1506             :  * Build a single hashtable for this grouping set.
    1507             :  */
    1508             : static void
    1509        4668 : build_hash_table(AggState *aggstate, int setno, long nbuckets)
    1510             : {
    1511        4668 :     AggStatePerHash perhash = &aggstate->perhash[setno];
    1512        4668 :     MemoryContext metacxt = aggstate->hash_metacxt;
    1513        4668 :     MemoryContext tablecxt = aggstate->hash_tablecxt;
    1514        4668 :     MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
    1515             :     Size        additionalsize;
    1516             : 
    1517             :     Assert(aggstate->aggstrategy == AGG_HASHED ||
    1518             :            aggstate->aggstrategy == AGG_MIXED);
    1519             : 
    1520             :     /*
    1521             :      * Used to make sure initial hash table allocation does not exceed
    1522             :      * hash_mem. Note that the estimate does not include space for
    1523             :      * pass-by-reference transition data values, nor for the representative
    1524             :      * tuple of each group.
    1525             :      */
    1526        4668 :     additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
    1527             : 
    1528        9336 :     perhash->hashtable = BuildTupleHashTable(&aggstate->ss.ps,
    1529        4668 :                                              perhash->hashslot->tts_tupleDescriptor,
    1530        4668 :                                              perhash->hashslot->tts_ops,
    1531             :                                              perhash->numCols,
    1532             :                                              perhash->hashGrpColIdxHash,
    1533        4668 :                                              perhash->eqfuncoids,
    1534             :                                              perhash->hashfunctions,
    1535        4668 :                                              perhash->aggnode->grpCollations,
    1536             :                                              nbuckets,
    1537             :                                              additionalsize,
    1538             :                                              metacxt,
    1539             :                                              tablecxt,
    1540             :                                              tmpcxt,
    1541        4668 :                                              DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
    1542        4668 : }
    1543             : 
    1544             : /*
    1545             :  * Compute columns that actually need to be stored in hashtable entries.  The
    1546             :  * incoming tuples from the child plan node will contain grouping columns,
    1547             :  * other columns referenced in our targetlist and qual, columns used to
    1548             :  * compute the aggregate functions, and perhaps just junk columns we don't use
    1549             :  * at all.  Only columns of the first two types need to be stored in the
    1550             :  * hashtable, and getting rid of the others can make the table entries
    1551             :  * significantly smaller.  The hashtable only contains the relevant columns,
    1552             :  * and is packed/unpacked in lookup_hash_entries() / agg_retrieve_hash_table()
    1553             :  * into the format of the normal input descriptor.
    1554             :  *
     1555             :  * Besides the grouping columns themselves, additional columns come from two
     1556             :  * sources: first, functionally dependent columns that we don't need to group
     1557             :  * by themselves; and second, ctids for row-marks.
    1558             :  *
    1559             :  * To eliminate duplicates, we build a bitmapset of the needed columns, and
    1560             :  * then build an array of the columns included in the hashtable. We might
    1561             :  * still have duplicates if the passed-in grpColIdx has them, which can happen
    1562             :  * in edge cases from semijoins/distinct; these can't always be removed,
    1563             :  * because it's not certain that the duplicate cols will be using the same
    1564             :  * hash function.
    1565             :  *
    1566             :  * Note that the array is preserved over ExecReScanAgg, so we allocate it in
    1567             :  * the per-query context (unlike the hash table itself).
    1568             :  */
    1569             : static void
    1570        5726 : find_hash_columns(AggState *aggstate)
    1571             : {
    1572             :     Bitmapset  *base_colnos;
    1573             :     Bitmapset  *aggregated_colnos;
    1574        5726 :     TupleDesc   scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
    1575        5726 :     List       *outerTlist = outerPlanState(aggstate)->plan->targetlist;
    1576        5726 :     int         numHashes = aggstate->num_hashes;
    1577        5726 :     EState     *estate = aggstate->ss.ps.state;
    1578             :     int         j;
    1579             : 
    1580             :     /* Find Vars that will be needed in tlist and qual */
    1581        5726 :     find_cols(aggstate, &aggregated_colnos, &base_colnos);
    1582        5726 :     aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
    1583        5726 :     aggstate->max_colno_needed = 0;
    1584        5726 :     aggstate->all_cols_needed = true;
    1585             : 
    1586       23672 :     for (int i = 0; i < scanDesc->natts; i++)
    1587             :     {
    1588       17946 :         int         colno = i + 1;
    1589             : 
    1590       17946 :         if (bms_is_member(colno, aggstate->colnos_needed))
    1591       12278 :             aggstate->max_colno_needed = colno;
    1592             :         else
    1593        5668 :             aggstate->all_cols_needed = false;
    1594             :     }
    1595             : 
    1596       11954 :     for (j = 0; j < numHashes; ++j)
    1597             :     {
    1598        6228 :         AggStatePerHash perhash = &aggstate->perhash[j];
    1599        6228 :         Bitmapset  *colnos = bms_copy(base_colnos);
    1600        6228 :         AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
    1601        6228 :         List       *hashTlist = NIL;
    1602             :         TupleDesc   hashDesc;
    1603             :         int         maxCols;
    1604             :         int         i;
    1605             : 
    1606        6228 :         perhash->largestGrpColIdx = 0;
    1607             : 
    1608             :         /*
    1609             :          * If we're doing grouping sets, then some Vars might be referenced in
    1610             :          * tlist/qual for the benefit of other grouping sets, but not needed
    1611             :          * when hashing; i.e. prepare_projection_slot will null them out, so
    1612             :          * there'd be no point storing them.  Use prepare_projection_slot's
    1613             :          * logic to determine which.
    1614             :          */
    1615        6228 :         if (aggstate->phases[0].grouped_cols)
    1616             :         {
    1617        6228 :             Bitmapset  *grouped_cols = aggstate->phases[0].grouped_cols[j];
    1618             :             ListCell   *lc;
    1619             : 
    1620       16870 :             foreach(lc, aggstate->all_grouped_cols)
    1621             :             {
    1622       10642 :                 int         attnum = lfirst_int(lc);
    1623             : 
    1624       10642 :                 if (!bms_is_member(attnum, grouped_cols))
    1625        1248 :                     colnos = bms_del_member(colnos, attnum);
    1626             :             }
    1627             :         }
    1628             : 
    1629             :         /*
    1630             :          * Compute maximum number of input columns accounting for possible
    1631             :          * duplications in the grpColIdx array, which can happen in some edge
    1632             :          * cases where HashAggregate was generated as part of a semijoin or a
    1633             :          * DISTINCT.
    1634             :          */
    1635        6228 :         maxCols = bms_num_members(colnos) + perhash->numCols;
    1636             : 
    1637        6228 :         perhash->hashGrpColIdxInput =
    1638        6228 :             palloc(maxCols * sizeof(AttrNumber));
    1639        6228 :         perhash->hashGrpColIdxHash =
    1640        6228 :             palloc(perhash->numCols * sizeof(AttrNumber));
    1641             : 
    1642             :         /* Add all the grouping columns to colnos */
    1643       15628 :         for (i = 0; i < perhash->numCols; i++)
    1644        9400 :             colnos = bms_add_member(colnos, grpColIdx[i]);
    1645             : 
    1646             :         /*
    1647             :          * First build mapping for columns directly hashed. These are the
    1648             :          * first, because they'll be accessed when computing hash values and
    1649             :          * comparing tuples for exact matches. We also build simple mapping
    1650             :          * for execGrouping, so it knows where to find the to-be-hashed /
    1651             :          * compared columns in the input.
    1652             :          */
    1653       15628 :         for (i = 0; i < perhash->numCols; i++)
    1654             :         {
    1655        9400 :             perhash->hashGrpColIdxInput[i] = grpColIdx[i];
    1656        9400 :             perhash->hashGrpColIdxHash[i] = i + 1;
    1657        9400 :             perhash->numhashGrpCols++;
    1658             :             /* delete already mapped columns */
    1659        9400 :             colnos = bms_del_member(colnos, grpColIdx[i]);
    1660             :         }
    1661             : 
    1662             :         /* and add the remaining columns */
    1663        6228 :         i = -1;
    1664        6940 :         while ((i = bms_next_member(colnos, i)) >= 0)
    1665             :         {
    1666         712 :             perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
    1667         712 :             perhash->numhashGrpCols++;
    1668             :         }
    1669             : 
    1670             :         /* and build a tuple descriptor for the hashtable */
    1671       16340 :         for (i = 0; i < perhash->numhashGrpCols; i++)
    1672             :         {
    1673       10112 :             int         varNumber = perhash->hashGrpColIdxInput[i] - 1;
    1674             : 
    1675       10112 :             hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
    1676       10112 :             perhash->largestGrpColIdx =
    1677       10112 :                 Max(varNumber + 1, perhash->largestGrpColIdx);
    1678             :         }
    1679             : 
    1680        6228 :         hashDesc = ExecTypeFromTL(hashTlist);
    1681             : 
    1682        6228 :         execTuplesHashPrepare(perhash->numCols,
    1683        6228 :                               perhash->aggnode->grpOperators,
    1684             :                               &perhash->eqfuncoids,
    1685             :                               &perhash->hashfunctions);
    1686        6228 :         perhash->hashslot =
    1687        6228 :             ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
    1688             :                                &TTSOpsMinimalTuple);
    1689             : 
    1690        6228 :         list_free(hashTlist);
    1691        6228 :         bms_free(colnos);
    1692             :     }
    1693             : 
    1694        5726 :     bms_free(base_colnos);
    1695        5726 : }
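
A standalone sketch (plain C, not PostgreSQL code; a bitmask stands in for the Bitmapset and the column numbers are made up) of the duplicate-elimination idea above: needed column numbers are first collected into a set and then unpacked into a compact array, so a column referenced several times is stored only once. As the comment notes, duplicates coming from grpColIdx itself cannot always be removed; this sketch only covers the simple case.

#include <stdio.h>

int main(void)
{
    int      grp_cols[]   = {2, 5, 2};          /* grouping columns, with a duplicate */
    int      other_cols[] = {7};                /* other referenced columns */
    unsigned needed       = 0;
    int      packed[32], n = 0;

    for (size_t i = 0; i < sizeof(grp_cols) / sizeof(grp_cols[0]); i++)
        needed |= 1u << grp_cols[i];            /* collect into the "bitmapset" */
    for (size_t i = 0; i < sizeof(other_cols) / sizeof(other_cols[0]); i++)
        needed |= 1u << other_cols[i];

    for (int col = 0; col < 32; col++)          /* unpack into a compact array */
        if (needed & (1u << col))
            packed[n++] = col;

    for (int i = 0; i < n; i++)
        printf("store column %d\n", packed[i]); /* 2, 5, 7 */
    return 0;
}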
    1696             : 
    1697             : /*
    1698             :  * Estimate per-hash-table-entry overhead.
    1699             :  */
    1700             : Size
    1701       27136 : hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
    1702             : {
    1703             :     Size        tupleChunkSize;
    1704             :     Size        pergroupChunkSize;
    1705             :     Size        transitionChunkSize;
    1706       27136 :     Size        tupleSize = (MAXALIGN(SizeofMinimalTupleHeader) +
    1707             :                              tupleWidth);
    1708       27136 :     Size        pergroupSize = numTrans * sizeof(AggStatePerGroupData);
    1709             : 
    1710             :     /*
    1711             :      * Entries use the Bump allocator, so the chunk sizes are the same as the
    1712             :      * requested sizes.
    1713             :      */
    1714       27136 :     tupleChunkSize = MAXALIGN(tupleSize);
    1715       27136 :     pergroupChunkSize = pergroupSize;
    1716             : 
    1717             :     /*
    1718             :      * Transition values use AllocSet, which has a chunk header and also uses
    1719             :      * power-of-two allocations.
    1720             :      */
    1721       27136 :     if (transitionSpace > 0)
    1722        4774 :         transitionChunkSize = CHUNKHDRSZ + pg_nextpower2_size_t(transitionSpace);
    1723             :     else
    1724       22362 :         transitionChunkSize = 0;
    1725             : 
    1726             :     return
    1727       27136 :         TupleHashEntrySize() +
    1728       27136 :         tupleChunkSize +
    1729       27136 :         pergroupChunkSize +
    1730             :         transitionChunkSize;
    1731             : }
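
A worked example (plain C; every constant below is an assumed, illustrative size, not the real PostgreSQL value) of the shape of the per-entry estimate above: a maxaligned tuple chunk, plus the per-group state array, plus a power-of-two chunk with header for the transition value when one is expected.

#include <stddef.h>
#include <stdio.h>

static size_t maxalign(size_t x)   { return (x + 7) & ~(size_t) 7; }
static size_t nextpower2(size_t x) { size_t p = 1; while (p < x) p <<= 1; return p; }

int main(void)
{
    size_t entry_header     = 24;    /* assumed hash-entry overhead */
    size_t tuple_header     = 16;    /* assumed minimal-tuple header */
    size_t chunk_header     = 16;    /* assumed allocator chunk header */
    size_t tuple_width      = 12;
    int    num_trans        = 2;
    size_t pergroup_size    = 24;    /* assumed size of the per-group state struct */
    size_t transition_space = 100;

    size_t estimate = entry_header
        + maxalign(tuple_header + tuple_width)
        + (size_t) num_trans * pergroup_size
        + (transition_space > 0 ? chunk_header + nextpower2(transition_space) : 0);

    printf("estimated bytes per hash entry: %zu\n", estimate);   /* 24+32+48+144 = 248 */
    return 0;
}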
    1732             : 
    1733             : /*
    1734             :  * hashagg_recompile_expressions()
    1735             :  *
    1736             :  * Identifies the right phase, compiles the right expression given the
     1737             :  * arguments, and then sets phase->evaltrans to that expression.
    1738             :  *
    1739             :  * Different versions of the compiled expression are needed depending on
    1740             :  * whether hash aggregation has spilled or not, and whether it's reading from
    1741             :  * the outer plan or a tape. Before spilling to disk, the expression reads
    1742             :  * from the outer plan and does not need to perform a NULL check. After
    1743             :  * HashAgg begins to spill, new groups will not be created in the hash table,
    1744             :  * and the AggStatePerGroup array may be NULL; therefore we need to add a null
    1745             :  * pointer check to the expression. Then, when reading spilled data from a
    1746             :  * tape, we change the outer slot type to be a fixed minimal tuple slot.
    1747             :  *
    1748             :  * It would be wasteful to recompile every time, so cache the compiled
    1749             :  * expressions in the AggStatePerPhase, and reuse when appropriate.
    1750             :  */
    1751             : static void
    1752       64436 : hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
    1753             : {
    1754             :     AggStatePerPhase phase;
    1755       64436 :     int         i = minslot ? 1 : 0;
    1756       64436 :     int         j = nullcheck ? 1 : 0;
    1757             : 
    1758             :     Assert(aggstate->aggstrategy == AGG_HASHED ||
    1759             :            aggstate->aggstrategy == AGG_MIXED);
    1760             : 
    1761       64436 :     if (aggstate->aggstrategy == AGG_HASHED)
    1762       11864 :         phase = &aggstate->phases[0];
    1763             :     else                        /* AGG_MIXED */
    1764       52572 :         phase = &aggstate->phases[1];
    1765             : 
    1766       64436 :     if (phase->evaltrans_cache[i][j] == NULL)
    1767             :     {
    1768          88 :         const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops;
    1769          88 :         bool        outerfixed = aggstate->ss.ps.outeropsfixed;
    1770          88 :         bool        dohash = true;
    1771          88 :         bool        dosort = false;
    1772             : 
    1773             :         /*
    1774             :          * If minslot is true, that means we are processing a spilled batch
    1775             :          * (inside agg_refill_hash_table()), and we must not advance the
    1776             :          * sorted grouping sets.
    1777             :          */
    1778          88 :         if (aggstate->aggstrategy == AGG_MIXED && !minslot)
    1779          12 :             dosort = true;
    1780             : 
    1781             :         /* temporarily change the outerops while compiling the expression */
    1782          88 :         if (minslot)
    1783             :         {
    1784          44 :             aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
    1785          44 :             aggstate->ss.ps.outeropsfixed = true;
    1786             :         }
    1787             : 
    1788          88 :         phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase,
    1789             :                                                          dosort, dohash,
    1790             :                                                          nullcheck);
    1791             : 
    1792             :         /* change back */
    1793          88 :         aggstate->ss.ps.outerops = outerops;
    1794          88 :         aggstate->ss.ps.outeropsfixed = outerfixed;
    1795             :     }
    1796             : 
    1797       64436 :     phase->evaltrans = phase->evaltrans_cache[i][j];
    1798       64436 : }
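
A minimal sketch (plain C, not PostgreSQL code; the "compiled expressions" are just function pointers) of the caching scheme above: the two boolean knobs index a 2x2 cache, and each variant is built at most once and then reused on later calls.

#include <stdbool.h>
#include <stdio.h>

typedef void (*compiled_fn) (void);

static void variant_a(void) { puts("outer slot, no null check"); }
static void variant_b(void) { puts("outer slot, null check"); }
static void variant_c(void) { puts("minimal slot, no null check"); }
static void variant_d(void) { puts("minimal slot, null check"); }

static compiled_fn cache[2][2];
static int         compiles;

static compiled_fn get_variant(bool minslot, bool nullcheck)
{
    int i = minslot ? 1 : 0;
    int j = nullcheck ? 1 : 0;

    if (cache[i][j] == NULL)          /* "compile" once, then reuse */
    {
        static compiled_fn table[2][2] = {{variant_a, variant_b},
                                          {variant_c, variant_d}};

        cache[i][j] = table[i][j];
        compiles++;
    }
    return cache[i][j];
}

int main(void)
{
    get_variant(false, false)();
    get_variant(false, false)();      /* cache hit, no recompilation */
    get_variant(true, true)();
    printf("compiled %d variant(s)\n", compiles);   /* 2 */
    return 0;
}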
    1799             : 
    1800             : /*
    1801             :  * Set limits that trigger spilling to avoid exceeding hash_mem. Consider the
    1802             :  * number of partitions we expect to create (if we do spill).
    1803             :  *
    1804             :  * There are two limits: a memory limit, and also an ngroups limit. The
    1805             :  * ngroups limit becomes important when we expect transition values to grow
    1806             :  * substantially larger than the initial value.
    1807             :  */
    1808             : void
    1809       51742 : hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
    1810             :                     Size *mem_limit, uint64 *ngroups_limit,
    1811             :                     int *num_partitions)
    1812             : {
    1813             :     int         npartitions;
    1814             :     Size        partition_mem;
    1815       51742 :     Size        hash_mem_limit = get_hash_memory_limit();
    1816             : 
    1817             :     /* if not expected to spill, use all of hash_mem */
    1818       51742 :     if (input_groups * hashentrysize <= hash_mem_limit)
    1819             :     {
    1820       49312 :         if (num_partitions != NULL)
    1821       24702 :             *num_partitions = 0;
    1822       49312 :         *mem_limit = hash_mem_limit;
    1823       49312 :         *ngroups_limit = hash_mem_limit / hashentrysize;
    1824       49312 :         return;
    1825             :     }
    1826             : 
    1827             :     /*
    1828             :      * Calculate expected memory requirements for spilling, which is the size
    1829             :      * of the buffers needed for all the tapes that need to be open at once.
    1830             :      * Then, subtract that from the memory available for holding hash tables.
    1831             :      */
    1832        2430 :     npartitions = hash_choose_num_partitions(input_groups,
    1833             :                                              hashentrysize,
    1834             :                                              used_bits,
    1835             :                                              NULL);
    1836        2430 :     if (num_partitions != NULL)
    1837          96 :         *num_partitions = npartitions;
    1838             : 
    1839        2430 :     partition_mem =
    1840        2430 :         HASHAGG_READ_BUFFER_SIZE +
    1841             :         HASHAGG_WRITE_BUFFER_SIZE * npartitions;
    1842             : 
    1843             :     /*
    1844             :      * Don't set the limit below 3/4 of hash_mem. In that case, we are at the
    1845             :      * minimum number of partitions, so we aren't going to dramatically exceed
    1846             :      * work mem anyway.
    1847             :      */
    1848        2430 :     if (hash_mem_limit > 4 * partition_mem)
    1849           0 :         *mem_limit = hash_mem_limit - partition_mem;
    1850             :     else
    1851        2430 :         *mem_limit = hash_mem_limit * 0.75;
    1852             : 
    1853        2430 :     if (*mem_limit > hashentrysize)
    1854        2430 :         *ngroups_limit = *mem_limit / hashentrysize;
    1855             :     else
    1856           0 :         *ngroups_limit = 1;
    1857             : }
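
A worked example of the arithmetic above, with assumed numbers (4MB of hash_mem, 64-byte entries, 10000 estimated groups): 10000 * 64 = 640kB fits within 4MB, so the no-spill branch applies and ngroups_limit comes out to 4MB / 64 = 65536. The sketch below is standalone and does not call the real get_hash_memory_limit().

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
    double      hashentrysize = 64.0;               /* assumed average entry size */
    double      input_groups = 10000.0;             /* assumed planner estimate */
    uint64_t    hash_mem_limit = 4ULL * 1024 * 1024;    /* assumed 4MB hash_mem */

    if (input_groups * hashentrysize <= (double) hash_mem_limit)
    {
        /* no spill expected: use all of hash_mem */
        uint64_t    ngroups_limit = (uint64_t) (hash_mem_limit / hashentrysize);

        printf("mem_limit=%llu ngroups_limit=%llu\n",
               (unsigned long long) hash_mem_limit,
               (unsigned long long) ngroups_limit); /* 4194304 and 65536 */
    }
    return 0;
}
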
    1858             : 
    1859             : /*
    1860             :  * hash_agg_check_limits
    1861             :  *
    1862             :  * After adding a new group to the hash table, check whether we need to enter
    1863             :  * spill mode. Allocations may happen without adding new groups (for instance,
    1864             :  * if the transition state size grows), so this check is imperfect.
    1865             :  */
    1866             : static void
    1867      527230 : hash_agg_check_limits(AggState *aggstate)
    1868             : {
    1869      527230 :     uint64      ngroups = aggstate->hash_ngroups_current;
    1870      527230 :     Size        meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt,
    1871             :                                                      true);
    1872      527230 :     Size        entry_mem = MemoryContextMemAllocated(aggstate->hash_tablecxt,
    1873             :                                                       true);
    1874      527230 :     Size        tval_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
    1875             :                                                      true);
    1876      527230 :     Size        total_mem = meta_mem + entry_mem + tval_mem;
    1877      527230 :     bool        do_spill = false;
    1878             : 
    1879             : #ifdef USE_INJECTION_POINTS
    1880      527230 :     if (ngroups >= 1000)
    1881             :     {
    1882       95662 :         if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-spill-1000"))
    1883             :         {
    1884          10 :             do_spill = true;
    1885          10 :             INJECTION_POINT_CACHED("hash-aggregate-spill-1000");
    1886             :         }
    1887             :     }
    1888             : #endif
    1889             : 
    1890             :     /*
    1891             :      * Don't spill unless there's at least one group in the hash table so we
    1892             :      * can be sure to make progress even in edge cases.
    1893             :      */
    1894      527230 :     if (aggstate->hash_ngroups_current > 0 &&
    1895      527230 :         (total_mem > aggstate->hash_mem_limit ||
    1896      500818 :          ngroups > aggstate->hash_ngroups_limit))
    1897             :     {
    1898       26448 :         do_spill = true;
    1899             :     }
    1900             : 
    1901      527230 :     if (do_spill)
    1902       26458 :         hash_agg_enter_spill_mode(aggstate);
    1903      527230 : }
    1904             : 
    1905             : /*
    1906             :  * Enter "spill mode", meaning that no new groups are added to any of the hash
    1907             :  * tables. Tuples that would create a new group are instead spilled, and
    1908             :  * processed later.
    1909             :  */
    1910             : static void
    1911       26458 : hash_agg_enter_spill_mode(AggState *aggstate)
    1912             : {
    1913       26458 :     INJECTION_POINT("hash-aggregate-enter-spill-mode");
    1914       26458 :     aggstate->hash_spill_mode = true;
    1915       26458 :     hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
    1916             : 
    1917       26458 :     if (!aggstate->hash_ever_spilled)
    1918             :     {
    1919             :         Assert(aggstate->hash_tapeset == NULL);
    1920             :         Assert(aggstate->hash_spills == NULL);
    1921             : 
    1922          62 :         aggstate->hash_ever_spilled = true;
    1923             : 
    1924          62 :         aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1);
    1925             : 
    1926          62 :         aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
    1927             : 
    1928         184 :         for (int setno = 0; setno < aggstate->num_hashes; setno++)
    1929             :         {
    1930         122 :             AggStatePerHash perhash = &aggstate->perhash[setno];
    1931         122 :             HashAggSpill *spill = &aggstate->hash_spills[setno];
    1932             : 
    1933         122 :             hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
    1934         122 :                                perhash->aggnode->numGroups,
    1935             :                                aggstate->hashentrysize);
    1936             :         }
    1937             :     }
    1938       26458 : }
    1939             : 
    1940             : /*
    1941             :  * Update metrics after filling the hash table.
    1942             :  *
    1943             :  * If reading from the outer plan, from_tape should be false; if reading from
    1944             :  * another tape, from_tape should be true.
    1945             :  */
    1946             : static void
    1947       42084 : hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
    1948             : {
    1949             :     Size        meta_mem;
    1950             :     Size        entry_mem;
    1951             :     Size        hashkey_mem;
    1952             :     Size        buffer_mem;
    1953             :     Size        total_mem;
    1954             : 
    1955       42084 :     if (aggstate->aggstrategy != AGG_MIXED &&
    1956       15678 :         aggstate->aggstrategy != AGG_HASHED)
    1957           0 :         return;
    1958             : 
    1959             :     /* memory for the hash table itself */
    1960       42084 :     meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
    1961             : 
    1962             :     /* memory for hash entries */
    1963       42084 :     entry_mem = MemoryContextMemAllocated(aggstate->hash_tablecxt, true);
    1964             : 
    1965             :     /* memory for byref transition states */
    1966       42084 :     hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true);
    1967             : 
    1968             :     /* memory for read/write tape buffers, if spilled */
    1969       42084 :     buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
    1970       42084 :     if (from_tape)
    1971       26944 :         buffer_mem += HASHAGG_READ_BUFFER_SIZE;
    1972             : 
    1973             :     /* update peak mem */
    1974       42084 :     total_mem = meta_mem + entry_mem + hashkey_mem + buffer_mem;
    1975       42084 :     if (total_mem > aggstate->hash_mem_peak)
    1976        4156 :         aggstate->hash_mem_peak = total_mem;
    1977             : 
    1978             :     /* update disk usage */
    1979       42084 :     if (aggstate->hash_tapeset != NULL)
    1980             :     {
    1981       27006 :         uint64      disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024);
    1982             : 
    1983       27006 :         if (aggstate->hash_disk_used < disk_used)
    1984          52 :             aggstate->hash_disk_used = disk_used;
    1985             :     }
    1986             : 
    1987             :     /* update hashentrysize estimate based on contents */
    1988       42084 :     if (aggstate->hash_ngroups_current > 0)
    1989             :     {
    1990       41730 :         aggstate->hashentrysize =
    1991       41730 :             TupleHashEntrySize() +
    1992       41730 :             (hashkey_mem / (double) aggstate->hash_ngroups_current);
    1993             :     }
    1994             : }
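
The re-estimate above is a fixed per-entry size plus the measured by-reference transition memory divided by the current group count. A small standalone illustration with assumed numbers; the 24-byte fixed size is a hypothetical stand-in for TupleHashEntrySize().

#include <stdio.h>

int
main(void)
{
    double      fixed_entry_size = 24.0;    /* hypothetical TupleHashEntrySize() */
    double      hashkey_mem = 320.0 * 1024; /* assumed by-ref transition memory */
    double      ngroups = 10000.0;          /* assumed current group count */

    double      hashentrysize = fixed_entry_size + hashkey_mem / ngroups;

    printf("hashentrysize = %.1f bytes\n", hashentrysize);  /* 56.8 */
    return 0;
}
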
    1995             : 
    1996             : /*
    1997             :  * Create memory contexts used for hash aggregation.
    1998             :  */
    1999             : static void
    2000        5726 : hash_create_memory(AggState *aggstate)
    2001             : {
    2002        5726 :     Size        maxBlockSize = ALLOCSET_DEFAULT_MAXSIZE;
    2003             : 
    2004             :     /*
    2005             :      * The hashcontext's per-tuple memory will be used for byref transition
    2006             :      * values and returned by AggCheckCallContext().
    2007             :      */
    2008        5726 :     aggstate->hashcontext = CreateWorkExprContext(aggstate->ss.ps.state);
    2009             : 
    2010             :     /*
    2011             :      * The meta context will be used for the bucket array of
    2012             :      * TupleHashEntryData (or arrays, in the case of grouping sets). As the
    2013             :      * hash table grows, the bucket array will double in size and the old one
    2014             :      * will be freed, so an AllocSet is appropriate. For large bucket arrays,
    2015             :      * the large allocation path will be used, so it's not worth worrying
    2016             :      * about wasting space due to power-of-two allocations.
    2017             :      */
    2018        5726 :     aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
    2019             :                                                    "HashAgg meta context",
    2020             :                                                    ALLOCSET_DEFAULT_SIZES);
    2021             : 
    2022             :     /*
    2023             :      * The hash entries themselves, which include the grouping key
    2024             :      * (firstTuple) and pergroup data, are stored in the table context. The
    2025             :      * bump allocator can be used because the entries are not freed until the
    2026             :      * entire hash table is reset. The bump allocator is faster for
    2027             :      * allocations and avoids wasting space on the chunk header or
    2028             :      * power-of-two allocations.
    2032             :      */
    2033             : 
    2034             :     /*
    2035             :      * Like CreateWorkExprContext(), use smaller sizings for smaller work_mem,
    2036             :      * to avoid large jumps in memory usage.
    2037             :      */
    2038        5726 :     maxBlockSize = pg_prevpower2_size_t(work_mem * (Size) 1024 / 16);
    2039             : 
    2040             :     /* But no bigger than ALLOCSET_DEFAULT_MAXSIZE */
    2041        5726 :     maxBlockSize = Min(maxBlockSize, ALLOCSET_DEFAULT_MAXSIZE);
    2042             : 
    2043             :     /* and no smaller than ALLOCSET_DEFAULT_INITSIZE */
    2044        5726 :     maxBlockSize = Max(maxBlockSize, ALLOCSET_DEFAULT_INITSIZE);
    2045             : 
    2046        5726 :     aggstate->hash_tablecxt = BumpContextCreate(aggstate->ss.ps.state->es_query_cxt,
    2047             :                                                 "HashAgg table context",
    2048             :                                                 ALLOCSET_DEFAULT_MINSIZE,
    2049             :                                                 ALLOCSET_DEFAULT_INITSIZE,
    2050             :                                                 maxBlockSize);
    2051             : 
    2052        5726 : }
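
To make the clamp above concrete: with work_mem = 4MB, work_mem * 1024 / 16 is 256kB, which is already a power of two and falls between the usual 8kB ALLOCSET_DEFAULT_INITSIZE and 8MB ALLOCSET_DEFAULT_MAXSIZE, so maxBlockSize ends up at 256kB. The standalone sketch below assumes those default sizes and models pg_prevpower2_size_t() with a simple loop.

#include <stddef.h>
#include <stdio.h>

static size_t
prevpower2(size_t v)
{
    size_t      p = 1;

    while (p * 2 <= v)
        p *= 2;
    return p;
}

int
main(void)
{
    size_t      work_mem_kb = 4096;         /* assumed work_mem = 4MB */
    size_t      init_size = 8 * 1024;       /* assumed ALLOCSET_DEFAULT_INITSIZE */
    size_t      max_size = 8 * 1024 * 1024; /* assumed ALLOCSET_DEFAULT_MAXSIZE */
    size_t      max_block = prevpower2(work_mem_kb * 1024 / 16);

    if (max_block > max_size)
        max_block = max_size;
    if (max_block < init_size)
        max_block = init_size;

    printf("maxBlockSize = %zu bytes\n", max_block);    /* 262144 */
    return 0;
}
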
    2053             : 
    2054             : /*
    2055             :  * Choose a reasonable number of buckets for the initial hash table size.
    2056             :  */
    2057             : static long
    2058        4668 : hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
    2059             : {
    2060             :     long        max_nbuckets;
    2061        4668 :     long        nbuckets = ngroups;
    2062             : 
    2063        4668 :     max_nbuckets = memory / hashentrysize;
    2064             : 
    2065             :     /*
    2066             :      * Underestimating is better than overestimating. Too many buckets crowd
    2067             :      * out space for group keys and transition state values.
    2068             :      */
    2069        4668 :     max_nbuckets >>= 1;
    2070             : 
    2071        4668 :     if (nbuckets > max_nbuckets)
    2072          72 :         nbuckets = max_nbuckets;
    2073             : 
    2074        4668 :     return Max(nbuckets, 1);
    2075             : }
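
A worked example with assumed inputs: 1MB of memory and 100-byte entries give max_nbuckets = 10485, halved to 5242, so an estimate of 100000 groups is capped at 5242 buckets. The sketch below just replays that arithmetic outside the executor.

#include <stdio.h>

int
main(void)
{
    double      hashentrysize = 100.0;      /* assumed average entry size */
    long        ngroups = 100000;           /* assumed group estimate */
    long        memory = 1024 * 1024;       /* assumed 1MB budget */

    long        max_nbuckets = (long) (memory / hashentrysize) >> 1;
    long        nbuckets = ngroups > max_nbuckets ? max_nbuckets : ngroups;

    printf("%ld\n", nbuckets > 1 ? nbuckets : 1);   /* prints 5242 */
    return 0;
}
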
    2076             : 
    2077             : /*
    2078             :  * Determine the number of partitions to create when spilling, which will
    2079             :  * always be a power of two. If log2_npartitions is non-NULL, set
    2080             :  * *log2_npartitions to the log2() of the number of partitions.
    2081             :  */
    2082             : static int
    2083       15052 : hash_choose_num_partitions(double input_groups, double hashentrysize,
    2084             :                            int used_bits, int *log2_npartitions)
    2085             : {
    2086       15052 :     Size        hash_mem_limit = get_hash_memory_limit();
    2087             :     double      partition_limit;
    2088             :     double      mem_wanted;
    2089             :     double      dpartitions;
    2090             :     int         npartitions;
    2091             :     int         partition_bits;
    2092             : 
    2093             :     /*
    2094             :      * Avoid creating so many partitions that the memory requirements of the
    2095             :      * open partition files are greater than 1/4 of hash_mem.
    2096             :      */
    2097       15052 :     partition_limit =
    2098       15052 :         (hash_mem_limit * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
    2099             :         HASHAGG_WRITE_BUFFER_SIZE;
    2100             : 
    2101       15052 :     mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
    2102             : 
    2103             :     /* make enough partitions so that each one is likely to fit in memory */
    2104       15052 :     dpartitions = 1 + (mem_wanted / hash_mem_limit);
    2105             : 
    2106       15052 :     if (dpartitions > partition_limit)
    2107       14988 :         dpartitions = partition_limit;
    2108             : 
    2109       15052 :     if (dpartitions < HASHAGG_MIN_PARTITIONS)
    2110       15052 :         dpartitions = HASHAGG_MIN_PARTITIONS;
    2111       15052 :     if (dpartitions > HASHAGG_MAX_PARTITIONS)
    2112           0 :         dpartitions = HASHAGG_MAX_PARTITIONS;
    2113             : 
    2114             :     /* HASHAGG_MAX_PARTITIONS limit makes this safe */
    2115       15052 :     npartitions = (int) dpartitions;
    2116             : 
    2117             :     /* ceil(log2(npartitions)) */
    2118       15052 :     partition_bits = my_log2(npartitions);
    2119             : 
    2120             :     /* make sure that we don't exhaust the hash bits */
    2121       15052 :     if (partition_bits + used_bits >= 32)
    2122           0 :         partition_bits = 32 - used_bits;
    2123             : 
    2124       15052 :     if (log2_npartitions != NULL)
    2125       12622 :         *log2_npartitions = partition_bits;
    2126             : 
    2127             :     /* number of partitions will be a power of two */
    2128       15052 :     npartitions = 1 << partition_bits;
    2129             : 
    2130       15052 :     return npartitions;
    2131             : }
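
A worked example of the sizing above, using assumed values for the HASHAGG_* constants and tape buffer sizes (1.5 partition factor, minimum of 4 partitions, 8kB read/write buffers) and a 4MB hash_mem limit: one million 64-byte groups want about 96MB, so dpartitions is roughly 23.9, which rounds up to 32 partitions.

#include <stdio.h>

int
main(void)
{
    /* all values below are assumptions for illustration only */
    double      hash_mem_limit = 4.0 * 1024 * 1024;
    double      read_buffer = 8192;
    double      write_buffer = 8192;
    double      partition_factor = 1.5;
    double      input_groups = 1000000;
    double      hashentrysize = 64;

    double      partition_limit = (hash_mem_limit * 0.25 - read_buffer) / write_buffer;
    double      mem_wanted = partition_factor * input_groups * hashentrysize;
    double      dpartitions = 1 + mem_wanted / hash_mem_limit;
    int         npartitions = 1;

    if (dpartitions > partition_limit)
        dpartitions = partition_limit;
    if (dpartitions < 4)
        dpartitions = 4;

    /* round up to a power of two, as the real code does via my_log2() */
    while (npartitions < (int) dpartitions)
        npartitions *= 2;

    printf("npartitions = %d\n", npartitions);  /* prints npartitions = 32 */
    return 0;
}
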
    2132             : 
    2133             : /*
    2134             :  * Initialize a freshly-created TupleHashEntry.
    2135             :  */
    2136             : static void
    2137      527230 : initialize_hash_entry(AggState *aggstate, TupleHashTable hashtable,
    2138             :                       TupleHashEntry entry)
    2139             : {
    2140             :     AggStatePerGroup pergroup;
    2141             :     int         transno;
    2142             : 
    2143      527230 :     aggstate->hash_ngroups_current++;
    2144      527230 :     hash_agg_check_limits(aggstate);
    2145             : 
    2146             :     /* no need to allocate or initialize per-group state */
    2147      527230 :     if (aggstate->numtrans == 0)
    2148      215422 :         return;
    2149             : 
    2150      311808 :     pergroup = (AggStatePerGroup) TupleHashEntryGetAdditional(hashtable, entry);
    2151             : 
    2152             :     /*
    2153             :      * Initialize aggregates for the new tuple group; lookup_hash_entries()
    2154             :      * has already selected the relevant grouping set.
    2155             :      */
    2156      769106 :     for (transno = 0; transno < aggstate->numtrans; transno++)
    2157             :     {
    2158      457298 :         AggStatePerTrans pertrans = &aggstate->pertrans[transno];
    2159      457298 :         AggStatePerGroup pergroupstate = &pergroup[transno];
    2160             : 
    2161      457298 :         initialize_aggregate(aggstate, pertrans, pergroupstate);
    2162             :     }
    2163             : }
    2164             : 
    2165             : /*
    2166             :  * Look up hash entries for the current tuple in all hashed grouping sets.
    2167             :  *
    2168             :  * Some entries may be left NULL if we are in "spill mode". The same tuple
    2169             :  * will belong to different groups for each grouping set, so it may match a group
    2170             :  * already in memory for one set and match a group not in memory for another
    2171             :  * set. When in "spill mode", the tuple will be spilled for each grouping set
    2172             :  * where it doesn't match a group in memory.
    2173             :  *
    2174             :  * NB: It's possible to spill the same tuple for several different grouping
    2175             :  * sets. This may seem wasteful, but it's actually a trade-off: if we spill
    2176             :  * the tuple multiple times for multiple grouping sets, it can be partitioned
    2177             :  * for each grouping set, making the refilling of the hash table very
    2178             :  * efficient.
    2179             :  */
    2180             : static void
    2181     6145196 : lookup_hash_entries(AggState *aggstate)
    2182             : {
    2183     6145196 :     AggStatePerGroup *pergroup = aggstate->hash_pergroup;
    2184     6145196 :     TupleTableSlot *outerslot = aggstate->tmpcontext->ecxt_outertuple;
    2185             :     int         setno;
    2186             : 
    2187    12424832 :     for (setno = 0; setno < aggstate->num_hashes; setno++)
    2188             :     {
    2189     6279636 :         AggStatePerHash perhash = &aggstate->perhash[setno];
    2190     6279636 :         TupleHashTable hashtable = perhash->hashtable;
    2191     6279636 :         TupleTableSlot *hashslot = perhash->hashslot;
    2192             :         TupleHashEntry entry;
    2193             :         uint32      hash;
    2194     6279636 :         bool        isnew = false;
    2195             :         bool       *p_isnew;
    2196             : 
    2197             :         /* if hash table already spilled, don't create new entries */
    2198     6279636 :         p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
    2199             : 
    2200     6279636 :         select_current_set(aggstate, setno, true);
    2201     6279636 :         prepare_hash_slot(perhash,
    2202             :                           outerslot,
    2203             :                           hashslot);
    2204             : 
    2205     6279636 :         entry = LookupTupleHashEntry(hashtable, hashslot,
    2206             :                                      p_isnew, &hash);
    2207             : 
    2208     6279636 :         if (entry != NULL)
    2209             :         {
    2210     5512400 :             if (isnew)
    2211      371550 :                 initialize_hash_entry(aggstate, hashtable, entry);
    2212     5512400 :             pergroup[setno] = TupleHashEntryGetAdditional(hashtable, entry);
    2213             :         }
    2214             :         else
    2215             :         {
    2216      767236 :             HashAggSpill *spill = &aggstate->hash_spills[setno];
    2217      767236 :             TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
    2218             : 
    2219      767236 :             if (spill->partitions == NULL)
    2220           0 :                 hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
    2221           0 :                                    perhash->aggnode->numGroups,
    2222             :                                    aggstate->hashentrysize);
    2223             : 
    2224      767236 :             hashagg_spill_tuple(aggstate, spill, slot, hash);
    2225      767236 :             pergroup[setno] = NULL;
    2226             :         }
    2227             :     }
    2228     6145196 : }
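
The p_isnew trick above, passing NULL instead of &isnew once spill mode starts, is what keeps new groups from being created. The standalone sketch below shows the same calling convention with a hypothetical lookup() over a small fixed table; it is not the real LookupTupleHashEntry() API.

#include <stdbool.h>
#include <stdio.h>

#define NKEYS 4

static int  keys[NKEYS];
static int  nkeys = 0;

/*
 * Return the slot for "key".  If isnew is NULL, never create a new entry;
 * otherwise create one when missing and report that through *isnew.
 */
static int *
lookup(int key, bool *isnew)
{
    for (int i = 0; i < nkeys; i++)
    {
        if (keys[i] == key)
            return &keys[i];
    }

    if (isnew == NULL || nkeys >= NKEYS)
        return NULL;            /* caller must handle this (e.g. spill) */

    keys[nkeys] = key;
    *isnew = true;
    return &keys[nkeys++];
}

int
main(void)
{
    bool        isnew = false;
    bool        spill_mode = true;

    lookup(42, &isnew);         /* creates the group */
    printf("%s\n",
           lookup(7, spill_mode ? NULL : &isnew) ? "found/created" : "would spill");
    return 0;
}
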
    2229             : 
    2230             : /*
    2231             :  * ExecAgg -
    2232             :  *
    2233             :  *    ExecAgg receives tuples from its outer subplan and aggregates over
    2234             :  *    the appropriate attribute for each aggregate function use (Aggref
    2235             :  *    node) appearing in the targetlist or qual of the node.  The number
    2236             :  *    of tuples to aggregate over depends on whether grouped or plain
    2237             :  *    aggregation is selected.  In grouped aggregation, we produce a result
    2238             :  *    row for each group; in plain aggregation there's a single result row
    2239             :  *    for the whole query.  In either case, the value of each aggregate is
    2240             :  *    stored in the expression context to be used when ExecProject evaluates
    2241             :  *    the result tuple.
    2242             :  */
    2243             : static TupleTableSlot *
    2244      818542 : ExecAgg(PlanState *pstate)
    2245             : {
    2246      818542 :     AggState   *node = castNode(AggState, pstate);
    2247      818542 :     TupleTableSlot *result = NULL;
    2248             : 
    2249      818542 :     CHECK_FOR_INTERRUPTS();
    2250             : 
    2251      818542 :     if (!node->agg_done)
    2252             :     {
    2253             :         /* Dispatch based on strategy */
    2254      754532 :         switch (node->phase->aggstrategy)
    2255             :         {
    2256      481904 :             case AGG_HASHED:
    2257      481904 :                 if (!node->table_filled)
    2258       14996 :                     agg_fill_hash_table(node);
    2259             :                 /* FALLTHROUGH */
    2260             :             case AGG_MIXED:
    2261      509266 :                 result = agg_retrieve_hash_table(node);
    2262      509266 :                 break;
    2263      245266 :             case AGG_PLAIN:
    2264             :             case AGG_SORTED:
    2265      245266 :                 result = agg_retrieve_direct(node);
    2266      245146 :                 break;
    2267             :         }
    2268             : 
    2269      754412 :         if (!TupIsNull(result))
    2270      737684 :             return result;
    2271             :     }
    2272             : 
    2273       80738 :     return NULL;
    2274             : }
    2275             : 
    2276             : /*
    2277             :  * ExecAgg for non-hashed case
    2278             :  */
    2279             : static TupleTableSlot *
    2280      245266 : agg_retrieve_direct(AggState *aggstate)
    2281             : {
    2282      245266 :     Agg        *node = aggstate->phase->aggnode;
    2283             :     ExprContext *econtext;
    2284             :     ExprContext *tmpcontext;
    2285             :     AggStatePerAgg peragg;
    2286             :     AggStatePerGroup *pergroups;
    2287             :     TupleTableSlot *outerslot;
    2288             :     TupleTableSlot *firstSlot;
    2289             :     TupleTableSlot *result;
    2290      245266 :     bool        hasGroupingSets = aggstate->phase->numsets > 0;
    2291      245266 :     int         numGroupingSets = Max(aggstate->phase->numsets, 1);
    2292             :     int         currentSet;
    2293             :     int         nextSetSize;
    2294             :     int         numReset;
    2295             :     int         i;
    2296             : 
    2297             :     /*
    2298             :      * get state info from node
    2299             :      *
    2300             :      * econtext is the per-output-tuple expression context
    2301             :      *
    2302             :      * tmpcontext is the per-input-tuple expression context
    2303             :      */
    2304      245266 :     econtext = aggstate->ss.ps.ps_ExprContext;
    2305      245266 :     tmpcontext = aggstate->tmpcontext;
    2306             : 
    2307      245266 :     peragg = aggstate->peragg;
    2308      245266 :     pergroups = aggstate->pergroups;
    2309      245266 :     firstSlot = aggstate->ss.ss_ScanTupleSlot;
    2310             : 
    2311             :     /*
    2312             :      * We loop retrieving groups until we find one matching
    2313             :      * aggstate->ss.ps.qual
    2314             :      *
    2315             :      * For grouping sets, we have the invariant that aggstate->projected_set
    2316             :      * is either -1 (initial call) or the index (starting from 0) in
    2317             :      * gset_lengths for the group we just completed (either by projecting a
    2318             :      * row or by discarding it in the qual).
    2319             :      */
    2320      316070 :     while (!aggstate->agg_done)
    2321             :     {
    2322             :         /*
    2323             :          * Clear the per-output-tuple context for each group, as well as
    2324             :          * aggcontext (which contains any pass-by-ref transvalues of the old
    2325             :          * group).  Some aggregate functions store working state in child
    2326             :          * contexts; those now get reset automatically without us needing to
    2327             :          * do anything special.
    2328             :          *
    2329             :          * We use ReScanExprContext not just ResetExprContext because we want
    2330             :          * any registered shutdown callbacks to be called.  That allows
    2331             :          * aggregate functions to ensure they've cleaned up any non-memory
    2332             :          * resources.
    2333             :          */
    2334      315864 :         ReScanExprContext(econtext);
    2335             : 
    2336             :         /*
    2337             :          * Determine how many grouping sets need to be reset at this boundary.
    2338             :          */
    2339      315864 :         if (aggstate->projected_set >= 0 &&
    2340      246218 :             aggstate->projected_set < numGroupingSets)
    2341      246212 :             numReset = aggstate->projected_set + 1;
    2342             :         else
    2343       69652 :             numReset = numGroupingSets;
    2344             : 
    2345             :         /*
    2346             :          * numReset can change on a phase boundary, but that's OK; we want to
    2347             :          * reset the contexts used in _this_ phase, and later, after possibly
    2348             :          * changing phase, initialize the right number of aggregates for the
    2349             :          * _new_ phase.
    2350             :          */
    2351             : 
    2352      653982 :         for (i = 0; i < numReset; i++)
    2353             :         {
    2354      338118 :             ReScanExprContext(aggstate->aggcontexts[i]);
    2355             :         }
    2356             : 
    2357             :         /*
    2358             :          * Check if input is complete and there are no more groups to project
    2359             :          * in this phase; move to next phase or mark as done.
    2360             :          */
    2361      315864 :         if (aggstate->input_done == true &&
    2362        1578 :             aggstate->projected_set >= (numGroupingSets - 1))
    2363             :         {
    2364         768 :             if (aggstate->current_phase < aggstate->numphases - 1)
    2365             :             {
    2366         198 :                 initialize_phase(aggstate, aggstate->current_phase + 1);
    2367         198 :                 aggstate->input_done = false;
    2368         198 :                 aggstate->projected_set = -1;
    2369         198 :                 numGroupingSets = Max(aggstate->phase->numsets, 1);
    2370         198 :                 node = aggstate->phase->aggnode;
    2371         198 :                 numReset = numGroupingSets;
    2372             :             }
    2373         570 :             else if (aggstate->aggstrategy == AGG_MIXED)
    2374             :             {
    2375             :                 /*
    2376             :                  * Mixed mode; we've output all the grouped stuff and have
    2377             :                  * full hashtables, so switch to outputting those.
    2378             :                  */
    2379         156 :                 initialize_phase(aggstate, 0);
    2380         156 :                 aggstate->table_filled = true;
    2381         156 :                 ResetTupleHashIterator(aggstate->perhash[0].hashtable,
    2382             :                                        &aggstate->perhash[0].hashiter);
    2383         156 :                 select_current_set(aggstate, 0, true);
    2384         156 :                 return agg_retrieve_hash_table(aggstate);
    2385             :             }
    2386             :             else
    2387             :             {
    2388         414 :                 aggstate->agg_done = true;
    2389         414 :                 break;
    2390             :             }
    2391             :         }
    2392             : 
    2393             :         /*
    2394             :          * Get the number of columns in the next grouping set after the last
    2395             :          * projected one (if any). This is the number of columns to compare to
    2396             :          * see if we reached the boundary of that set too.
    2397             :          */
    2398      315294 :         if (aggstate->projected_set >= 0 &&
    2399      245450 :             aggstate->projected_set < (numGroupingSets - 1))
    2400       27282 :             nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
    2401             :         else
    2402      288012 :             nextSetSize = 0;
    2403             : 
    2404             :         /*----------
    2405             :          * If a subgroup for the current grouping set is present, project it.
    2406             :          *
    2407             :          * We have a new group if:
    2408             :          *  - we're out of input but haven't projected all grouping sets
    2409             :          *    (checked above)
    2410             :          * OR
    2411             :          *    - we already projected a row that wasn't from the last grouping
    2412             :          *      set
    2413             :          *    AND
    2414             :          *    - the next grouping set has at least one grouping column (since
    2415             :          *      empty grouping sets project only once input is exhausted)
    2416             :          *    AND
    2417             :          *    - the previous and pending rows differ on the grouping columns
    2418             :          *      of the next grouping set
    2419             :          *----------
    2420             :          */
    2421      315294 :         tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
    2422      315294 :         if (aggstate->input_done ||
    2423      314484 :             (node->aggstrategy != AGG_PLAIN &&
    2424      246370 :              aggstate->projected_set != -1 &&
    2425      244640 :              aggstate->projected_set < (numGroupingSets - 1) &&
    2426       19940 :              nextSetSize > 0 &&
    2427       19940 :              !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
    2428             :                                tmpcontext)))
    2429             :         {
    2430       14144 :             aggstate->projected_set += 1;
    2431             : 
    2432             :             Assert(aggstate->projected_set < numGroupingSets);
    2433       14144 :             Assert(nextSetSize > 0 || aggstate->input_done);
    2434             :         }
    2435             :         else
    2436             :         {
    2437             :             /*
    2438             :              * We no longer care what group we just projected; the next
    2439             :              * projection will always be the first (or only) grouping set
    2440             :              * (unless the input proves to be empty).
    2441             :              */
    2442      301150 :             aggstate->projected_set = 0;
    2443             : 
    2444             :             /*
    2445             :              * If we don't already have the first tuple of the new group,
    2446             :              * fetch it from the outer plan.
    2447             :              */
    2448      301150 :             if (aggstate->grp_firstTuple == NULL)
    2449             :             {
    2450       69844 :                 outerslot = fetch_input_tuple(aggstate);
    2451       69826 :                 if (!TupIsNull(outerslot))
    2452             :                 {
    2453             :                     /*
    2454             :                      * Make a copy of the first input tuple; we will use this
    2455             :                      * for comparisons (in group mode) and for projection.
    2456             :                      */
    2457       56484 :                     aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
    2458             :                 }
    2459             :                 else
    2460             :                 {
    2461             :                     /* outer plan produced no tuples at all */
    2462       13342 :                     if (hasGroupingSets)
    2463             :                     {
    2464             :                         /*
    2465             :                          * If there was no input at all, we need to project
    2466             :                          * rows only if there are grouping sets of size 0.
    2467             :                          * Note that this implies that there can't be any
    2468             :                          * references to ungrouped Vars, which would otherwise
    2469             :                          * cause issues with the empty output slot.
    2470             :                          *
    2471             :                          * XXX: This is no longer true, we currently deal with
    2472             :                          * this in finalize_aggregates().
    2473             :                          */
    2474          54 :                         aggstate->input_done = true;
    2475             : 
    2476          78 :                         while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
    2477             :                         {
    2478          30 :                             aggstate->projected_set += 1;
    2479          30 :                             if (aggstate->projected_set >= numGroupingSets)
    2480             :                             {
    2481             :                                 /*
    2482             :                                  * We can't set agg_done here because we might
    2483             :                                  * have more phases to do, even though the
    2484             :                                  * input is empty. So we need to restart the
    2485             :                                  * whole outer loop.
    2486             :                                  */
    2487           6 :                                 break;
    2488             :                             }
    2489             :                         }
    2490             : 
    2491          54 :                         if (aggstate->projected_set >= numGroupingSets)
    2492           6 :                             continue;
    2493             :                     }
    2494             :                     else
    2495             :                     {
    2496       13288 :                         aggstate->agg_done = true;
    2497             :                         /* If we are grouping, we should produce no tuples too */
    2498       13288 :                         if (node->aggstrategy != AGG_PLAIN)
    2499         162 :                             return NULL;
    2500             :                     }
    2501             :                 }
    2502             :             }
    2503             : 
    2504             :             /*
    2505             :              * Initialize working state for a new input tuple group.
    2506             :              */
    2507      300964 :             initialize_aggregates(aggstate, pergroups, numReset);
    2508             : 
    2509      300964 :             if (aggstate->grp_firstTuple != NULL)
    2510             :             {
    2511             :                 /*
    2512             :                  * Store the copied first input tuple in the tuple table slot
    2513             :                  * reserved for it.  The tuple will be deleted when it is
    2514             :                  * cleared from the slot.
    2515             :                  */
    2516      287790 :                 ExecForceStoreHeapTuple(aggstate->grp_firstTuple,
    2517             :                                         firstSlot, true);
    2518      287790 :                 aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
    2519             : 
    2520             :                 /* set up for first advance_aggregates call */
    2521      287790 :                 tmpcontext->ecxt_outertuple = firstSlot;
    2522             : 
    2523             :                 /*
    2524             :                  * Process each outer-plan tuple, and then fetch the next one,
    2525             :                  * until we exhaust the outer plan or cross a group boundary.
    2526             :                  */
    2527             :                 for (;;)
    2528             :                 {
    2529             :                     /*
    2530             :                      * During phase 1 only of a mixed agg, we need to update
    2531             :                      * hashtables as well in advance_aggregates.
    2532             :                      */
    2533    20802776 :                     if (aggstate->aggstrategy == AGG_MIXED &&
    2534       38062 :                         aggstate->current_phase == 1)
    2535             :                     {
    2536       38062 :                         lookup_hash_entries(aggstate);
    2537             :                     }
    2538             : 
    2539             :                     /* Advance the aggregates (or combine functions) */
    2540    20802776 :                     advance_aggregates(aggstate);
    2541             : 
    2542             :                     /* Reset per-input-tuple context after each tuple */
    2543    20802698 :                     ResetExprContext(tmpcontext);
    2544             : 
    2545    20802698 :                     outerslot = fetch_input_tuple(aggstate);
    2546    20802698 :                     if (TupIsNull(outerslot))
    2547             :                     {
    2548             :                         /* no more outer-plan tuples available */
    2549             : 
    2550             :                         /* if we built hash tables, finalize any spills */
    2551       56400 :                         if (aggstate->aggstrategy == AGG_MIXED &&
    2552         144 :                             aggstate->current_phase == 1)
    2553         144 :                             hashagg_finish_initial_spills(aggstate);
    2554             : 
    2555       56400 :                         if (hasGroupingSets)
    2556             :                         {
    2557         714 :                             aggstate->input_done = true;
    2558         714 :                             break;
    2559             :                         }
    2560             :                         else
    2561             :                         {
    2562       55686 :                             aggstate->agg_done = true;
    2563       55686 :                             break;
    2564             :                         }
    2565             :                     }
    2566             :                     /* set up for next advance_aggregates call */
    2567    20746298 :                     tmpcontext->ecxt_outertuple = outerslot;
    2568             : 
    2569             :                     /*
    2570             :                      * If we are grouping, check whether we've crossed a group
    2571             :                      * boundary.
    2572             :                      */
    2573    20746298 :                     if (node->aggstrategy != AGG_PLAIN && node->numCols > 0)
    2574             :                     {
    2575     2041948 :                         tmpcontext->ecxt_innertuple = firstSlot;
    2576     2041948 :                         if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
    2577             :                                       tmpcontext))
    2578             :                         {
    2579      231312 :                             aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
    2580      231312 :                             break;
    2581             :                         }
    2582             :                     }
    2583             :                 }
    2584             :             }
    2585             : 
    2586             :             /*
    2587             :              * Use the representative input tuple for any references to
    2588             :              * non-aggregated input columns in aggregate direct args, the node
    2589             :              * qual, and the tlist.  (If we are not grouping, and there are no
    2590             :              * input rows at all, we will come here with an empty firstSlot
    2591             :              * ... but if not grouping, there can't be any references to
    2592             :              * non-aggregated input columns, so no problem.)
    2593             :              */
    2594      300886 :             econtext->ecxt_outertuple = firstSlot;
    2595             :         }
    2596             : 
    2597             :         Assert(aggstate->projected_set >= 0);
    2598             : 
    2599      315030 :         currentSet = aggstate->projected_set;
    2600             : 
    2601      315030 :         prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
    2602             : 
    2603      315030 :         select_current_set(aggstate, currentSet, false);
    2604             : 
    2605      315030 :         finalize_aggregates(aggstate,
    2606             :                             peragg,
    2607      315030 :                             pergroups[currentSet]);
    2608             : 
    2609             :         /*
    2610             :          * If there's no row to project right now, we must continue rather
    2611             :          * than returning a null since there might be more groups.
    2612             :          */
    2613      315018 :         result = project_aggregates(aggstate);
    2614      315006 :         if (result)
    2615      244208 :             return result;
    2616             :     }
    2617             : 
    2618             :     /* No more groups */
    2619         620 :     return NULL;
    2620             : }
    2621             : 
    2622             : /*
    2623             :  * ExecAgg for hashed case: read input and build hash table
    2624             :  */
    2625             : static void
    2626       14996 : agg_fill_hash_table(AggState *aggstate)
    2627             : {
    2628             :     TupleTableSlot *outerslot;
    2629       14996 :     ExprContext *tmpcontext = aggstate->tmpcontext;
    2630             : 
    2631             :     /*
    2632             :      * Process each outer-plan tuple, and then fetch the next one, until we
    2633             :      * exhaust the outer plan.
    2634             :      */
    2635             :     for (;;)
    2636             :     {
    2637     6122130 :         outerslot = fetch_input_tuple(aggstate);
    2638     6122130 :         if (TupIsNull(outerslot))
    2639             :             break;
    2640             : 
    2641             :         /* set up for lookup_hash_entries and advance_aggregates */
    2642     6107134 :         tmpcontext->ecxt_outertuple = outerslot;
    2643             : 
    2644             :         /* Find or build hashtable entries */
    2645     6107134 :         lookup_hash_entries(aggstate);
    2646             : 
    2647             :         /* Advance the aggregates (or combine functions) */
    2648     6107134 :         advance_aggregates(aggstate);
    2649             : 
    2650             :         /*
    2651             :          * Reset per-input-tuple context after each tuple, but note that the
    2652             :          * hash lookups do this too
    2653             :          */
    2654     6107134 :         ResetExprContext(aggstate->tmpcontext);
    2655             :     }
    2656             : 
    2657             :     /* finalize spills, if any */
    2658       14996 :     hashagg_finish_initial_spills(aggstate);
    2659             : 
    2660       14996 :     aggstate->table_filled = true;
    2661             :     /* Initialize to walk the first hash table */
    2662       14996 :     select_current_set(aggstate, 0, true);
    2663       14996 :     ResetTupleHashIterator(aggstate->perhash[0].hashtable,
    2664             :                            &aggstate->perhash[0].hashiter);
    2665       14996 : }
    2666             : 
    2667             : /*
    2668             :  * If any data was spilled during hash aggregation, reset the hash table and
    2669             :  * reprocess one batch of spilled data. After reprocessing a batch, the hash
    2670             :  * table will again contain data, ready to be consumed by
    2671             :  * agg_retrieve_hash_table_in_memory().
    2672             :  *
    2673             :  * Should only be called after all in-memory hash table entries have been
    2674             :  * finalized and emitted.
    2675             :  *
    2676             :  * Return false when input is exhausted and there's no more work to be done;
    2677             :  * otherwise return true.
    2678             :  */
    2679             : static bool
    2680       42890 : agg_refill_hash_table(AggState *aggstate)
    2681             : {
    2682             :     HashAggBatch *batch;
    2683             :     AggStatePerHash perhash;
    2684             :     HashAggSpill spill;
    2685       42890 :     LogicalTapeSet *tapeset = aggstate->hash_tapeset;
    2686       42890 :     bool        spill_initialized = false;
    2687             : 
    2688       42890 :     if (aggstate->hash_batches == NIL)
    2689       15946 :         return false;
    2690             : 
    2691             :     /* hash_batches is a stack, with the top item at the end of the list */
    2692       26944 :     batch = llast(aggstate->hash_batches);
    2693       26944 :     aggstate->hash_batches = list_delete_last(aggstate->hash_batches);
    2694             : 
    2695       26944 :     hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
    2696             :                         batch->used_bits, &aggstate->hash_mem_limit,
    2697             :                         &aggstate->hash_ngroups_limit, NULL);
    2698             : 
    2699             :     /*
    2700             :      * Each batch only processes one grouping set; set the rest to NULL so
    2701             :      * that advance_aggregates() knows to ignore them. We don't touch
    2702             :      * pergroups for sorted grouping sets here, because they will be needed if
    2703             :      * we rescan later. The expressions for sorted grouping sets will not be
    2704             :      * evaluated after we recompile anyway.
    2705             :      */
    2706      207428 :     MemSet(aggstate->hash_pergroup, 0,
    2707             :            sizeof(AggStatePerGroup) * aggstate->num_hashes);
    2708             : 
    2709             :     /* free memory and reset hash tables */
    2710       26944 :     ReScanExprContext(aggstate->hashcontext);
    2711       26944 :     MemoryContextReset(aggstate->hash_tablecxt);
    2712      207428 :     for (int setno = 0; setno < aggstate->num_hashes; setno++)
    2713      180484 :         ResetTupleHashTable(aggstate->perhash[setno].hashtable);
    2714             : 
    2715       26944 :     aggstate->hash_ngroups_current = 0;
    2716             : 
    2717             :     /*
    2718             :      * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
    2719             :      * happens in phase 0. So, we switch to phase 1 when processing a batch,
    2720             :      * and back to phase 0 after the batch is done.
    2721             :      */
    2722             :     Assert(aggstate->current_phase == 0);
    2723       26944 :     if (aggstate->phase->aggstrategy == AGG_MIXED)
    2724             :     {
    2725       26262 :         aggstate->current_phase = 1;
    2726       26262 :         aggstate->phase = &aggstate->phases[aggstate->current_phase];
    2727             :     }
    2728             : 
    2729       26944 :     select_current_set(aggstate, batch->setno, true);
    2730             : 
    2731       26944 :     perhash = &aggstate->perhash[aggstate->current_set];
    2732             : 
    2733             :     /*
    2734             :      * Spilled tuples are always read back as MinimalTuples, which may be
    2735             :      * different from the outer plan, so recompile the aggregate expressions.
    2736             :      *
    2737             :      * We still need the NULL check, because we are only processing one
    2738             :      * grouping set at a time and the rest will be NULL.
    2739             :      */
    2740       26944 :     hashagg_recompile_expressions(aggstate, true, true);
    2741             : 
    2742       26944 :     INJECTION_POINT("hash-aggregate-process-batch");
    2743             :     for (;;)
    2744     1216776 :     {
    2745     1243720 :         TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
    2746     1243720 :         TupleTableSlot *hashslot = perhash->hashslot;
    2747     1243720 :         TupleHashTable hashtable = perhash->hashtable;
    2748             :         TupleHashEntry entry;
    2749             :         MinimalTuple tuple;
    2750             :         uint32      hash;
    2751     1243720 :         bool        isnew = false;
    2752     1243720 :         bool       *p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
    2753             : 
    2754     1243720 :         CHECK_FOR_INTERRUPTS();
    2755             : 
    2756     1243720 :         tuple = hashagg_batch_read(batch, &hash);
    2757     1243720 :         if (tuple == NULL)
    2758       26944 :             break;
    2759             : 
    2760     1216776 :         ExecStoreMinimalTuple(tuple, spillslot, true);
    2761     1216776 :         aggstate->tmpcontext->ecxt_outertuple = spillslot;
    2762             : 
    2763     1216776 :         prepare_hash_slot(perhash,
    2764     1216776 :                           aggstate->tmpcontext->ecxt_outertuple,
    2765             :                           hashslot);
    2766     1216776 :         entry = LookupTupleHashEntryHash(hashtable, hashslot,
    2767             :                                          p_isnew, hash);
    2768             : 
    2769     1216776 :         if (entry != NULL)
    2770             :         {
    2771      767236 :             if (isnew)
    2772      155680 :                 initialize_hash_entry(aggstate, hashtable, entry);
    2773      767236 :             aggstate->hash_pergroup[batch->setno] = TupleHashEntryGetAdditional(hashtable, entry);
    2774      767236 :             advance_aggregates(aggstate);
    2775             :         }
    2776             :         else
    2777             :         {
    2778      449540 :             if (!spill_initialized)
    2779             :             {
    2780             :                 /*
    2781             :                  * Avoid initializing the spill until we actually need it so
    2782             :                  * that we don't assign tapes that will never be used.
    2783             :                  */
    2784       12500 :                 spill_initialized = true;
    2785       12500 :                 hashagg_spill_init(&spill, tapeset, batch->used_bits,
    2786             :                                    batch->input_card, aggstate->hashentrysize);
    2787             :             }
    2788             :             /* no memory for a new group, spill */
    2789      449540 :             hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
    2790             : 
    2791      449540 :             aggstate->hash_pergroup[batch->setno] = NULL;
    2792             :         }
    2793             : 
    2794             :         /*
    2795             :          * Reset per-input-tuple context after each tuple, but note that the
    2796             :          * hash lookups do this too
    2797             :          */
    2798     1216776 :         ResetExprContext(aggstate->tmpcontext);
    2799             :     }
    2800             : 
    2801       26944 :     LogicalTapeClose(batch->input_tape);
    2802             : 
    2803             :     /* change back to phase 0 */
    2804       26944 :     aggstate->current_phase = 0;
    2805       26944 :     aggstate->phase = &aggstate->phases[aggstate->current_phase];
    2806             : 
    2807       26944 :     if (spill_initialized)
    2808             :     {
    2809       12500 :         hashagg_spill_finish(aggstate, &spill, batch->setno);
    2810       12500 :         hash_agg_update_metrics(aggstate, true, spill.npartitions);
    2811             :     }
    2812             :     else
    2813       14444 :         hash_agg_update_metrics(aggstate, true, 0);
    2814             : 
    2815       26944 :     aggstate->hash_spill_mode = false;
    2816             : 
    2817             :     /* prepare to walk the first hash table */
    2818       26944 :     select_current_set(aggstate, batch->setno, true);
    2819       26944 :     ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
    2820             :                            &aggstate->perhash[batch->setno].hashiter);
    2821             : 
    2822       26944 :     pfree(batch);
    2823             : 
    2824       26944 :     return true;
    2825             : }
    2826             : 
    2827             : /*
    2828             :  * ExecAgg for hashed case: retrieving groups from hash table
    2829             :  *
    2830             :  * After exhausting in-memory tuples, also try refilling the hash table using
    2831             :  * previously-spilled tuples. Only returns NULL after all in-memory and
    2832             :  * spilled tuples are exhausted.
    2833             :  */
    2834             : static TupleTableSlot *
    2835      509422 : agg_retrieve_hash_table(AggState *aggstate)
    2836             : {
    2837      509422 :     TupleTableSlot *result = NULL;
    2838             : 
    2839     1029842 :     while (result == NULL)
    2840             :     {
    2841      536366 :         result = agg_retrieve_hash_table_in_memory(aggstate);
    2842      536366 :         if (result == NULL)
    2843             :         {
    2844       42890 :             if (!agg_refill_hash_table(aggstate))
    2845             :             {
    2846       15946 :                 aggstate->agg_done = true;
    2847       15946 :                 break;
    2848             :             }
    2849             :         }
    2850             :     }
    2851             : 
    2852      509422 :     return result;
    2853             : }
    2854             : 
    2855             : /*
    2856             :  * Retrieve the groups from the in-memory hash tables without considering any
    2857             :  * spilled tuples.
    2858             :  */
    2859             : static TupleTableSlot *
    2860      536366 : agg_retrieve_hash_table_in_memory(AggState *aggstate)
    2861             : {
    2862             :     ExprContext *econtext;
    2863             :     AggStatePerAgg peragg;
    2864             :     AggStatePerGroup pergroup;
    2865             :     TupleHashEntry entry;
    2866             :     TupleTableSlot *firstSlot;
    2867             :     TupleTableSlot *result;
    2868             :     AggStatePerHash perhash;
    2869             : 
    2870             :     /*
    2871             :      * get state info from node.
    2872             :      *
    2873             :      * econtext is the per-output-tuple expression context.
    2874             :      */
    2875      536366 :     econtext = aggstate->ss.ps.ps_ExprContext;
    2876      536366 :     peragg = aggstate->peragg;
    2877      536366 :     firstSlot = aggstate->ss.ss_ScanTupleSlot;
    2878             : 
    2879             :     /*
    2880             :      * Note that perhash (and therefore anything accessed through it) can
    2881             :      * change inside the loop, as we change between grouping sets.
    2882             :      */
    2883      536366 :     perhash = &aggstate->perhash[aggstate->current_set];
    2884             : 
    2885             :     /*
    2886             :      * We loop retrieving groups until we find one satisfying
    2887             :      * aggstate->ss.ps.qual
    2888             :      */
    2889             :     for (;;)
    2890      135942 :     {
    2891      672308 :         TupleTableSlot *hashslot = perhash->hashslot;
    2892      672308 :         TupleHashTable hashtable = perhash->hashtable;
    2893             :         int         i;
    2894             : 
    2895      672308 :         CHECK_FOR_INTERRUPTS();
    2896             : 
    2897             :         /*
    2898             :          * Find the next entry in the hash table
    2899             :          */
    2900      672308 :         entry = ScanTupleHashTable(hashtable, &perhash->hashiter);
    2901      672308 :         if (entry == NULL)
    2902             :         {
    2903      143176 :             int         nextset = aggstate->current_set + 1;
    2904             : 
    2905      143176 :             if (nextset < aggstate->num_hashes)
    2906             :             {
    2907             :                 /*
    2908             :                  * Switch to next grouping set, reinitialize, and restart the
    2909             :                  * loop.
    2910             :                  */
    2911      100286 :                 select_current_set(aggstate, nextset, true);
    2912             : 
    2913      100286 :                 perhash = &aggstate->perhash[aggstate->current_set];
    2914             : 
    2915      100286 :                 ResetTupleHashIterator(hashtable, &perhash->hashiter);
    2916             : 
    2917      100286 :                 continue;
    2918             :             }
    2919             :             else
    2920             :             {
    2921       42890 :                 return NULL;
    2922             :             }
    2923             :         }
    2924             : 
    2925             :         /*
    2926             :          * Clear the per-output-tuple context for each group
    2927             :          *
    2928             :          * We intentionally don't use ReScanExprContext here; if any aggs have
    2929             :          * registered shutdown callbacks, they mustn't be called yet, since we
    2930             :          * might not be done with that agg.
    2931             :          */
    2932      529132 :         ResetExprContext(econtext);
    2933             : 
    2934             :         /*
    2935             :          * Transform representative tuple back into one with the right
    2936             :          * columns.
    2937             :          */
    2938      529132 :         ExecStoreMinimalTuple(TupleHashEntryGetTuple(entry), hashslot, false);
    2939      529132 :         slot_getallattrs(hashslot);
    2940             : 
    2941      529132 :         ExecClearTuple(firstSlot);
    2942      529132 :         memset(firstSlot->tts_isnull, true,
    2943      529132 :                firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
    2944             : 
    2945     1393924 :         for (i = 0; i < perhash->numhashGrpCols; i++)
    2946             :         {
    2947      864792 :             int         varNumber = perhash->hashGrpColIdxInput[i] - 1;
    2948             : 
    2949      864792 :             firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
    2950      864792 :             firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
    2951             :         }
    2952      529132 :         ExecStoreVirtualTuple(firstSlot);
    2953             : 
    2954      529132 :         pergroup = (AggStatePerGroup) TupleHashEntryGetAdditional(hashtable, entry);
    2955             : 
    2956             :         /*
    2957             :          * Use the representative input tuple for any references to
    2958             :          * non-aggregated input columns in the qual and tlist.
    2959             :          */
    2960      529132 :         econtext->ecxt_outertuple = firstSlot;
    2961             : 
    2962      529132 :         prepare_projection_slot(aggstate,
    2963             :                                 econtext->ecxt_outertuple,
    2964             :                                 aggstate->current_set);
    2965             : 
    2966      529132 :         finalize_aggregates(aggstate, peragg, pergroup);
    2967             : 
    2968      529132 :         result = project_aggregates(aggstate);
    2969      529132 :         if (result)
    2970      493476 :             return result;
    2971             :     }
    2972             : 
    2973             :     /* No more groups */
    2974             :     return NULL;
    2975             : }
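/*
 * Standalone sketch (not nodeAgg.c code) of the column scatter performed a
 * few lines above: the hash table stores only the grouping columns, so to
 * rebuild a full-width representative tuple each stored value is copied back
 * to its original (1-based) attribute number while every other attribute is
 * left NULL.  The array sizes and the hypothetical index map are example
 * values only.
 */
#include <stdbool.h>
#include <stdio.h>

int
main(void)
{
    int         hash_values[2] = {42, 7};       /* compact grouping columns */
    int         grp_col_idx_input[2] = {3, 1};  /* their 1-based input columns */
    int         values[5] = {0};
    bool        isnull[5] = {true, true, true, true, true};

    for (int i = 0; i < 2; i++)
    {
        int         varNumber = grp_col_idx_input[i] - 1;

        values[varNumber] = hash_values[i];
        isnull[varNumber] = false;
    }

    for (int col = 0; col < 5; col++)
    {
        if (isnull[col])
            printf("att %d: NULL\n", col + 1);
        else
            printf("att %d: %d\n", col + 1, values[col]);
    }
    return 0;
}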
    2976             : 
    2977             : /*
    2978             :  * hashagg_spill_init
    2979             :  *
    2980             :  * Called after we determined that spilling is necessary. Chooses the number
    2981             :  * of partitions to create, and initializes them.
    2982             :  */
    2983             : static void
    2984       12622 : hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
    2985             :                    double input_groups, double hashentrysize)
    2986             : {
    2987             :     int         npartitions;
    2988             :     int         partition_bits;
    2989             : 
    2990       12622 :     npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
    2991             :                                              used_bits, &partition_bits);
    2992             : 
    2993             : #ifdef USE_INJECTION_POINTS
    2994       12622 :     if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-single-partition"))
    2995             :     {
    2996          10 :         npartitions = 1;
    2997          10 :         partition_bits = 0;
    2998          10 :         INJECTION_POINT_CACHED("hash-aggregate-single-partition");
    2999             :     }
    3000             : #endif
    3001             : 
    3002       12622 :     spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
    3003       12622 :     spill->ntuples = palloc0(sizeof(int64) * npartitions);
    3004       12622 :     spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
    3005             : 
    3006       63080 :     for (int i = 0; i < npartitions; i++)
    3007       50458 :         spill->partitions[i] = LogicalTapeCreate(tapeset);
    3008             : 
    3009       12622 :     spill->shift = 32 - used_bits - partition_bits;
    3010       12622 :     if (spill->shift < 32)
    3011       12612 :         spill->mask = (npartitions - 1) << spill->shift;
    3012             :     else
    3013          10 :         spill->mask = 0;
    3014       12622 :     spill->npartitions = npartitions;
    3015             : 
    3016       63080 :     for (int i = 0; i < npartitions; i++)
    3017       50458 :         initHyperLogLog(&spill->hll_card[i], HASHAGG_HLL_BIT_WIDTH);
    3018       12622 : }
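/*
 * A minimal standalone sketch (not part of nodeAgg.c) of the partition
 * arithmetic set up above: the top `used_bits` of the 32-bit hash were
 * already consumed by ancestor batches, so the next `partition_bits` below
 * them select a partition, exactly as spill->shift and spill->mask do.  The
 * helper name choose_partition() and the example values are illustrative
 * only; the sketch assumes a power-of-two partition count, which the
 * (npartitions - 1) << shift mask construction above relies on.
 */
#include <stdint.h>
#include <stdio.h>

static uint32_t
choose_partition(uint32_t hash, int used_bits, int partition_bits)
{
    int         shift = 32 - used_bits - partition_bits;
    uint32_t    npartitions = (uint32_t) 1 << partition_bits;
    uint32_t    mask;

    if (shift >= 32)            /* degenerate single-partition case */
        return 0;
    mask = (npartitions - 1) << shift;
    return (hash & mask) >> shift;
}

int
main(void)
{
    /* e.g. 4 bits consumed by ancestors, 2 bits -> 4 partitions here */
    printf("partition = %u\n", choose_partition(0xDEADBEEFU, 4, 2));
    return 0;
}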
    3019             : 
    3020             : /*
    3021             :  * hashagg_spill_tuple
    3022             :  *
    3023             :  * No room for new groups in the hash table. Save for later in the appropriate
    3024             :  * partition.
    3025             :  */
    3026             : static Size
    3027     1216776 : hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
    3028             :                     TupleTableSlot *inputslot, uint32 hash)
    3029             : {
    3030             :     TupleTableSlot *spillslot;
    3031             :     int         partition;
    3032             :     MinimalTuple tuple;
    3033             :     LogicalTape *tape;
    3034     1216776 :     int         total_written = 0;
    3035             :     bool        shouldFree;
    3036             : 
    3037             :     Assert(spill->partitions != NULL);
    3038             : 
    3039             :     /* spill only attributes that we actually need */
    3040     1216776 :     if (!aggstate->all_cols_needed)
    3041             :     {
    3042        1572 :         spillslot = aggstate->hash_spill_wslot;
    3043        1572 :         slot_getsomeattrs(inputslot, aggstate->max_colno_needed);
    3044        1572 :         ExecClearTuple(spillslot);
    3045        4716 :         for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++)
    3046             :         {
    3047        3144 :             if (bms_is_member(i + 1, aggstate->colnos_needed))
    3048             :             {
    3049        1572 :                 spillslot->tts_values[i] = inputslot->tts_values[i];
    3050        1572 :                 spillslot->tts_isnull[i] = inputslot->tts_isnull[i];
    3051             :             }
    3052             :             else
    3053        1572 :                 spillslot->tts_isnull[i] = true;
    3054             :         }
    3055        1572 :         ExecStoreVirtualTuple(spillslot);
    3056             :     }
    3057             :     else
    3058     1215204 :         spillslot = inputslot;
    3059             : 
    3060     1216776 :     tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
    3061             : 
    3062     1216776 :     if (spill->shift < 32)
    3063     1195776 :         partition = (hash & spill->mask) >> spill->shift;
    3064             :     else
    3065       21000 :         partition = 0;
    3066             : 
    3067     1216776 :     spill->ntuples[partition]++;
    3068             : 
    3069             :     /*
    3070             :      * All hash values destined for a given partition have some bits in
    3071             :      * common, which causes bad HLL cardinality estimates. Hash the hash to
    3072             :      * get a more uniform distribution.
    3073             :      */
    3074     1216776 :     addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
    3075             : 
    3076     1216776 :     tape = spill->partitions[partition];
    3077             : 
    3078     1216776 :     LogicalTapeWrite(tape, &hash, sizeof(uint32));
    3079     1216776 :     total_written += sizeof(uint32);
    3080             : 
    3081     1216776 :     LogicalTapeWrite(tape, tuple, tuple->t_len);
    3082     1216776 :     total_written += tuple->t_len;
    3083             : 
    3084     1216776 :     if (shouldFree)
    3085      767236 :         pfree(tuple);
    3086             : 
    3087     1216776 :     return total_written;
    3088             : }
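/*
 * Standalone sketch (not nodeAgg.c code) of why the hash value is hashed
 * again before being fed to the HLL estimator above: every hash routed to a
 * given partition carries identical values in its partition-selecting bits,
 * so those bits contribute no entropy and would skew the cardinality
 * estimate.  A MurmurHash3-style finalizer is used here purely as a
 * stand-in for hash_bytes_uint32().
 */
#include <stdint.h>
#include <stdio.h>

static uint32_t
mix32(uint32_t h)
{
    h ^= h >> 16;
    h *= 0x85ebca6bU;
    h ^= h >> 13;
    h *= 0xc2b2ae35U;
    h ^= h >> 16;
    return h;
}

int
main(void)
{
    /* two hashes whose partition-selecting bits (here, the top byte) match */
    uint32_t    a = 0xAB001234U;
    uint32_t    b = 0xAB00CAFEU;

    printf("raw:   %08x %08x  (shared bits fixed by the partition)\n",
           (unsigned) a, (unsigned) b);
    printf("mixed: %08x %08x  (uniform again)\n",
           (unsigned) mix32(a), (unsigned) mix32(b));
    return 0;
}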
    3089             : 
    3090             : /*
    3091             :  * hashagg_batch_new
    3092             :  *
    3093             :  * Construct a HashAggBatch item, which represents one iteration of HashAgg to
    3094             :  * be done.
    3095             :  */
    3096             : static HashAggBatch *
    3097       26944 : hashagg_batch_new(LogicalTape *input_tape, int setno,
    3098             :                   int64 input_tuples, double input_card, int used_bits)
    3099             : {
    3100       26944 :     HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
    3101             : 
    3102       26944 :     batch->setno = setno;
    3103       26944 :     batch->used_bits = used_bits;
    3104       26944 :     batch->input_tape = input_tape;
    3105       26944 :     batch->input_tuples = input_tuples;
    3106       26944 :     batch->input_card = input_card;
    3107             : 
    3108       26944 :     return batch;
    3109             : }
    3110             : 
    3111             : /*
    3112             :  * hashagg_batch_read
    3113             :  *      read the next tuple from a batch's tape.  Return NULL if no more.
    3114             :  */
    3115             : static MinimalTuple
    3116     1243720 : hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
    3117             : {
    3118     1243720 :     LogicalTape *tape = batch->input_tape;
    3119             :     MinimalTuple tuple;
    3120             :     uint32      t_len;
    3121             :     size_t      nread;
    3122             :     uint32      hash;
    3123             : 
    3124     1243720 :     nread = LogicalTapeRead(tape, &hash, sizeof(uint32));
    3125     1243720 :     if (nread == 0)
    3126       26944 :         return NULL;
    3127     1216776 :     if (nread != sizeof(uint32))
    3128           0 :         ereport(ERROR,
    3129             :                 (errcode_for_file_access(),
    3130             :                  errmsg_internal("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
    3131             :                                  tape, sizeof(uint32), nread)));
    3132     1216776 :     if (hashp != NULL)
    3133     1216776 :         *hashp = hash;
    3134             : 
    3135     1216776 :     nread = LogicalTapeRead(tape, &t_len, sizeof(t_len));
    3136     1216776 :     if (nread != sizeof(uint32))
    3137           0 :         ereport(ERROR,
    3138             :                 (errcode_for_file_access(),
    3139             :                  errmsg_internal("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
    3140             :                                  tape, sizeof(uint32), nread)));
    3141             : 
    3142     1216776 :     tuple = (MinimalTuple) palloc(t_len);
    3143     1216776 :     tuple->t_len = t_len;
    3144             : 
    3145     1216776 :     nread = LogicalTapeRead(tape,
    3146             :                             (char *) tuple + sizeof(uint32),
    3147             :                             t_len - sizeof(uint32));
    3148     1216776 :     if (nread != t_len - sizeof(uint32))
    3149           0 :         ereport(ERROR,
    3150             :                 (errcode_for_file_access(),
    3151             :                  errmsg_internal("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
    3152             :                                  tape, t_len - sizeof(uint32), nread)));
    3153             : 
    3154     1216776 :     return tuple;
    3155             : }
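/*
 * Standalone sketch (not nodeAgg.c code) of the on-tape record layout that
 * hashagg_spill_tuple() writes and hashagg_batch_read() reads back:
 *
 *     uint32 hash | MinimalTuple (whose own first uint32 field is t_len)
 *
 * Reading the length prefix therefore also recovers the first four bytes of
 * the tuple, so only t_len - sizeof(uint32) further bytes are copied in
 * after it.  The FakeMinimalTuple type and the in-memory "tape" below are
 * simplified stand-ins for the real structures and LogicalTape I/O.
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct
{
    uint32_t    t_len;          /* total length, including this field */
    char        data[8];        /* stand-in for the rest of the tuple */
} FakeMinimalTuple;

int
main(void)
{
    FakeMinimalTuple src = {sizeof(FakeMinimalTuple), "payload"};
    uint32_t    hash = 0x1234ABCDU;
    char        tape[64];
    const char *rp = tape;
    uint32_t    rhash, t_len;
    FakeMinimalTuple *out;

    /* "spill": the hash first, then the whole tuple */
    memcpy(tape, &hash, sizeof(uint32_t));
    memcpy(tape + sizeof(uint32_t), &src, src.t_len);

    /* "read back": hash, then t_len, then the remainder of the tuple */
    memcpy(&rhash, rp, sizeof(uint32_t));
    rp += sizeof(uint32_t);
    memcpy(&t_len, rp, sizeof(uint32_t));
    rp += sizeof(uint32_t);

    out = malloc(t_len);
    out->t_len = t_len;
    memcpy((char *) out + sizeof(uint32_t), rp, t_len - sizeof(uint32_t));

    printf("hash=%08x len=%u data=%s\n", (unsigned) rhash,
           (unsigned) out->t_len, out->data);
    free(out);
    return 0;
}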
    3156             : 
    3157             : /*
    3158             :  * hashagg_finish_initial_spills
    3159             :  *
     3160             :  * After the initial pass over the input is complete, hash aggregation may
     3161             :  * have spilled tuples to disk. If so, turn the spilled partitions into new
     3162             :  * batches that must later be executed.
    3163             :  */
    3164             : static void
    3165       15140 : hashagg_finish_initial_spills(AggState *aggstate)
    3166             : {
    3167             :     int         setno;
    3168       15140 :     int         total_npartitions = 0;
    3169             : 
    3170       15140 :     if (aggstate->hash_spills != NULL)
    3171             :     {
    3172         184 :         for (setno = 0; setno < aggstate->num_hashes; setno++)
    3173             :         {
    3174         122 :             HashAggSpill *spill = &aggstate->hash_spills[setno];
    3175             : 
    3176         122 :             total_npartitions += spill->npartitions;
    3177         122 :             hashagg_spill_finish(aggstate, spill, setno);
    3178             :         }
    3179             : 
    3180             :         /*
    3181             :          * We're not processing tuples from outer plan any more; only
    3182             :          * processing batches of spilled tuples. The initial spill structures
    3183             :          * are no longer needed.
    3184             :          */
    3185          62 :         pfree(aggstate->hash_spills);
    3186          62 :         aggstate->hash_spills = NULL;
    3187             :     }
     3188             :          * We're not processing tuples from the outer plan any more; only
    3189       15140 :     hash_agg_update_metrics(aggstate, false, total_npartitions);
    3190       15140 :     aggstate->hash_spill_mode = false;
    3191       15140 : }
    3192             : 
    3193             : /*
    3194             :  * hashagg_spill_finish
    3195             :  *
    3196             :  * Transform spill partitions into new batches.
    3197             :  */
    3198             : static void
    3199       12622 : hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
    3200             : {
    3201             :     int         i;
    3202       12622 :     int         used_bits = 32 - spill->shift;
    3203             : 
    3204       12622 :     if (spill->npartitions == 0)
    3205           0 :         return;                 /* didn't spill */
    3206             : 
    3207       63080 :     for (i = 0; i < spill->npartitions; i++)
    3208             :     {
    3209       50458 :         LogicalTape *tape = spill->partitions[i];
    3210             :         HashAggBatch *new_batch;
    3211             :         double      cardinality;
    3212             : 
    3213             :         /* if the partition is empty, don't create a new batch of work */
    3214       50458 :         if (spill->ntuples[i] == 0)
    3215       23514 :             continue;
    3216             : 
    3217       26944 :         cardinality = estimateHyperLogLog(&spill->hll_card[i]);
    3218       26944 :         freeHyperLogLog(&spill->hll_card[i]);
    3219             : 
    3220             :         /* rewinding frees the buffer while not in use */
    3221       26944 :         LogicalTapeRewindForRead(tape, HASHAGG_READ_BUFFER_SIZE);
    3222             : 
    3223       26944 :         new_batch = hashagg_batch_new(tape, setno,
    3224       26944 :                                       spill->ntuples[i], cardinality,
    3225             :                                       used_bits);
    3226       26944 :         aggstate->hash_batches = lappend(aggstate->hash_batches, new_batch);
    3227       26944 :         aggstate->hash_batches_used++;
    3228             :     }
    3229             : 
    3230       12622 :     pfree(spill->ntuples);
    3231       12622 :     pfree(spill->hll_card);
    3232       12622 :     pfree(spill->partitions);
    3233             : }
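/*
 * Standalone sketch (not nodeAgg.c code) of how used_bits grows across batch
 * generations: hashagg_spill_finish() passes 32 - spill->shift down to
 * hashagg_batch_new(), so each respill consumes its partition_bits from the
 * top of the same 32-bit hash and never reuses bits an ancestor already
 * split on.  The partition_bits values are hypothetical.
 */
#include <stdio.h>

int
main(void)
{
    int         used_bits = 0;  /* the initial spill sees a fresh hash */
    int         partition_bits[3] = {2, 3, 4};

    for (int gen = 0; gen < 3; gen++)
    {
        int         shift = 32 - used_bits - partition_bits[gen];

        printf("generation %d: used_bits=%d npartitions=%d shift=%d\n",
               gen, used_bits, 1 << partition_bits[gen], shift);
        used_bits = 32 - shift; /* what the next generation's batches inherit */
    }
    return 0;
}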
    3234             : 
    3235             : /*
    3236             :  * Free resources related to a spilled HashAgg.
    3237             :  */
    3238             : static void
    3239       59030 : hashagg_reset_spill_state(AggState *aggstate)
    3240             : {
    3241             :     /* free spills from initial pass */
    3242       59030 :     if (aggstate->hash_spills != NULL)
    3243             :     {
    3244             :         int         setno;
    3245             : 
    3246           0 :         for (setno = 0; setno < aggstate->num_hashes; setno++)
    3247             :         {
    3248           0 :             HashAggSpill *spill = &aggstate->hash_spills[setno];
    3249             : 
    3250           0 :             pfree(spill->ntuples);
    3251           0 :             pfree(spill->partitions);
    3252             :         }
    3253           0 :         pfree(aggstate->hash_spills);
    3254           0 :         aggstate->hash_spills = NULL;
    3255             :     }
    3256             : 
    3257             :     /* free batches */
    3258       59030 :     list_free_deep(aggstate->hash_batches);
    3259       59030 :     aggstate->hash_batches = NIL;
    3260             : 
    3261             :     /* close tape set */
    3262       59030 :     if (aggstate->hash_tapeset != NULL)
    3263             :     {
    3264          62 :         LogicalTapeSetClose(aggstate->hash_tapeset);
    3265          62 :         aggstate->hash_tapeset = NULL;
    3266             :     }
    3267       59030 : }
    3268             : 
    3269             : 
    3270             : /* -----------------
    3271             :  * ExecInitAgg
    3272             :  *
    3273             :  *  Creates the run-time information for the agg node produced by the
    3274             :  *  planner and initializes its outer subtree.
    3275             :  *
    3276             :  * -----------------
    3277             :  */
    3278             : AggState *
    3279       48134 : ExecInitAgg(Agg *node, EState *estate, int eflags)
    3280             : {
    3281             :     AggState   *aggstate;
    3282             :     AggStatePerAgg peraggs;
    3283             :     AggStatePerTrans pertransstates;
    3284             :     AggStatePerGroup *pergroups;
    3285             :     Plan       *outerPlan;
    3286             :     ExprContext *econtext;
    3287             :     TupleDesc   scanDesc;
    3288             :     int         max_aggno;
    3289             :     int         max_transno;
    3290             :     int         numaggrefs;
    3291             :     int         numaggs;
    3292             :     int         numtrans;
    3293             :     int         phase;
    3294             :     int         phaseidx;
    3295             :     ListCell   *l;
    3296       48134 :     Bitmapset  *all_grouped_cols = NULL;
    3297       48134 :     int         numGroupingSets = 1;
    3298             :     int         numPhases;
    3299             :     int         numHashes;
    3300       48134 :     int         i = 0;
    3301       48134 :     int         j = 0;
    3302       90774 :     bool        use_hashing = (node->aggstrategy == AGG_HASHED ||
    3303       42640 :                                node->aggstrategy == AGG_MIXED);
    3304             : 
    3305             :     /* check for unsupported flags */
    3306             :     Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
    3307             : 
    3308             :     /*
    3309             :      * create state structure
    3310             :      */
    3311       48134 :     aggstate = makeNode(AggState);
    3312       48134 :     aggstate->ss.ps.plan = (Plan *) node;
    3313       48134 :     aggstate->ss.ps.state = estate;
    3314       48134 :     aggstate->ss.ps.ExecProcNode = ExecAgg;
    3315             : 
    3316       48134 :     aggstate->aggs = NIL;
    3317       48134 :     aggstate->numaggs = 0;
    3318       48134 :     aggstate->numtrans = 0;
    3319       48134 :     aggstate->aggstrategy = node->aggstrategy;
    3320       48134 :     aggstate->aggsplit = node->aggsplit;
    3321       48134 :     aggstate->maxsets = 0;
    3322       48134 :     aggstate->projected_set = -1;
    3323       48134 :     aggstate->current_set = 0;
    3324       48134 :     aggstate->peragg = NULL;
    3325       48134 :     aggstate->pertrans = NULL;
    3326       48134 :     aggstate->curperagg = NULL;
    3327       48134 :     aggstate->curpertrans = NULL;
    3328       48134 :     aggstate->input_done = false;
    3329       48134 :     aggstate->agg_done = false;
    3330       48134 :     aggstate->pergroups = NULL;
    3331       48134 :     aggstate->grp_firstTuple = NULL;
    3332       48134 :     aggstate->sort_in = NULL;
    3333       48134 :     aggstate->sort_out = NULL;
    3334             : 
    3335             :     /*
    3336             :      * phases[0] always exists, but is dummy in sorted/plain mode
    3337             :      */
    3338       48134 :     numPhases = (use_hashing ? 1 : 2);
    3339       48134 :     numHashes = (use_hashing ? 1 : 0);
    3340             : 
    3341             :     /*
    3342             :      * Calculate the maximum number of grouping sets in any phase; this
    3343             :      * determines the size of some allocations.  Also calculate the number of
    3344             :      * phases, since all hashed/mixed nodes contribute to only a single phase.
    3345             :      */
    3346       48134 :     if (node->groupingSets)
    3347             :     {
    3348         860 :         numGroupingSets = list_length(node->groupingSets);
    3349             : 
    3350        1834 :         foreach(l, node->chain)
    3351             :         {
    3352         974 :             Agg        *agg = lfirst(l);
    3353             : 
    3354         974 :             numGroupingSets = Max(numGroupingSets,
    3355             :                                   list_length(agg->groupingSets));
    3356             : 
    3357             :             /*
    3358             :              * additional AGG_HASHED aggs become part of phase 0, but all
    3359             :              * others add an extra phase.
    3360             :              */
    3361         974 :             if (agg->aggstrategy != AGG_HASHED)
    3362         472 :                 ++numPhases;
    3363             :             else
    3364         502 :                 ++numHashes;
    3365             :         }
    3366             :     }
    3367             : 
    3368       48134 :     aggstate->maxsets = numGroupingSets;
    3369       48134 :     aggstate->numphases = numPhases;
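/*
 * Standalone sketch (not nodeAgg.c code) of the counting loop above: every
 * AGG_HASHED member of the chain is folded into the shared phase 0 and only
 * bumps the hash count, while every other chain member adds a phase of its
 * own on top of the initial value implied by use_hashing.  The enum and the
 * example chain are illustrative.
 */
#include <stdio.h>

typedef enum
{
    AGG_PLAIN,
    AGG_SORTED,
    AGG_HASHED,
    AGG_MIXED
} AggStrategy;

int
main(void)
{
    AggStrategy root = AGG_MIXED;
    AggStrategy chain[] = {AGG_HASHED, AGG_SORTED, AGG_HASHED};
    int         use_hashing = (root == AGG_HASHED || root == AGG_MIXED);
    int         numPhases = use_hashing ? 1 : 2;
    int         numHashes = use_hashing ? 1 : 0;

    for (int i = 0; i < 3; i++)
    {
        if (chain[i] != AGG_HASHED)
            ++numPhases;
        else
            ++numHashes;
    }
    printf("numPhases=%d numHashes=%d\n", numPhases, numHashes);    /* 2, 3 */
    return 0;
}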
    3370             : 
    3371       48134 :     aggstate->aggcontexts = (ExprContext **)
    3372       48134 :         palloc0(sizeof(ExprContext *) * numGroupingSets);
    3373             : 
    3374             :     /*
     3375             :      * Create expression contexts.  We need three or more: one for
    3376             :      * per-input-tuple processing, one for per-output-tuple processing, one
    3377             :      * for all the hashtables, and one for each grouping set.  The per-tuple
    3378             :      * memory context of the per-grouping-set ExprContexts (aggcontexts)
    3379             :      * replaces the standalone memory context formerly used to hold transition
    3380             :      * values.  We cheat a little by using ExecAssignExprContext() to build
    3381             :      * all of them.
    3382             :      *
    3383             :      * NOTE: the details of what is stored in aggcontexts and what is stored
    3384             :      * in the regular per-query memory context are driven by a simple
    3385             :      * decision: we want to reset the aggcontext at group boundaries (if not
    3386             :      * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
    3387             :      */
    3388       48134 :     ExecAssignExprContext(estate, &aggstate->ss.ps);
    3389       48134 :     aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
    3390             : 
    3391       97108 :     for (i = 0; i < numGroupingSets; ++i)
    3392             :     {
    3393       48974 :         ExecAssignExprContext(estate, &aggstate->ss.ps);
    3394       48974 :         aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
    3395             :     }
    3396             : 
    3397       48134 :     if (use_hashing)
    3398        5726 :         hash_create_memory(aggstate);
    3399             : 
    3400       48134 :     ExecAssignExprContext(estate, &aggstate->ss.ps);
    3401             : 
    3402             :     /*
    3403             :      * Initialize child nodes.
    3404             :      *
    3405             :      * If we are doing a hashed aggregation then the child plan does not need
    3406             :      * to handle REWIND efficiently; see ExecReScanAgg.
    3407             :      */
    3408       48134 :     if (node->aggstrategy == AGG_HASHED)
    3409        5494 :         eflags &= ~EXEC_FLAG_REWIND;
    3410       48134 :     outerPlan = outerPlan(node);
    3411       48134 :     outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
    3412             : 
    3413             :     /*
    3414             :      * initialize source tuple type.
    3415             :      */
    3416       48134 :     aggstate->ss.ps.outerops =
    3417       48134 :         ExecGetResultSlotOps(outerPlanState(&aggstate->ss),
    3418             :                              &aggstate->ss.ps.outeropsfixed);
    3419       48134 :     aggstate->ss.ps.outeropsset = true;
    3420             : 
    3421       48134 :     ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
    3422             :                                     aggstate->ss.ps.outerops);
    3423       48134 :     scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
    3424             : 
    3425             :     /*
    3426             :      * If there are more than two phases (including a potential dummy phase
    3427             :      * 0), input will be resorted using tuplesort. Need a slot for that.
    3428             :      */
    3429       48134 :     if (numPhases > 2)
    3430             :     {
    3431         198 :         aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
    3432             :                                                      &TTSOpsMinimalTuple);
    3433             : 
    3434             :         /*
    3435             :          * The output of the tuplesort, and the output from the outer child
    3436             :          * might not use the same type of slot. In most cases the child will
    3437             :          * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
     3438             :          * input can also be presorted due to an index, in which case it could be
    3439             :          * a different type of slot.
    3440             :          *
    3441             :          * XXX: For efficiency it would be good to instead/additionally
    3442             :          * generate expressions with corresponding settings of outerops* for
    3443             :          * the individual phases - deforming is often a bottleneck for
    3444             :          * aggregations with lots of rows per group. If there's multiple
    3445             :          * sorts, we know that all but the first use TTSOpsMinimalTuple (via
    3446             :          * the nodeAgg.c internal tuplesort).
    3447             :          */
    3448         198 :         if (aggstate->ss.ps.outeropsfixed &&
    3449         198 :             aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
    3450          12 :             aggstate->ss.ps.outeropsfixed = false;
    3451             :     }
    3452             : 
    3453             :     /*
    3454             :      * Initialize result type, slot and projection.
    3455             :      */
    3456       48134 :     ExecInitResultTupleSlotTL(&aggstate->ss.ps, &TTSOpsVirtual);
    3457       48134 :     ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
    3458             : 
    3459             :     /*
    3460             :      * initialize child expressions
    3461             :      *
    3462             :      * We expect the parser to have checked that no aggs contain other agg
    3463             :      * calls in their arguments (and just to be sure, we verify it again while
    3464             :      * initializing the plan node).  This would make no sense under SQL
    3465             :      * semantics, and it's forbidden by the spec.  Because it is true, we
    3466             :      * don't need to worry about evaluating the aggs in any particular order.
    3467             :      *
    3468             :      * Note: execExpr.c finds Aggrefs for us, and adds them to aggstate->aggs.
    3469             :      * Aggrefs in the qual are found here; Aggrefs in the targetlist are found
    3470             :      * during ExecAssignProjectionInfo, above.
    3471             :      */
    3472       48134 :     aggstate->ss.ps.qual =
    3473       48134 :         ExecInitQual(node->plan.qual, (PlanState *) aggstate);
    3474             : 
    3475             :     /*
    3476             :      * We should now have found all Aggrefs in the targetlist and quals.
    3477             :      */
    3478       48134 :     numaggrefs = list_length(aggstate->aggs);
    3479       48134 :     max_aggno = -1;
    3480       48134 :     max_transno = -1;
    3481      101768 :     foreach(l, aggstate->aggs)
    3482             :     {
    3483       53634 :         Aggref     *aggref = (Aggref *) lfirst(l);
    3484             : 
    3485       53634 :         max_aggno = Max(max_aggno, aggref->aggno);
    3486       53634 :         max_transno = Max(max_transno, aggref->aggtransno);
    3487             :     }
    3488       48134 :     aggstate->numaggs = numaggs = max_aggno + 1;
    3489       48134 :     aggstate->numtrans = numtrans = max_transno + 1;
    3490             : 
    3491             :     /*
    3492             :      * For each phase, prepare grouping set data and fmgr lookup data for
    3493             :      * compare functions.  Accumulate all_grouped_cols in passing.
    3494             :      */
    3495       48134 :     aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
    3496             : 
    3497       48134 :     aggstate->num_hashes = numHashes;
    3498       48134 :     if (numHashes)
    3499             :     {
    3500        5726 :         aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
    3501        5726 :         aggstate->phases[0].numsets = 0;
    3502        5726 :         aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
    3503        5726 :         aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
    3504             :     }
    3505             : 
    3506       48134 :     phase = 0;
    3507       97242 :     for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
    3508             :     {
    3509             :         Agg        *aggnode;
    3510             :         Sort       *sortnode;
    3511             : 
    3512       49108 :         if (phaseidx > 0)
    3513             :         {
    3514         974 :             aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
    3515         974 :             sortnode = castNode(Sort, outerPlan(aggnode));
    3516             :         }
    3517             :         else
    3518             :         {
    3519       48134 :             aggnode = node;
    3520       48134 :             sortnode = NULL;
    3521             :         }
    3522             : 
    3523             :         Assert(phase <= 1 || sortnode);
    3524             : 
    3525       49108 :         if (aggnode->aggstrategy == AGG_HASHED
    3526       43112 :             || aggnode->aggstrategy == AGG_MIXED)
    3527             :         {
    3528        6228 :             AggStatePerPhase phasedata = &aggstate->phases[0];
    3529             :             AggStatePerHash perhash;
    3530        6228 :             Bitmapset  *cols = NULL;
    3531             : 
    3532             :             Assert(phase == 0);
    3533        6228 :             i = phasedata->numsets++;
    3534        6228 :             perhash = &aggstate->perhash[i];
    3535             : 
    3536             :             /* phase 0 always points to the "real" Agg in the hash case */
    3537        6228 :             phasedata->aggnode = node;
    3538        6228 :             phasedata->aggstrategy = node->aggstrategy;
    3539             : 
    3540             :             /* but the actual Agg node representing this hash is saved here */
    3541        6228 :             perhash->aggnode = aggnode;
    3542             : 
    3543        6228 :             phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
    3544             : 
    3545       15628 :             for (j = 0; j < aggnode->numCols; ++j)
    3546        9400 :                 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
    3547             : 
    3548        6228 :             phasedata->grouped_cols[i] = cols;
    3549             : 
    3550        6228 :             all_grouped_cols = bms_add_members(all_grouped_cols, cols);
    3551        6228 :             continue;
    3552             :         }
    3553             :         else
    3554             :         {
    3555       42880 :             AggStatePerPhase phasedata = &aggstate->phases[++phase];
    3556             :             int         num_sets;
    3557             : 
    3558       42880 :             phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
    3559             : 
    3560       42880 :             if (num_sets)
    3561             :             {
    3562         934 :                 phasedata->gset_lengths = palloc(num_sets * sizeof(int));
    3563         934 :                 phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
    3564             : 
    3565         934 :                 i = 0;
    3566        2780 :                 foreach(l, aggnode->groupingSets)
    3567             :                 {
    3568        1846 :                     int         current_length = list_length(lfirst(l));
    3569        1846 :                     Bitmapset  *cols = NULL;
    3570             : 
    3571             :                     /* planner forces this to be correct */
    3572        3642 :                     for (j = 0; j < current_length; ++j)
    3573        1796 :                         cols = bms_add_member(cols, aggnode->grpColIdx[j]);
    3574             : 
    3575        1846 :                     phasedata->grouped_cols[i] = cols;
    3576        1846 :                     phasedata->gset_lengths[i] = current_length;
    3577             : 
    3578        1846 :                     ++i;
    3579             :                 }
    3580             : 
    3581         934 :                 all_grouped_cols = bms_add_members(all_grouped_cols,
    3582         934 :                                                    phasedata->grouped_cols[0]);
    3583             :             }
    3584             :             else
    3585             :             {
    3586             :                 Assert(phaseidx == 0);
    3587             : 
    3588       41946 :                 phasedata->gset_lengths = NULL;
    3589       41946 :                 phasedata->grouped_cols = NULL;
    3590             :             }
    3591             : 
    3592             :             /*
    3593             :              * If we are grouping, precompute fmgr lookup data for inner loop.
    3594             :              */
    3595       42880 :             if (aggnode->aggstrategy == AGG_SORTED)
    3596             :             {
    3597             :                 /*
    3598             :                  * Build a separate function for each subset of columns that
    3599             :                  * need to be compared.
    3600             :                  */
    3601        2294 :                 phasedata->eqfunctions =
    3602        2294 :                     (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
    3603             : 
    3604             :                 /* for each grouping set */
    3605        3862 :                 for (int k = 0; k < phasedata->numsets; k++)
    3606             :                 {
    3607        1568 :                     int         length = phasedata->gset_lengths[k];
    3608             : 
    3609             :                     /* nothing to do for empty grouping set */
    3610        1568 :                     if (length == 0)
    3611         326 :                         continue;
    3612             : 
    3613             :                     /* if we already had one of this length, it'll do */
    3614        1242 :                     if (phasedata->eqfunctions[length - 1] != NULL)
    3615         138 :                         continue;
    3616             : 
    3617        1104 :                     phasedata->eqfunctions[length - 1] =
    3618        1104 :                         execTuplesMatchPrepare(scanDesc,
    3619             :                                                length,
    3620        1104 :                                                aggnode->grpColIdx,
    3621        1104 :                                                aggnode->grpOperators,
    3622        1104 :                                                aggnode->grpCollations,
    3623             :                                                (PlanState *) aggstate);
    3624             :                 }
    3625             : 
    3626             :                 /* and for all grouped columns, unless already computed */
    3627        2294 :                 if (aggnode->numCols > 0 &&
    3628        2200 :                     phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
    3629             :                 {
    3630        1472 :                     phasedata->eqfunctions[aggnode->numCols - 1] =
    3631        1472 :                         execTuplesMatchPrepare(scanDesc,
    3632             :                                                aggnode->numCols,
    3633        1472 :                                                aggnode->grpColIdx,
    3634        1472 :                                                aggnode->grpOperators,
    3635        1472 :                                                aggnode->grpCollations,
    3636             :                                                (PlanState *) aggstate);
    3637             :                 }
    3638             :             }
    3639             : 
    3640       42880 :             phasedata->aggnode = aggnode;
    3641       42880 :             phasedata->aggstrategy = aggnode->aggstrategy;
    3642       42880 :             phasedata->sortnode = sortnode;
    3643             :         }
    3644             :     }
    3645             : 
    3646             :     /*
    3647             :      * Convert all_grouped_cols to a descending-order list.
    3648             :      */
    3649       48134 :     i = -1;
    3650       58088 :     while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
    3651        9954 :         aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
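/*
 * Standalone sketch (not nodeAgg.c code): prepending each member while
 * scanning the set in ascending order leaves the finished list in descending
 * order, which is all the lcons_int() loop above relies on.  The tiny cons
 * cell here stands in for PostgreSQL's List.
 */
#include <stdio.h>
#include <stdlib.h>

typedef struct IntCell
{
    int         value;
    struct IntCell *next;
} IntCell;

static IntCell *
cons_int(int value, IntCell *tail)
{
    IntCell    *cell = malloc(sizeof(IntCell));

    cell->value = value;
    cell->next = tail;
    return cell;
}

int
main(void)
{
    int         members[3] = {2, 5, 9}; /* ascending, as bms_next_member yields */
    IntCell    *list = NULL;

    for (int i = 0; i < 3; i++)
        list = cons_int(members[i], list);

    for (IntCell *c = list; c != NULL; c = c->next)
        printf("%d ", c->value);        /* prints: 9 5 2 */
    printf("\n");
    return 0;
}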
    3652             : 
    3653             :     /*
    3654             :      * Set up aggregate-result storage in the output expr context, and also
    3655             :      * allocate my private per-agg working storage
    3656             :      */
    3657       48134 :     econtext = aggstate->ss.ps.ps_ExprContext;
    3658       48134 :     econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
    3659       48134 :     econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
    3660             : 
    3661       48134 :     peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
    3662       48134 :     pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numtrans);
    3663             : 
    3664       48134 :     aggstate->peragg = peraggs;
    3665       48134 :     aggstate->pertrans = pertransstates;
    3666             : 
    3667             : 
    3668       48134 :     aggstate->all_pergroups =
    3669       48134 :         (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)
    3670       48134 :                                      * (numGroupingSets + numHashes));
    3671       48134 :     pergroups = aggstate->all_pergroups;
    3672             : 
    3673       48134 :     if (node->aggstrategy != AGG_HASHED)
    3674             :     {
    3675       86120 :         for (i = 0; i < numGroupingSets; i++)
    3676             :         {
    3677       43480 :             pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
    3678             :                                                       * numaggs);
    3679             :         }
    3680             : 
    3681       42640 :         aggstate->pergroups = pergroups;
    3682       42640 :         pergroups += numGroupingSets;
    3683             :     }
    3684             : 
    3685             :     /*
    3686             :      * Hashing can only appear in the initial phase.
    3687             :      */
    3688       48134 :     if (use_hashing)
    3689             :     {
    3690        5726 :         Plan       *outerplan = outerPlan(node);
    3691        5726 :         uint64      totalGroups = 0;
    3692             : 
    3693        5726 :         aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
    3694             :                                                             &TTSOpsMinimalTuple);
    3695        5726 :         aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
    3696             :                                                             &TTSOpsVirtual);
    3697             : 
    3698             :         /* this is an array of pointers, not structures */
    3699        5726 :         aggstate->hash_pergroup = pergroups;
    3700             : 
    3701       11452 :         aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans,
    3702        5726 :                                                       outerplan->plan_width,
    3703             :                                                       node->transitionSpace);
    3704             : 
    3705             :         /*
    3706             :          * Consider all of the grouping sets together when setting the limits
    3707             :          * and estimating the number of partitions. This can be inaccurate
    3708             :          * when there is more than one grouping set, but should still be
    3709             :          * reasonable.
    3710             :          */
    3711       11954 :         for (int k = 0; k < aggstate->num_hashes; k++)
    3712        6228 :             totalGroups += aggstate->perhash[k].aggnode->numGroups;
    3713             : 
    3714        5726 :         hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
    3715             :                             &aggstate->hash_mem_limit,
    3716             :                             &aggstate->hash_ngroups_limit,
    3717             :                             &aggstate->hash_planned_partitions);
    3718        5726 :         find_hash_columns(aggstate);
    3719             : 
    3720             :         /* Skip massive memory allocation if we are just doing EXPLAIN */
    3721        5726 :         if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
    3722        4390 :             build_hash_tables(aggstate);
    3723             : 
    3724        5726 :         aggstate->table_filled = false;
    3725             : 
     3726             :         /* Initialize this to 1, meaning nothing has spilled yet */
    3727        5726 :         aggstate->hash_batches_used = 1;
    3728             :     }
    3729             : 
    3730             :     /*
    3731             :      * Initialize current phase-dependent values to initial phase. The initial
    3732             :      * phase is 1 (first sort pass) for all strategies that use sorting (if
    3733             :      * hashing is being done too, then phase 0 is processed last); but if only
    3734             :      * hashing is being done, then phase 0 is all there is.
    3735             :      */
    3736       48134 :     if (node->aggstrategy == AGG_HASHED)
    3737             :     {
    3738        5494 :         aggstate->current_phase = 0;
    3739        5494 :         initialize_phase(aggstate, 0);
    3740        5494 :         select_current_set(aggstate, 0, true);
    3741             :     }
    3742             :     else
    3743             :     {
    3744       42640 :         aggstate->current_phase = 1;
    3745       42640 :         initialize_phase(aggstate, 1);
    3746       42640 :         select_current_set(aggstate, 0, false);
    3747             :     }
    3748             : 
    3749             :     /*
    3750             :      * Perform lookups of aggregate function info, and initialize the
    3751             :      * unchanging fields of the per-agg and per-trans data.
    3752             :      */
    3753      101762 :     foreach(l, aggstate->aggs)
    3754             :     {
    3755       53634 :         Aggref     *aggref = lfirst(l);
    3756             :         AggStatePerAgg peragg;
    3757             :         AggStatePerTrans pertrans;
    3758             :         Oid         aggTransFnInputTypes[FUNC_MAX_ARGS];
    3759             :         int         numAggTransFnArgs;
    3760             :         int         numDirectArgs;
    3761             :         HeapTuple   aggTuple;
    3762             :         Form_pg_aggregate aggform;
    3763             :         AclResult   aclresult;
    3764             :         Oid         finalfn_oid;
    3765             :         Oid         serialfn_oid,
    3766             :                     deserialfn_oid;
    3767             :         Oid         aggOwner;
    3768             :         Expr       *finalfnexpr;
    3769             :         Oid         aggtranstype;
    3770             : 
    3771             :         /* Planner should have assigned aggregate to correct level */
    3772             :         Assert(aggref->agglevelsup == 0);
    3773             :         /* ... and the split mode should match */
    3774             :         Assert(aggref->aggsplit == aggstate->aggsplit);
    3775             : 
    3776       53634 :         peragg = &peraggs[aggref->aggno];
    3777             : 
    3778             :         /* Check if we initialized the state for this aggregate already. */
    3779       53634 :         if (peragg->aggref != NULL)
    3780         472 :             continue;
    3781             : 
    3782       53162 :         peragg->aggref = aggref;
    3783       53162 :         peragg->transno = aggref->aggtransno;
    3784             : 
    3785             :         /* Fetch the pg_aggregate row */
    3786       53162 :         aggTuple = SearchSysCache1(AGGFNOID,
    3787             :                                    ObjectIdGetDatum(aggref->aggfnoid));
    3788       53162 :         if (!HeapTupleIsValid(aggTuple))
    3789           0 :             elog(ERROR, "cache lookup failed for aggregate %u",
    3790             :                  aggref->aggfnoid);
    3791       53162 :         aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
    3792             : 
    3793             :         /* Check permission to call aggregate function */
    3794       53162 :         aclresult = object_aclcheck(ProcedureRelationId, aggref->aggfnoid, GetUserId(),
    3795             :                                     ACL_EXECUTE);
    3796       53162 :         if (aclresult != ACLCHECK_OK)
    3797           6 :             aclcheck_error(aclresult, OBJECT_AGGREGATE,
    3798           6 :                            get_func_name(aggref->aggfnoid));
    3799       53156 :         InvokeFunctionExecuteHook(aggref->aggfnoid);
    3800             : 
    3801             :         /* planner recorded transition state type in the Aggref itself */
    3802       53156 :         aggtranstype = aggref->aggtranstype;
    3803             :         Assert(OidIsValid(aggtranstype));
    3804             : 
    3805             :         /* Final function only required if we're finalizing the aggregates */
    3806       53156 :         if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
    3807        4248 :             peragg->finalfn_oid = finalfn_oid = InvalidOid;
    3808             :         else
    3809       48908 :             peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
    3810             : 
    3811       53156 :         serialfn_oid = InvalidOid;
    3812       53156 :         deserialfn_oid = InvalidOid;
    3813             : 
    3814             :         /*
    3815             :          * Check if serialization/deserialization is required.  We only do it
    3816             :          * for aggregates that have transtype INTERNAL.
    3817             :          */
    3818       53156 :         if (aggtranstype == INTERNALOID)
    3819             :         {
    3820             :             /*
    3821             :              * The planner should only have generated a serialize agg node if
    3822             :              * every aggregate with an INTERNAL state has a serialization
    3823             :              * function.  Verify that.
    3824             :              */
    3825       22260 :             if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
    3826             :             {
    3827             :                 /* serialization only valid when not running finalfn */
    3828             :                 Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
    3829             : 
    3830         336 :                 if (!OidIsValid(aggform->aggserialfn))
    3831           0 :                     elog(ERROR, "serialfunc not provided for serialization aggregation");
    3832         336 :                 serialfn_oid = aggform->aggserialfn;
    3833             :             }
    3834             : 
    3835             :             /* Likewise for deserialization functions */
    3836       22260 :             if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
    3837             :             {
    3838             :                 /* deserialization only valid when combining states */
    3839             :                 Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
    3840             : 
    3841         120 :                 if (!OidIsValid(aggform->aggdeserialfn))
    3842           0 :                     elog(ERROR, "deserialfunc not provided for deserialization aggregation");
    3843         120 :                 deserialfn_oid = aggform->aggdeserialfn;
    3844             :             }
    3845             :         }
    3846             : 
    3847             :         /* Check that aggregate owner has permission to call component fns */
    3848             :         {
    3849             :             HeapTuple   procTuple;
    3850             : 
    3851       53156 :             procTuple = SearchSysCache1(PROCOID,
    3852             :                                         ObjectIdGetDatum(aggref->aggfnoid));
    3853       53156 :             if (!HeapTupleIsValid(procTuple))
    3854           0 :                 elog(ERROR, "cache lookup failed for function %u",
    3855             :                      aggref->aggfnoid);
    3856       53156 :             aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
    3857       53156 :             ReleaseSysCache(procTuple);
    3858             : 
    3859       53156 :             if (OidIsValid(finalfn_oid))
    3860             :             {
    3861       23702 :                 aclresult = object_aclcheck(ProcedureRelationId, finalfn_oid, aggOwner,
    3862             :                                             ACL_EXECUTE);
    3863       23702 :                 if (aclresult != ACLCHECK_OK)
    3864           0 :                     aclcheck_error(aclresult, OBJECT_FUNCTION,
    3865           0 :                                    get_func_name(finalfn_oid));
    3866       23702 :                 InvokeFunctionExecuteHook(finalfn_oid);
    3867             :             }
    3868       53156 :             if (OidIsValid(serialfn_oid))
    3869             :             {
    3870         336 :                 aclresult = object_aclcheck(ProcedureRelationId, serialfn_oid, aggOwner,
    3871             :                                             ACL_EXECUTE);
    3872         336 :                 if (aclresult != ACLCHECK_OK)
    3873           0 :                     aclcheck_error(aclresult, OBJECT_FUNCTION,
    3874           0 :                                    get_func_name(serialfn_oid));
    3875         336 :                 InvokeFunctionExecuteHook(serialfn_oid);
    3876             :             }
    3877       53156 :             if (OidIsValid(deserialfn_oid))
    3878             :             {
    3879         120 :                 aclresult = object_aclcheck(ProcedureRelationId, deserialfn_oid, aggOwner,
    3880             :                                             ACL_EXECUTE);
    3881         120 :                 if (aclresult != ACLCHECK_OK)
    3882           0 :                     aclcheck_error(aclresult, OBJECT_FUNCTION,
    3883           0 :                                    get_func_name(deserialfn_oid));
    3884         120 :                 InvokeFunctionExecuteHook(deserialfn_oid);
    3885             :             }
    3886             :         }
    3887             : 
    3888             :         /*
    3889             :          * Get actual datatypes of the (nominal) aggregate inputs.  These
    3890             :          * could be different from the agg's declared input types, when the
    3891             :          * agg accepts ANY or a polymorphic type.
    3892             :          */
    3893       53156 :         numAggTransFnArgs = get_aggregate_argtypes(aggref,
    3894             :                                                    aggTransFnInputTypes);
    3895             : 
    3896             :         /* Count the "direct" arguments, if any */
    3897       53156 :         numDirectArgs = list_length(aggref->aggdirectargs);
    3898             : 
    3899             :         /* Detect how many arguments to pass to the finalfn */
    3900       53156 :         if (aggform->aggfinalextra)
    3901       16058 :             peragg->numFinalArgs = numAggTransFnArgs + 1;
    3902             :         else
    3903       37098 :             peragg->numFinalArgs = numDirectArgs + 1;
    3904             : 
    3905             :         /* Initialize any direct-argument expressions */
    3906       53156 :         peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
    3907             :                                                  (PlanState *) aggstate);
    3908             : 
    3909             :         /*
    3910             :          * build expression trees using actual argument & result types for the
    3911             :          * finalfn, if it exists and is required.
    3912             :          */
    3913       53156 :         if (OidIsValid(finalfn_oid))
    3914             :         {
    3915       23702 :             build_aggregate_finalfn_expr(aggTransFnInputTypes,
    3916             :                                          peragg->numFinalArgs,
    3917             :                                          aggtranstype,
    3918             :                                          aggref->aggtype,
    3919             :                                          aggref->inputcollid,
    3920             :                                          finalfn_oid,
    3921             :                                          &finalfnexpr);
    3922       23702 :             fmgr_info(finalfn_oid, &peragg->finalfn);
    3923       23702 :             fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
    3924             :         }
    3925             : 
    3926             :         /* get info about the output value's datatype */
    3927       53156 :         get_typlenbyval(aggref->aggtype,
    3928             :                         &peragg->resulttypeLen,
    3929             :                         &peragg->resulttypeByVal);
    3930             : 
    3931             :         /*
    3932             :          * Build working state for invoking the transition function, if we
    3933             :          * haven't done it already.
    3934             :          */
    3935       53156 :         pertrans = &pertransstates[aggref->aggtransno];
    3936       53156 :         if (pertrans->aggref == NULL)
    3937             :         {
    3938             :             Datum       textInitVal;
    3939             :             Datum       initValue;
    3940             :             bool        initValueIsNull;
    3941             :             Oid         transfn_oid;
    3942             : 
    3943             :             /*
    3944             :              * If this aggregation is performing state combines, then instead
    3945             :              * of using the transition function, we'll use the combine
    3946             :              * function.
    3947             :              */
    3948       52898 :             if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
    3949             :             {
    3950        1354 :                 transfn_oid = aggform->aggcombinefn;
    3951             : 
    3952             :                 /* If not set then the planner messed up */
    3953        1354 :                 if (!OidIsValid(transfn_oid))
    3954           0 :                     elog(ERROR, "combinefn not set for aggregate function");
    3955             :             }
    3956             :             else
    3957       51544 :                 transfn_oid = aggform->aggtransfn;
    3958             : 
    3959       52898 :             aclresult = object_aclcheck(ProcedureRelationId, transfn_oid, aggOwner, ACL_EXECUTE);
    3960       52898 :             if (aclresult != ACLCHECK_OK)
    3961           0 :                 aclcheck_error(aclresult, OBJECT_FUNCTION,
    3962           0 :                                get_func_name(transfn_oid));
    3963       52898 :             InvokeFunctionExecuteHook(transfn_oid);
    3964             : 
    3965             :             /*
    3966             :              * initval is potentially null, so don't try to access it as a
    3967             :              * struct field. Must do it the hard way with SysCacheGetAttr.
    3968             :              */
    3969       52898 :             textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
    3970             :                                           Anum_pg_aggregate_agginitval,
    3971             :                                           &initValueIsNull);
    3972       52898 :             if (initValueIsNull)
    3973       30528 :                 initValue = (Datum) 0;
    3974             :             else
    3975       22370 :                 initValue = GetAggInitVal(textInitVal, aggtranstype);
    3976             : 
    3977       52898 :             if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
    3978             :             {
    3979        1354 :                 Oid         combineFnInputTypes[] = {aggtranstype,
    3980             :                 aggtranstype};
    3981             : 
    3982             :                 /*
    3983             :                  * When combining there's only one input, the to-be-combined
    3984             :                  * transition value from the child node.  The node's own
    3985             :                  * transition state is not counted here.
    3986             :                  */
    3987        1354 :                 pertrans->numTransInputs = 1;
    3988             : 
    3989             :                 /* aggcombinefn always has two arguments of aggtranstype */
    3990        1354 :                 build_pertrans_for_aggref(pertrans, aggstate, estate,
    3991             :                                           aggref, transfn_oid, aggtranstype,
    3992             :                                           serialfn_oid, deserialfn_oid,
    3993             :                                           initValue, initValueIsNull,
    3994             :                                           combineFnInputTypes, 2);
    3995             : 
    3996             :                 /*
    3997             :                  * Ensure that a combine function to combine INTERNAL states
    3998             :                  * is not strict. This should have been checked during CREATE
    3999             :                  * AGGREGATE, but the strict property could have been changed
    4000             :                  * since then.
    4001             :                  */
    4002        1354 :                 if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
    4003           0 :                     ereport(ERROR,
    4004             :                             (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
    4005             :                              errmsg("combine function with transition type %s must not be declared STRICT",
    4006             :                                     format_type_be(aggtranstype))));
    4007             :             }
    4008             :             else
    4009             :             {
    4010             :                 /* Detect how many arguments to pass to the transfn */
    4011       51544 :                 if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
    4012         252 :                     pertrans->numTransInputs = list_length(aggref->args);
    4013             :                 else
    4014       51292 :                     pertrans->numTransInputs = numAggTransFnArgs;
    4015             : 
    4016       51544 :                 build_pertrans_for_aggref(pertrans, aggstate, estate,
    4017             :                                           aggref, transfn_oid, aggtranstype,
    4018             :                                           serialfn_oid, deserialfn_oid,
    4019             :                                           initValue, initValueIsNull,
    4020             :                                           aggTransFnInputTypes,
    4021             :                                           numAggTransFnArgs);
    4022             : 
    4023             :                 /*
    4024             :                  * If the transfn is strict and the initval is NULL, make sure
    4025             :                  * input type and transtype are the same (or at least
    4026             :                  * binary-compatible), so that it's OK to use the first
    4027             :                  * aggregated input value as the initial transValue.  This
    4028             :                  * should have been checked at agg definition time, but we
    4029             :                  * must check again in case the transfn's strictness property
    4030             :                  * has been changed.
    4031             :                  */
    4032       51544 :                 if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
    4033             :                 {
    4034        5052 :                     if (numAggTransFnArgs <= numDirectArgs ||
    4035        5052 :                         !IsBinaryCoercible(aggTransFnInputTypes[numDirectArgs],
    4036             :                                            aggtranstype))
    4037           0 :                         ereport(ERROR,
    4038             :                                 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
    4039             :                                  errmsg("aggregate %u needs to have compatible input type and transition type",
    4040             :                                         aggref->aggfnoid)));
    4041             :                 }
    4042             :             }
    4043             :         }
    4044             :         else
    4045         258 :             pertrans->aggshared = true;
    4046       53156 :         ReleaseSysCache(aggTuple);
    4047             :     }
    4048             : 
    4049             :     /*
    4050             :      * Last, check whether any more aggregates got added onto the node while
    4051             :      * we processed the expressions for the aggregate arguments (including not
    4052             :      * only the regular arguments and FILTER expressions handled immediately
    4053             :      * above, but any direct arguments we might've handled earlier).  If so,
    4054             :      * we have nested aggregate functions, which is semantically nonsensical,
    4055             :      * so complain.  (This should have been caught by the parser, so we don't
    4056             :      * need to work hard on a helpful error message; but we defend against it
    4057             :      * here anyway, just to be sure.)
    4058             :      */
    4059       48128 :     if (numaggrefs != list_length(aggstate->aggs))
    4060           0 :         ereport(ERROR,
    4061             :                 (errcode(ERRCODE_GROUPING_ERROR),
    4062             :                  errmsg("aggregate function calls cannot be nested")));
    4063             : 
    4064             :     /*
    4065             :      * Build expressions doing all the transition work at once. We build a
    4066             :      * different one for each phase, as the number of transition function
    4067             :      * invocations can differ between phases. Note this'll work both for
    4068             :      * transition and combination functions (although there'll only be one
    4069             :      * phase in the latter case).
    4070             :      */
    4071      139130 :     for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
    4072             :     {
    4073       91002 :         AggStatePerPhase phase = &aggstate->phases[phaseidx];
    4074       91002 :         bool        dohash = false;
    4075       91002 :         bool        dosort = false;
    4076             : 
    4077             :         /* phase 0 doesn't necessarily exist */
    4078       91002 :         if (!phase->aggnode)
    4079       42402 :             continue;
    4080             : 
    4081       48600 :         if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
    4082             :         {
    4083             :             /*
    4084             :              * Phase one, and only phase one, in a mixed agg performs both
    4085             :              * sorted and hashed aggregation.
    4086             :              */
    4087         232 :             dohash = true;
    4088         232 :             dosort = true;
    4089             :         }
    4090       48368 :         else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
    4091             :         {
    4092             :             /*
    4093             :              * No need to build a transition expression for an AGG_MIXED phase
    4094             :              * 0 - the contents of the hashtables will have been computed
    4095             :              * during phase 1.
    4096             :              */
    4097         232 :             continue;
    4098             :         }
    4099       48136 :         else if (phase->aggstrategy == AGG_PLAIN ||
    4100        7726 :                  phase->aggstrategy == AGG_SORTED)
    4101             :         {
    4102       42642 :             dohash = false;
    4103       42642 :             dosort = true;
    4104             :         }
    4105        5494 :         else if (phase->aggstrategy == AGG_HASHED)
    4106             :         {
    4107        5494 :             dohash = true;
    4108        5494 :             dosort = false;
    4109             :         }
    4110             :         else
    4111             :             Assert(false);
    4112             : 
    4113       48368 :         phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
    4114             :                                              false);
    4115             : 
    4116             :         /* cache compiled expression for outer slot without NULL check */
    4117       48368 :         phase->evaltrans_cache[0][0] = phase->evaltrans;
    4118             :     }
    4119             : 
    4120       48128 :     return aggstate;
    4121             : }
    4122             : 
    4123             : /*
    4124             :  * Build the state needed to calculate a state value for an aggregate.
    4125             :  *
    4126             :  * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
    4127             :  * to initialize the state for. 'transfn_oid', 'aggtranstype', and the rest
    4128             :  * of the arguments could be calculated from 'aggref', but the caller has
    4129             :  * calculated them already, so might as well pass them.
    4130             :  *
    4131             :  * 'transfn_oid' may be either the Oid of the aggtransfn or the aggcombinefn.
    4132             :  */
    4133             : static void
    4134       52898 : build_pertrans_for_aggref(AggStatePerTrans pertrans,
    4135             :                           AggState *aggstate, EState *estate,
    4136             :                           Aggref *aggref,
    4137             :                           Oid transfn_oid, Oid aggtranstype,
    4138             :                           Oid aggserialfn, Oid aggdeserialfn,
    4139             :                           Datum initValue, bool initValueIsNull,
    4140             :                           Oid *inputTypes, int numArguments)
    4141             : {
    4142       52898 :     int         numGroupingSets = Max(aggstate->maxsets, 1);
    4143             :     Expr       *transfnexpr;
    4144             :     int         numTransArgs;
    4145       52898 :     Expr       *serialfnexpr = NULL;
    4146       52898 :     Expr       *deserialfnexpr = NULL;
    4147             :     ListCell   *lc;
    4148             :     int         numInputs;
    4149             :     int         numDirectArgs;
    4150             :     List       *sortlist;
    4151             :     int         numSortCols;
    4152             :     int         numDistinctCols;
    4153             :     int         i;
    4154             : 
    4155             :     /* Begin filling in the pertrans data */
    4156       52898 :     pertrans->aggref = aggref;
    4157       52898 :     pertrans->aggshared = false;
    4158       52898 :     pertrans->aggCollation = aggref->inputcollid;
    4159       52898 :     pertrans->transfn_oid = transfn_oid;
    4160       52898 :     pertrans->serialfn_oid = aggserialfn;
    4161       52898 :     pertrans->deserialfn_oid = aggdeserialfn;
    4162       52898 :     pertrans->initValue = initValue;
    4163       52898 :     pertrans->initValueIsNull = initValueIsNull;
    4164             : 
    4165             :     /* Count the "direct" arguments, if any */
    4166       52898 :     numDirectArgs = list_length(aggref->aggdirectargs);
    4167             : 
    4168             :     /* Count the number of aggregated input columns */
    4169       52898 :     pertrans->numInputs = numInputs = list_length(aggref->args);
    4170             : 
    4171       52898 :     pertrans->aggtranstype = aggtranstype;
    4172             : 
    4173             :     /* account for the current transition state */
    4174       52898 :     numTransArgs = pertrans->numTransInputs + 1;
    4175             : 
    4176             :     /*
    4177             :      * Set up infrastructure for calling the transfn.  Note that invtransfn is
    4178             :      * not needed here.
    4179             :      */
    4180       52898 :     build_aggregate_transfn_expr(inputTypes,
    4181             :                                  numArguments,
    4182             :                                  numDirectArgs,
    4183       52898 :                                  aggref->aggvariadic,
    4184             :                                  aggtranstype,
    4185             :                                  aggref->inputcollid,
    4186             :                                  transfn_oid,
    4187             :                                  InvalidOid,
    4188             :                                  &transfnexpr,
    4189             :                                  NULL);
    4190             : 
    4191       52898 :     fmgr_info(transfn_oid, &pertrans->transfn);
    4192       52898 :     fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
    4193             : 
    4194       52898 :     pertrans->transfn_fcinfo =
    4195       52898 :         (FunctionCallInfo) palloc(SizeForFunctionCallInfo(numTransArgs));
    4196       52898 :     InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
    4197             :                              &pertrans->transfn,
    4198             :                              numTransArgs,
    4199             :                              pertrans->aggCollation,
    4200             :                              (Node *) aggstate, NULL);
    4201             : 
    4202             :     /* get info about the state value's datatype */
    4203       52898 :     get_typlenbyval(aggtranstype,
    4204             :                     &pertrans->transtypeLen,
    4205             :                     &pertrans->transtypeByVal);
    4206             : 
    4207       52898 :     if (OidIsValid(aggserialfn))
    4208             :     {
    4209         336 :         build_aggregate_serialfn_expr(aggserialfn,
    4210             :                                       &serialfnexpr);
    4211         336 :         fmgr_info(aggserialfn, &pertrans->serialfn);
    4212         336 :         fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
    4213             : 
    4214         336 :         pertrans->serialfn_fcinfo =
    4215         336 :             (FunctionCallInfo) palloc(SizeForFunctionCallInfo(1));
    4216         336 :         InitFunctionCallInfoData(*pertrans->serialfn_fcinfo,
    4217             :                                  &pertrans->serialfn,
    4218             :                                  1,
    4219             :                                  InvalidOid,
    4220             :                                  (Node *) aggstate, NULL);
    4221             :     }
    4222             : 
    4223       52898 :     if (OidIsValid(aggdeserialfn))
    4224             :     {
    4225         120 :         build_aggregate_deserialfn_expr(aggdeserialfn,
    4226             :                                         &deserialfnexpr);
    4227         120 :         fmgr_info(aggdeserialfn, &pertrans->deserialfn);
    4228         120 :         fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
    4229             : 
    4230         120 :         pertrans->deserialfn_fcinfo =
    4231         120 :             (FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
    4232         120 :         InitFunctionCallInfoData(*pertrans->deserialfn_fcinfo,
    4233             :                                  &pertrans->deserialfn,
    4234             :                                  2,
    4235             :                                  InvalidOid,
    4236             :                                  (Node *) aggstate, NULL);
    4237             :     }
    4238             : 
    4239             :     /*
    4240             :      * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
    4241             :      * have a list of SortGroupClause nodes; fish out the data in them and
    4242             :      * stick them into arrays.  We ignore ORDER BY for an ordered-set agg,
    4243             :      * however; the agg's transfn and finalfn are responsible for that.
    4244             :      *
    4245             :      * When the planner has set the aggpresorted flag, the input to the
    4246             :      * aggregate is already correctly sorted.  For ORDER BY aggregates we can
    4247             :      * simply treat these as normal aggregates.  For presorted DISTINCT
    4248             :      * aggregates an extra step must be added to remove duplicate consecutive
    4249             :      * inputs.
    4250             :      *
    4251             :      * Note that by construction, if there is a DISTINCT clause then the ORDER
    4252             :      * BY clause is a prefix of it (see transformDistinctClause).
    4253             :      */
    4254       52898 :     if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
    4255             :     {
    4256         252 :         sortlist = NIL;
    4257         252 :         numSortCols = numDistinctCols = 0;
    4258         252 :         pertrans->aggsortrequired = false;
    4259             :     }
    4260       52646 :     else if (aggref->aggpresorted && aggref->aggdistinct == NIL)
    4261             :     {
    4262        2014 :         sortlist = NIL;
    4263        2014 :         numSortCols = numDistinctCols = 0;
    4264        2014 :         pertrans->aggsortrequired = false;
    4265             :     }
    4266       50632 :     else if (aggref->aggdistinct)
    4267             :     {
    4268         570 :         sortlist = aggref->aggdistinct;
    4269         570 :         numSortCols = numDistinctCols = list_length(sortlist);
    4270             :         Assert(numSortCols >= list_length(aggref->aggorder));
    4271         570 :         pertrans->aggsortrequired = !aggref->aggpresorted;
    4272             :     }
    4273             :     else
    4274             :     {
    4275       50062 :         sortlist = aggref->aggorder;
    4276       50062 :         numSortCols = list_length(sortlist);
    4277       50062 :         numDistinctCols = 0;
    4278       50062 :         pertrans->aggsortrequired = (numSortCols > 0);
    4279             :     }
    4280             : 
    4281       52898 :     pertrans->numSortCols = numSortCols;
    4282       52898 :     pertrans->numDistinctCols = numDistinctCols;
    4283             : 
    4284             :     /*
    4285             :      * If we have either sorting or filtering to do, create a tupledesc and
    4286             :      * slot corresponding to the aggregated inputs (including sort
    4287             :      * expressions) of the agg.
    4288             :      */
    4289       52898 :     if (numSortCols > 0 || aggref->aggfilter)
    4290             :     {
    4291        1404 :         pertrans->sortdesc = ExecTypeFromTL(aggref->args);
    4292        1404 :         pertrans->sortslot =
    4293        1404 :             ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
    4294             :                                    &TTSOpsMinimalTuple);
    4295             :     }
    4296             : 
    4297       52898 :     if (numSortCols > 0)
    4298             :     {
    4299             :         /*
    4300             :          * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
    4301             :          * (yet)
    4302             :          */
    4303             :         Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
    4304             : 
    4305             :         /* ORDER BY aggregates are not supported with partial aggregation */
    4306             :         Assert(!DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
    4307             : 
    4308             :         /* If we have only one input, we need its len/byval info. */
    4309         696 :         if (numInputs == 1)
    4310             :         {
    4311         570 :             get_typlenbyval(inputTypes[numDirectArgs],
    4312             :                             &pertrans->inputtypeLen,
    4313             :                             &pertrans->inputtypeByVal);
    4314             :         }
    4315         126 :         else if (numDistinctCols > 0)
    4316             :         {
    4317             :             /* we will need an extra slot to store prior values */
    4318          96 :             pertrans->uniqslot =
    4319          96 :                 ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
    4320             :                                        &TTSOpsMinimalTuple);
    4321             :         }
    4322             : 
    4323             :         /* Extract the sort information for use later */
    4324         696 :         pertrans->sortColIdx =
    4325         696 :             (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
    4326         696 :         pertrans->sortOperators =
    4327         696 :             (Oid *) palloc(numSortCols * sizeof(Oid));
    4328         696 :         pertrans->sortCollations =
    4329         696 :             (Oid *) palloc(numSortCols * sizeof(Oid));
    4330         696 :         pertrans->sortNullsFirst =
    4331         696 :             (bool *) palloc(numSortCols * sizeof(bool));
    4332             : 
    4333         696 :         i = 0;
    4334        1578 :         foreach(lc, sortlist)
    4335             :         {
    4336         882 :             SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
    4337         882 :             TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
    4338             : 
    4339             :             /* the parser should have made sure of this */
    4340             :             Assert(OidIsValid(sortcl->sortop));
    4341             : 
    4342         882 :             pertrans->sortColIdx[i] = tle->resno;
    4343         882 :             pertrans->sortOperators[i] = sortcl->sortop;
    4344         882 :             pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
    4345         882 :             pertrans->sortNullsFirst[i] = sortcl->nulls_first;
    4346         882 :             i++;
    4347             :         }
    4348             :         Assert(i == numSortCols);
    4349             :     }
    4350             : 
    4351       52898 :     if (aggref->aggdistinct)
    4352             :     {
    4353             :         Oid        *ops;
    4354             : 
    4355             :         Assert(numArguments > 0);
    4356             :         Assert(list_length(aggref->aggdistinct) == numDistinctCols);
    4357             : 
    4358         570 :         ops = palloc(numDistinctCols * sizeof(Oid));
    4359             : 
    4360         570 :         i = 0;
    4361        1308 :         foreach(lc, aggref->aggdistinct)
    4362         738 :             ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
    4363             : 
    4364             :         /* lookup / build the necessary comparators */
    4365         570 :         if (numDistinctCols == 1)
    4366         474 :             fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
    4367             :         else
    4368          96 :             pertrans->equalfnMulti =
    4369          96 :                 execTuplesMatchPrepare(pertrans->sortdesc,
    4370             :                                        numDistinctCols,
    4371          96 :                                        pertrans->sortColIdx,
    4372             :                                        ops,
    4373          96 :                                        pertrans->sortCollations,
    4374             :                                        &aggstate->ss.ps);
    4375         570 :         pfree(ops);
    4376             :     }
    4377             : 
    4378       52898 :     pertrans->sortstates = (Tuplesortstate **)
    4379       52898 :         palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
    4380       52898 : }
    4381             : 
    4382             : 
    4383             : static Datum
    4384       22370 : GetAggInitVal(Datum textInitVal, Oid transtype)
    4385             : {
    4386             :     Oid         typinput,
    4387             :                 typioparam;
    4388             :     char       *strInitVal;
    4389             :     Datum       initVal;
    4390             : 
    4391       22370 :     getTypeInputInfo(transtype, &typinput, &typioparam);
    4392       22370 :     strInitVal = TextDatumGetCString(textInitVal);
    4393       22370 :     initVal = OidInputFunctionCall(typinput, strInitVal,
    4394             :                                    typioparam, -1);
    4395       22370 :     pfree(strInitVal);
    4396       22370 :     return initVal;
    4397             : }
    4398             : 
    4399             : void
    4400       47996 : ExecEndAgg(AggState *node)
    4401             : {
    4402             :     PlanState  *outerPlan;
    4403             :     int         transno;
    4404       47996 :     int         numGroupingSets = Max(node->maxsets, 1);
    4405             :     int         setno;
    4406             : 
    4407             :     /*
    4408             :      * When ending a parallel worker, copy the statistics gathered by the
    4409             :      * worker back into shared memory so that it can be picked up by the main
    4410             :      * process to report in EXPLAIN ANALYZE.
    4411             :      */
    4412       47996 :     if (node->shared_info && IsParallelWorker())
    4413             :     {
    4414             :         AggregateInstrumentation *si;
    4415             : 
    4416             :         Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
    4417         166 :         si = &node->shared_info->sinstrument[ParallelWorkerNumber];
    4418         166 :         si->hash_batches_used = node->hash_batches_used;
    4419         166 :         si->hash_disk_used = node->hash_disk_used;
    4420         166 :         si->hash_mem_peak = node->hash_mem_peak;
    4421             :     }
    4422             : 
    4423             :     /* Make sure we have closed any open tuplesorts */
    4424             : 
    4425       47996 :     if (node->sort_in)
    4426         156 :         tuplesort_end(node->sort_in);
    4427       47996 :     if (node->sort_out)
    4428          42 :         tuplesort_end(node->sort_out);
    4429             : 
    4430       47996 :     hashagg_reset_spill_state(node);
    4431             : 
    4432       47996 :     if (node->hash_metacxt != NULL)
    4433             :     {
    4434        5718 :         MemoryContextDelete(node->hash_metacxt);
    4435        5718 :         node->hash_metacxt = NULL;
    4436             :     }
    4437       47996 :     if (node->hash_tablecxt != NULL)
    4438             :     {
    4439        5718 :         MemoryContextDelete(node->hash_tablecxt);
    4440        5718 :         node->hash_tablecxt = NULL;
    4441             :     }
    4442             : 
    4443             : 
    4444      100758 :     for (transno = 0; transno < node->numtrans; transno++)
    4445             :     {
    4446       52762 :         AggStatePerTrans pertrans = &node->pertrans[transno];
    4447             : 
    4448      106538 :         for (setno = 0; setno < numGroupingSets; setno++)
    4449             :         {
    4450       53776 :             if (pertrans->sortstates[setno])
    4451           0 :                 tuplesort_end(pertrans->sortstates[setno]);
    4452             :         }
    4453             :     }
    4454             : 
    4455             :     /* And ensure any agg shutdown callbacks have been called */
    4456       96832 :     for (setno = 0; setno < numGroupingSets; setno++)
    4457       48836 :         ReScanExprContext(node->aggcontexts[setno]);
    4458       47996 :     if (node->hashcontext)
    4459        5718 :         ReScanExprContext(node->hashcontext);
    4460             : 
    4461       47996 :     outerPlan = outerPlanState(node);
    4462       47996 :     ExecEndNode(outerPlan);
    4463       47996 : }
    4464             : 
    4465             : void
    4466       51528 : ExecReScanAgg(AggState *node)
    4467             : {
    4468       51528 :     ExprContext *econtext = node->ss.ps.ps_ExprContext;
    4469       51528 :     PlanState  *outerPlan = outerPlanState(node);
    4470       51528 :     Agg        *aggnode = (Agg *) node->ss.ps.plan;
    4471             :     int         transno;
    4472       51528 :     int         numGroupingSets = Max(node->maxsets, 1);
    4473             :     int         setno;
    4474             : 
    4475       51528 :     node->agg_done = false;
    4476             : 
    4477       51528 :     if (node->aggstrategy == AGG_HASHED)
    4478             :     {
    4479             :         /*
    4480             :          * In the hashed case, if we haven't yet built the hash table then we
    4481             :          * can just return; nothing done yet, so nothing to undo. If subnode's
    4482             :          * chgParam is not NULL then it will be re-scanned by ExecProcNode,
    4483             :          * else no reason to re-scan it at all.
    4484             :          */
    4485       12014 :         if (!node->table_filled)
    4486         132 :             return;
    4487             : 
    4488             :         /*
    4489             :          * If we do have the hash table, and it never spilled, and the subplan
    4490             :          * does not have any parameter changes, and none of our own parameter
    4491             :          * changes affect input expressions of the aggregated functions, then
    4492             :          * we can just rescan the existing hash table; no need to build it
    4493             :          * again.
    4494             :          */
    4495       11882 :         if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
    4496         902 :             !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
    4497             :         {
    4498         878 :             ResetTupleHashIterator(node->perhash[0].hashtable,
    4499             :                                    &node->perhash[0].hashiter);
    4500         878 :             select_current_set(node, 0, true);
    4501         878 :             return;
    4502             :         }
    4503             :     }
    4504             : 
    4505             :     /* Make sure we have closed any open tuplesorts */
    4506      117140 :     for (transno = 0; transno < node->numtrans; transno++)
    4507             :     {
    4508      133280 :         for (setno = 0; setno < numGroupingSets; setno++)
    4509             :         {
    4510       66658 :             AggStatePerTrans pertrans = &node->pertrans[transno];
    4511             : 
    4512       66658 :             if (pertrans->sortstates[setno])
    4513             :             {
    4514           0 :                 tuplesort_end(pertrans->sortstates[setno]);
    4515           0 :                 pertrans->sortstates[setno] = NULL;
    4516             :             }
    4517             :         }
    4518             :     }
    4519             : 
    4520             :     /*
    4521             :      * We don't need to ReScanExprContext the output tuple context here;
    4522             :      * ExecReScan already did it. But we do need to reset our per-grouping-set
    4523             :      * contexts, which may have transvalues stored in them. (We use rescan
    4524             :      * rather than just reset because transfns may have registered callbacks
    4525             :      * that need to be run now.) For the AGG_HASHED case, see below.
    4526             :      */
    4527             : 
    4528      101072 :     for (setno = 0; setno < numGroupingSets; setno++)
    4529             :     {
    4530       50554 :         ReScanExprContext(node->aggcontexts[setno]);
    4531             :     }
    4532             : 
    4533             :     /* Release first tuple of group, if we have made a copy */
    4534       50518 :     if (node->grp_firstTuple != NULL)
    4535             :     {
    4536           0 :         heap_freetuple(node->grp_firstTuple);
    4537           0 :         node->grp_firstTuple = NULL;
    4538             :     }
    4539       50518 :     ExecClearTuple(node->ss.ss_ScanTupleSlot);
    4540             : 
    4541             :     /* Forget current agg values */
    4542      117140 :     MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
    4543       50518 :     MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
    4544             : 
    4545             :     /*
    4546             :      * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
    4547             :      * the hashcontext. This used to be an issue, but now, resetting a context
    4548             :      * automatically deletes sub-contexts too.
    4549             :      */
    4550       50518 :     if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
    4551             :     {
    4552       11034 :         hashagg_reset_spill_state(node);
    4553             : 
    4554       11034 :         node->hash_ever_spilled = false;
    4555       11034 :         node->hash_spill_mode = false;
    4556       11034 :         node->hash_ngroups_current = 0;
    4557             : 
    4558       11034 :         ReScanExprContext(node->hashcontext);
    4559       11034 :         MemoryContextReset(node->hash_tablecxt);
    4560             :         /* Rebuild an empty hash table */
    4561       11034 :         build_hash_tables(node);
    4562       11034 :         node->table_filled = false;
    4563             :         /* iterator will be reset when the table is filled */
    4564             : 
    4565       11034 :         hashagg_recompile_expressions(node, false, false);
    4566             :     }
    4567             : 
    4568       50518 :     if (node->aggstrategy != AGG_HASHED)
    4569             :     {
    4570             :         /*
    4571             :          * Reset the per-group state (in particular, mark transvalues null)
    4572             :          */
    4573       79064 :         for (setno = 0; setno < numGroupingSets; setno++)
    4574             :         {
    4575      172770 :             MemSet(node->pergroups[setno], 0,
    4576             :                    sizeof(AggStatePerGroupData) * node->numaggs);
    4577             :         }
    4578             : 
    4579             :         /* reset to phase 1 */
    4580       39514 :         initialize_phase(node, 1);
    4581             : 
    4582       39514 :         node->input_done = false;
    4583       39514 :         node->projected_set = -1;
    4584             :     }
    4585             : 
    4586       50518 :     if (outerPlan->chgParam == NULL)
    4587         188 :         ExecReScan(outerPlan);
    4588             : }
    4589             : 
    4590             : 
    4591             : /***********************************************************************
    4592             :  * API exposed to aggregate functions
    4593             :  ***********************************************************************/
    4594             : 
    4595             : 
    4596             : /*
    4597             :  * AggCheckCallContext - test if a SQL function is being called as an aggregate
    4598             :  *
    4599             :  * The transition and/or final functions of an aggregate may want to verify
    4600             :  * that they are being called as aggregates, rather than as plain SQL
    4601             :  * functions.  They should use this function to do so.  The return value
    4602             :  * is nonzero if being called as an aggregate, or zero if not.  (Specific
    4603             :  * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
    4604             :  * values could conceivably appear in future.)
    4605             :  *
    4606             :  * If aggcontext isn't NULL, the function also stores at *aggcontext the
    4607             :  * identity of the memory context that aggregate transition values are being
    4608             :  * stored in.  Note that the same aggregate call site (flinfo) may be called
    4609             :  * interleaved on different transition values in different contexts, so it's
    4610             :  * not kosher to cache aggcontext under fn_extra.  It is, however, kosher to
    4611             :  * cache it in the transvalue itself (for internal-type transvalues).
    4612             :  */
    4613             : int
    4614     5377192 : AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
    4615             : {
    4616     5377192 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4617             :     {
    4618     5365806 :         if (aggcontext)
    4619             :         {
    4620     2748192 :             AggState   *aggstate = ((AggState *) fcinfo->context);
    4621     2748192 :             ExprContext *cxt = aggstate->curaggcontext;
    4622             : 
    4623     2748192 :             *aggcontext = cxt->ecxt_per_tuple_memory;
    4624             :         }
    4625     5365806 :         return AGG_CONTEXT_AGGREGATE;
    4626             :     }
    4627       11386 :     if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
    4628             :     {
    4629        9512 :         if (aggcontext)
    4630         710 :             *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
    4631        9512 :         return AGG_CONTEXT_WINDOW;
    4632             :     }
    4633             : 
    4634             :     /* this is just to prevent "uninitialized variable" warnings */
    4635        1874 :     if (aggcontext)
    4636        1826 :         *aggcontext = NULL;
    4637        1874 :     return 0;
    4638             : }
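
Example (editor's illustrative sketch, not part of nodeAgg.c): a typical C-level
transition function uses AggCheckCallContext exactly as described above, bailing
out if not called as an aggregate and keeping an internal-type transition state
in the long-lived aggcontext so it survives across the rows of a group.  The
names MyAvgState and my_avg_accum, and the int8 input, are hypothetical.

    #include "postgres.h"
    #include "fmgr.h"

    PG_MODULE_MAGIC;

    PG_FUNCTION_INFO_V1(my_avg_accum);

    /* hypothetical internal-type transition state */
    typedef struct MyAvgState
    {
        int64       count;
        int64       sum;
    } MyAvgState;

    Datum
    my_avg_accum(PG_FUNCTION_ARGS)
    {
        MemoryContext aggcontext;
        MyAvgState *state;

        /* nonzero result means we really are being called as an aggregate */
        if (!AggCheckCallContext(fcinfo, &aggcontext))
            elog(ERROR, "my_avg_accum called in non-aggregate context");

        if (PG_ARGISNULL(0))
        {
            /* first row of the group: build the state in aggcontext */
            state = (MyAvgState *) MemoryContextAllocZero(aggcontext,
                                                          sizeof(MyAvgState));
        }
        else
            state = (MyAvgState *) PG_GETARG_POINTER(0);

        if (!PG_ARGISNULL(1))
        {
            state->count++;
            state->sum += PG_GETARG_INT64(1);
        }

        PG_RETURN_POINTER(state);
    }

A matching CREATE AGGREGATE would declare this function as the sfunc with
stype = internal and no initcond, so the NULL test on argument 0 is what
creates the state on a group's first row; a final function (not shown) would
turn MyAvgState into the user-visible result.
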
    4639             : 
    4640             : /*
    4641             :  * AggGetAggref - allow an aggregate support function to get its Aggref
    4642             :  *
    4643             :  * If the function is being called as an aggregate support function,
    4644             :  * return the Aggref node for the aggregate call.  Otherwise, return NULL.
    4645             :  *
    4646             :  * Aggregates sharing the same inputs and transition functions can get
    4647             :  * merged into a single transition calculation.  If the transition function
    4648             :  * calls AggGetAggref, it will get one of the Aggrefs for which it is
    4649             :  * executing.  It must therefore not pay attention to the Aggref fields that
    4650             :  * relate to the final function, as those are indeterminate.  But if a final
    4651             :  * function calls AggGetAggref, it will get a precise result.
    4652             :  *
    4653             :  * Note that if an aggregate is being used as a window function, this will
    4654             :  * return NULL.  We could provide a similar function to return the relevant
    4655             :  * WindowFunc node in such cases, but it's not needed yet.
    4656             :  */
    4657             : Aggref *
    4658         246 : AggGetAggref(FunctionCallInfo fcinfo)
    4659             : {
    4660         246 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4661             :     {
    4662         246 :         AggState   *aggstate = (AggState *) fcinfo->context;
    4663             :         AggStatePerAgg curperagg;
    4664             :         AggStatePerTrans curpertrans;
    4665             : 
    4666             :         /* check curperagg (valid when in a final function) */
    4667         246 :         curperagg = aggstate->curperagg;
    4668             : 
    4669         246 :         if (curperagg)
    4670           0 :             return curperagg->aggref;
    4671             : 
    4672             :         /* check curpertrans (valid when in a transition function) */
    4673         246 :         curpertrans = aggstate->curpertrans;
    4674             : 
    4675         246 :         if (curpertrans)
    4676         246 :             return curpertrans->aggref;
    4677             :     }
    4678           0 :     return NULL;
    4679             : }
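
Example (editor's sketch, hypothetical names): a transition function that
consults its Aggref only for input-side information, here the number of
aggregated arguments at the call site, which stays meaningful even when the
transition state is shared as described above.  The state is assumed to be
int8 with initcond '0'.

    #include "postgres.h"
    #include "fmgr.h"
    #include "nodes/pg_list.h"
    #include "nodes/primnodes.h"

    /* PG_MODULE_MAGIC and the SQL-level CREATE AGGREGATE omitted for brevity */
    PG_FUNCTION_INFO_V1(my_arg_count_accum);

    /* counts the total number of aggregated argument values fed to it */
    Datum
    my_arg_count_accum(PG_FUNCTION_ARGS)
    {
        Aggref     *aggref = AggGetAggref(fcinfo);
        int64       state = PG_GETARG_INT64(0);

        if (aggref == NULL)
            elog(ERROR, "my_arg_count_accum must be called as an aggregate");

        /*
         * Only input-side fields of the Aggref are examined; per the comment
         * above, final-function-related fields may describe a different
         * Aggref when transition states are shared.
         */
        state += list_length(aggref->args);

        PG_RETURN_INT64(state);
    }
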
    4680             : 
    4681             : /*
    4682             :  * AggGetTempMemoryContext - fetch short-term memory context for aggregates
    4683             :  *
    4684             :  * This is useful in agg final functions; the context returned is one that
    4685             :  * the final function can safely reset as desired.  This isn't useful for
    4686             :  * transition functions, since the context returned MAY (we don't promise)
    4687             :  * be the same as the context those are called in.
    4688             :  *
    4689             :  * As above, this is currently not useful for aggs called as window functions.
    4690             :  */
    4691             : MemoryContext
    4692           0 : AggGetTempMemoryContext(FunctionCallInfo fcinfo)
    4693             : {
    4694           0 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4695             :     {
    4696           0 :         AggState   *aggstate = (AggState *) fcinfo->context;
    4697             : 
    4698           0 :         return aggstate->tmpcontext->ecxt_per_tuple_memory;
    4699             :     }
    4700           0 :     return NULL;
    4701             : }
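
Example (editor's sketch; the function name and the text result are
hypothetical): a final function can do its throwaway formatting work in the
short-term context returned here instead of bloating the per-group
aggcontext, copying only the final result back out.

    #include "postgres.h"
    #include "fmgr.h"
    #include "lib/stringinfo.h"
    #include "utils/builtins.h"

    /* PG_MODULE_MAGIC omitted for brevity */
    PG_FUNCTION_INFO_V1(my_report_final);

    /* final function for an int8 transition state; returns text */
    Datum
    my_report_final(PG_FUNCTION_ARGS)
    {
        MemoryContext tmpcontext = AggGetTempMemoryContext(fcinfo);
        MemoryContext oldcontext;
        int64       count = PG_ARGISNULL(0) ? 0 : PG_GETARG_INT64(0);
        StringInfoData buf;
        text       *result;

        if (tmpcontext == NULL)
            elog(ERROR, "my_report_final must be called as an aggregate");

        /* scratch allocations go into the resettable short-term context */
        oldcontext = MemoryContextSwitchTo(tmpcontext);
        initStringInfo(&buf);
        appendStringInfo(&buf, "count=" INT64_FORMAT, count);
        MemoryContextSwitchTo(oldcontext);

        /* copy the result into the caller's context before returning */
        result = cstring_to_text(buf.data);

        PG_RETURN_TEXT_P(result);
    }
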
    4702             : 
    4703             : /*
    4704             :  * AggStateIsShared - find out whether transition state is shared
    4705             :  *
    4706             :  * If the function is being called as an aggregate support function,
    4707             :  * return true if the aggregate's transition state is shared across
    4708             :  * multiple aggregates, false if it is not.
    4709             :  *
    4710             :  * Returns true if not called as an aggregate support function.
    4711             :  * This is intended as a conservative answer, ie "no you'd better not
    4712             :  * scribble on your input".  In particular, will return true if the
    4713             :  * aggregate is being used as a window function, which is a scenario
    4714             :  * in which changing the transition state is a bad idea.  We might
    4715             :  * want to refine the behavior for the window case in future.
    4716             :  */
    4717             : bool
    4718         246 : AggStateIsShared(FunctionCallInfo fcinfo)
    4719             : {
    4720         246 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4721             :     {
    4722         246 :         AggState   *aggstate = (AggState *) fcinfo->context;
    4723             :         AggStatePerAgg curperagg;
    4724             :         AggStatePerTrans curpertrans;
    4725             : 
    4726             :         /* check curperagg (valid when in a final function) */
    4727         246 :         curperagg = aggstate->curperagg;
    4728             : 
    4729         246 :         if (curperagg)
    4730           0 :             return aggstate->pertrans[curperagg->transno].aggshared;
    4731             : 
    4732             :         /* check curpertrans (valid when in a transition function) */
    4733         246 :         curpertrans = aggstate->curpertrans;
    4734             : 
    4735         246 :         if (curpertrans)
    4736         246 :             return curpertrans->aggshared;
    4737             :     }
    4738           0 :     return true;
    4739             : }
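
/*
 * Illustrative sketch (not part of nodeAgg.c): a hypothetical final
 * function, my_shared_aware_final, with a bytea transition value.  It asks
 * AggStateIsShared() whether the transition state might still be needed by
 * another aggregate and, if so (or if it can't tell), copies it before
 * doing anything destructive.  All names here are assumptions for the
 * example.
 */
#include "postgres.h"
#include "fmgr.h"

PG_FUNCTION_INFO_V1(my_shared_aware_final);

Datum
my_shared_aware_final(PG_FUNCTION_ARGS)
{
    bytea      *state;

    if (!AggCheckCallContext(fcinfo, NULL))
        elog(ERROR, "my_shared_aware_final called in non-aggregate context");

    if (PG_ARGISNULL(0))
        PG_RETURN_NULL();

    state = PG_GETARG_BYTEA_P(0);

    if (AggStateIsShared(fcinfo))
    {
        /* state may be reused by another aggregate: work on a copy */
        bytea      *copy = (bytea *) palloc(VARSIZE(state));

        memcpy(copy, state, VARSIZE(state));
        state = copy;
    }

    /* ... it is now safe to scribble on 'state' ... */

    PG_RETURN_BYTEA_P(state);
}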
    4740             : 
    4741             : /*
    4742             :  * AggRegisterCallback - register a cleanup callback for an aggregate
    4743             :  *
    4744             :  * This is useful for aggs to register shutdown callbacks, which will ensure
    4745             :  * that non-memory resources are freed.  The callback will occur just before
    4746             :  * the associated aggcontext (as returned by AggCheckCallContext) is reset,
    4747             :  * either between groups or as a result of rescanning the query.  The callback
    4748             :  * will NOT be called on error paths.  The typical use-case is for freeing of
    4749             :  * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
    4750             :  * created by the agg functions.  (The callback will not be called until after
    4751             :  * the result of the finalfn is no longer needed, so it's safe for the finalfn
    4752             :  * to return data that will be freed by the callback.)
    4753             :  *
    4754             :  * As above, this is currently not useful for aggs called as window functions.
    4755             :  */
    4756             : void
    4757         660 : AggRegisterCallback(FunctionCallInfo fcinfo,
    4758             :                     ExprContextCallbackFunction func,
    4759             :                     Datum arg)
    4760             : {
    4761         660 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4762             :     {
    4763         660 :         AggState   *aggstate = (AggState *) fcinfo->context;
    4764         660 :         ExprContext *cxt = aggstate->curaggcontext;
    4765             : 
    4766         660 :         RegisterExprContextCallback(cxt, func, arg);
    4767             : 
    4768         660 :         return;
    4769             :     }
    4770           0 :     elog(ERROR, "aggregate function cannot register a callback in this context");
    4771             : }
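
/*
 * Illustrative sketch (not part of nodeAgg.c): releasing a tuplesort kept
 * in aggcontext through a registered callback.  The function names and the
 * origin of the Tuplesortstate pointer are assumptions for the example.
 */
#include "postgres.h"
#include "fmgr.h"
#include "utils/tuplesort.h"

/* cleanup callback; matches the ExprContextCallbackFunction signature */
static void
my_agg_shutdown(Datum arg)
{
    Tuplesortstate *sortstate = (Tuplesortstate *) DatumGetPointer(arg);

    tuplesort_end(sortstate);
}

/*
 * In the transition function, immediately after creating the sort state in
 * aggcontext, the callback would be registered so the tuplesort's files and
 * memory are released when that context is about to be reset:
 *
 *      AggRegisterCallback(fcinfo, my_agg_shutdown,
 *                          PointerGetDatum(sortstate));
 */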
    4772             : 
    4773             : 
    4774             : /* ----------------------------------------------------------------
    4775             :  *                      Parallel Query Support
    4776             :  * ----------------------------------------------------------------
    4777             :  */
    4778             : 
    4779             : /* ----------------------------------------------------------------
    4780             :  *      ExecAggEstimate
    4781             :  *
    4782             :  *      Estimate space required to propagate aggregate statistics.
    4783             :  * ----------------------------------------------------------------
    4784             :  */
    4785             : void
    4786         554 : ExecAggEstimate(AggState *node, ParallelContext *pcxt)
    4787             : {
    4788             :     Size        size;
    4789             : 
    4790             :     /* don't need this if not instrumenting or no workers */
    4791         554 :     if (!node->ss.ps.instrument || pcxt->nworkers == 0)
    4792         452 :         return;
    4793             : 
    4794         102 :     size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
    4795         102 :     size = add_size(size, offsetof(SharedAggInfo, sinstrument));
    4796         102 :     shm_toc_estimate_chunk(&pcxt->estimator, size);
    4797         102 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
    4798             : }
    4799             : 
    4800             : /* ----------------------------------------------------------------
    4801             :  *      ExecAggInitializeDSM
    4802             :  *
    4803             :  *      Initialize DSM space for aggregate statistics.
    4804             :  * ----------------------------------------------------------------
    4805             :  */
    4806             : void
    4807         554 : ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
    4808             : {
    4809             :     Size        size;
    4810             : 
    4811             :     /* don't need this if not instrumenting or no workers */
    4812         554 :     if (!node->ss.ps.instrument || pcxt->nworkers == 0)
    4813         452 :         return;
    4814             : 
    4815         102 :     size = offsetof(SharedAggInfo, sinstrument)
    4816         102 :         + pcxt->nworkers * sizeof(AggregateInstrumentation);
    4817         102 :     node->shared_info = shm_toc_allocate(pcxt->toc, size);
    4818             :     /* ensure any unfilled slots will contain zeroes */
    4819         102 :     memset(node->shared_info, 0, size);
    4820         102 :     node->shared_info->num_workers = pcxt->nworkers;
    4821         102 :     shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
    4822         102 :                    node->shared_info);
    4823             : }
    4824             : 
    4825             : /* ----------------------------------------------------------------
    4826             :  *      ExecAggInitializeWorker
    4827             :  *
    4828             :  *      Attach worker to DSM space for aggregate statistics.
    4829             :  * ----------------------------------------------------------------
    4830             :  */
    4831             : void
    4832        1546 : ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
    4833             : {
    4834        1546 :     node->shared_info =
    4835        1546 :         shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
    4836        1546 : }
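
/*
 * Illustrative sketch (not part of nodeAgg.c): once attached, a parallel
 * worker can report its statistics in its own slot of the shared array,
 * indexed by ParallelWorkerNumber.  The helper name and the values being
 * stored are assumptions for the example.
 */
#include "postgres.h"
#include "access/parallel.h"    /* ParallelWorkerNumber */
#include "nodes/execnodes.h"

static void
my_report_agg_stats(AggState *node, Size mem_peak,
                    uint64 disk_used, int batches_used)
{
    AggregateInstrumentation *si;

    /* only instrumented parallel workers have a slot to fill */
    if (node->shared_info == NULL || ParallelWorkerNumber < 0)
        return;

    si = &node->shared_info->sinstrument[ParallelWorkerNumber];
    si->hash_mem_peak = mem_peak;
    si->hash_disk_used = disk_used;
    si->hash_batches_used = batches_used;
}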
    4837             : 
    4838             : /* ----------------------------------------------------------------
    4839             :  *      ExecAggRetrieveInstrumentation
    4840             :  *
    4841             :  *      Transfer aggregate statistics from DSM to private memory.
    4842             :  * ----------------------------------------------------------------
    4843             :  */
    4844             : void
    4845         102 : ExecAggRetrieveInstrumentation(AggState *node)
    4846             : {
    4847             :     Size        size;
    4848             :     SharedAggInfo *si;
    4849             : 
    4850         102 :     if (node->shared_info == NULL)
    4851           0 :         return;
    4852             : 
    4853         102 :     size = offsetof(SharedAggInfo, sinstrument)
    4854         102 :         + node->shared_info->num_workers * sizeof(AggregateInstrumentation);
    4855         102 :     si = palloc(size);
    4856         102 :     memcpy(si, node->shared_info, size);
    4857         102 :     node->shared_info = si;
    4858             : }
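
/*
 * Illustrative sketch (not part of nodeAgg.c): after
 * ExecAggRetrieveInstrumentation() has copied the statistics into
 * backend-private memory, the leader can walk the per-worker slots like
 * this.  The helper name and the DEBUG1 message are assumptions for the
 * example; the fields read are the ones kept in AggregateInstrumentation.
 */
#include "postgres.h"
#include "nodes/execnodes.h"

static void
my_show_agg_worker_stats(AggState *aggstate)
{
    SharedAggInfo *si = aggstate->shared_info;

    if (si == NULL)
        return;                 /* not instrumented, or no workers */

    for (int n = 0; n < si->num_workers; n++)
    {
        AggregateInstrumentation *sinstrument = &si->sinstrument[n];

        /* workers that never ran this node leave their slot zeroed */
        if (sinstrument->hash_mem_peak == 0)
            continue;

        elog(DEBUG1, "worker %d: %d hash batches, %zu bytes peak memory, "
             UINT64_FORMAT " kB of disk",
             n,
             sinstrument->hash_batches_used,
             sinstrument->hash_mem_peak,
             sinstrument->hash_disk_used);
    }
}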

Generated by: LCOV version 1.14