LCOV - code coverage report
Current view: src/backend/executor/nodeAgg.c
Test: PostgreSQL 19devel
Test Date: 2026-03-21 18:16:11

             Coverage    Total     Hit
Lines:         94.7 %     1500    1420
Functions:     98.3 %       58      57

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * nodeAgg.c
       4              :  *    Routines to handle aggregate nodes.
       5              :  *
       6              :  *    ExecAgg normally evaluates each aggregate in the following steps:
       7              :  *
       8              :  *       transvalue = initcond
       9              :  *       foreach input_tuple do
      10              :  *          transvalue = transfunc(transvalue, input_value(s))
      11              :  *       result = finalfunc(transvalue, direct_argument(s))
      12              :  *
      13              :  *    If a finalfunc is not supplied then the result is just the ending
      14              :  *    value of transvalue.
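As a minimal sketch of the loop above, here is a hypothetical avg(int) aggregate written as plain C, outside the executor. AvgState, avg_transfn, avg_finalfn and run_avg are illustrative names, not PostgreSQL APIs. A false return from the finalfunc stands in for a NULL result.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

// Hypothetical transition state ("transvalue") for avg(int).
typedef struct AvgState
{
    long long sum;
    long long count;
} AvgState;

// transfunc: fold one input value into the transition state.
static AvgState avg_transfn(AvgState state, int value)
{
    state.sum += value;
    state.count += 1;
    return state;
}

// finalfunc: turn the ending transvalue into the result.
// Returns false (i.e. a NULL result) when no rows were aggregated.
static bool avg_finalfn(AvgState state, double *result)
{
    if (state.count == 0)
        return false;
    *result = (double) state.sum / (double) state.count;
    return true;
}

// Driver mirroring the pseudocode:
//   transvalue = initcond
//   foreach input: transvalue = transfunc(transvalue, input)
//   result = finalfunc(transvalue)
static bool run_avg(const int *inputs, size_t n, double *result)
{
    AvgState transvalue = {0, 0};       // initcond

    assert(inputs != NULL || n == 0);
    for (size_t i = 0; i < n; i++)
        transvalue = avg_transfn(transvalue, inputs[i]);
    return avg_finalfn(transvalue, result);
}
```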
      15              :  *
      16              :  *    Other behaviors can be selected by the "aggsplit" mode, which exists
      17              :  *    to support partial aggregation.  It is possible to:
      18              :  *    * Skip running the finalfunc, so that the output is always the
      19              :  *    final transvalue state.
      20              :  *    * Substitute the combinefunc for the transfunc, so that transvalue
      21              :  *    states (propagated up from a child partial-aggregation step) are merged
      22              :  *    rather than processing raw input rows.  (The statements below about
      23              :  *    the transfunc apply equally to the combinefunc, when it's selected.)
      24              :  *    * Apply the serializefunc to the output values (this only makes sense
      25              :  *    when skipping the finalfunc, since the serializefunc works on the
      26              :  *    transvalue data type).
      27              :  *    * Apply the deserializefunc to the input values (this only makes sense
      28              :  *    when using the combinefunc, for similar reasons).
      29              :  *    It is the planner's responsibility to connect up Agg nodes using these
      30              :  *    alternate behaviors in a way that makes sense, with partial aggregation
      31              :  *    results being fed to nodes that expect them.
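The following sketch, reusing a hypothetical avg(int) state, shows why a partial step skips the finalfunc and why the finalize step swaps in the combinefunc. None of these function names come from PostgreSQL, and serialization is omitted because the state is passed by value within a single process.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

// Hypothetical transvalue for avg(int), shared by every aggsplit step.
typedef struct AvgState
{
    long long sum;
    long long count;
} AvgState;

// transfunc: consumes raw input rows.
static AvgState avg_transfn(AvgState s, int v)
{
    s.sum += v;
    s.count += 1;
    return s;
}

// combinefunc: merges two transition states instead of raw rows.
static AvgState avg_combinefn(AvgState a, AvgState b)
{
    a.sum += b.sum;
    a.count += b.count;
    return a;
}

// finalfunc: false means a NULL result (no input rows at all).
static bool avg_finalfn(AvgState s, double *out)
{
    if (s.count == 0)
        return false;
    *out = (double) s.sum / (double) s.count;
    return true;
}

// Partial step: run the transfunc but skip the finalfunc, emitting the
// transvalue itself (a real plan might serialize it at this point).
static AvgState avg_partial(const int *in, size_t n)
{
    AvgState s = {0, 0};

    assert(in != NULL || n == 0);
    for (size_t i = 0; i < n; i++)
        s = avg_transfn(s, in[i]);
    return s;
}

// Finalize step: the combinefunc replaces the transfunc, then the
// finalfunc runs once on the merged state.
static bool avg_finalize(const AvgState *partials, size_t nparts, double *out)
{
    AvgState s = {0, 0};

    for (size_t i = 0; i < nparts; i++)
        s = avg_combinefn(s, partials[i]);
    return avg_finalfn(s, out);
}
```

With worker inputs {1, 2} and {3, 4, 5, 6}, combining the partial states gives a sum of 21 over 6 rows, i.e. 3.5. Averaging the two per-worker averages (1.5 and 4.5) would give 3.0 instead, which is why a partial step must emit the transvalue rather than a finalized result.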
      32              :  *
      33              :  *    If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
      34              :  *    input tuples and eliminate duplicates (if required) before performing
      35              :  *    the above-depicted process.  (However, we don't do that for ordered-set
      36              :  *    aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
      37              :  *    so far as this module is concerned.)  Note that partial aggregation
      38              :  *    is not supported in these cases, since we couldn't ensure global
      39              :  *    ordering or distinctness of the inputs.
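A minimal sketch of the DISTINCT case, assuming integer inputs and using qsort() in place of the executor's tuplesort: the inputs are sorted, and the transfunc (here just a counter increment) sees only the first value of each run of equal values. count_distinct and cmp_int are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

// Three-way integer comparison for qsort().
static int cmp_int(const void *a, const void *b)
{
    int x = *(const int *) a;
    int y = *(const int *) b;

    return (x > y) - (x < y);
}

// count(DISTINCT x): sort a copy of the inputs, then call the transfunc
// only for the first value of each run of duplicates.
// Returns -1 on allocation failure.
static long count_distinct(const int *in, size_t n)
{
    long count = 0;                     // initcond
    int *sorted;

    if (n == 0)
        return 0;
    assert(in != NULL);
    sorted = malloc(n * sizeof(int));
    if (sorted == NULL)
        return -1;
    memcpy(sorted, in, n * sizeof(int));
    qsort(sorted, n, sizeof(int), cmp_int);

    for (size_t i = 0; i < n; i++)
    {
        if (i > 0 && sorted[i] == sorted[i - 1])
            continue;                   // duplicate: no transfunc call
        count += 1;                     // transfunc
    }
    free(sorted);
    return count;
}
```

The same sketch shows why partial aggregation is ruled out here: two workers seeing {1, 2} and {2, 3} would each report 2 distinct values, and combining those counts gives 4, while the true answer is 3.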
      40              :  *
      41              :  *    If transfunc is marked "strict" in pg_proc and initcond is NULL,
      42              :  *    then the first non-NULL input_value is assigned directly to transvalue,
      43              :  *    and transfunc isn't applied until the second non-NULL input_value.
      44              :  *    The agg's first input type and transtype must be the same in this case!
      45              :  *
      46              :  *    If transfunc is marked "strict" then NULL input_values are skipped,
      47              :  *    keeping the previous transvalue.  If transfunc is not strict then it
      48              :  *    is called for every input tuple and must deal with NULL initcond
      49              :  *    or NULL input_values for itself.
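The strictness rules above can be sketched as follows for a max()-like aggregate whose input type and transtype are both int, as the first rule requires. NullableInt is a hypothetical stand-in for a Datum plus its null flag, and run_strict_max plays the role of the executor rather than of any real function.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

// Hypothetical nullable value: a Datum plus its isnull flag.
typedef struct NullableInt
{
    int  value;
    bool isnull;
} NullableInt;

// transfunc body of a max()-like aggregate; it never sees NULLs because
// the caller implements the "strict" rules.
static int larger(int a, int b)
{
    return a > b ? a : b;
}

// Strict transfunc with a NULL initcond:
//  - NULL inputs are skipped and the previous transvalue is kept;
//  - the first non-NULL input is copied into the transvalue as-is;
//  - the transfunc is first called for the second non-NULL input.
// *ncalls reports how many times the transfunc actually ran.
static NullableInt run_strict_max(const NullableInt *in, size_t n, int *ncalls)
{
    NullableInt trans = {0, true};      // initcond is NULL

    assert(ncalls != NULL);
    *ncalls = 0;
    for (size_t i = 0; i < n; i++)
    {
        if (in[i].isnull)
            continue;                   // strict: skip NULL input
        if (trans.isnull)
        {
            trans = in[i];              // first non-NULL input
            continue;
        }
        trans.value = larger(trans.value, in[i].value);
        (*ncalls)++;
    }
    return trans;                       // still NULL if all inputs were NULL
}
```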
      50              :  *
      51              :  *    If finalfunc is marked "strict" then it is not called when the
       52              :  *    ending transvalue is NULL; instead, a NULL result is created
      53              :  *    automatically (this is just the usual handling of strict functions,
      54              :  *    of course).  A non-strict finalfunc can make its own choice of
      55              :  *    what to return for a NULL ending transvalue.
      56              :  *
      57              :  *    Ordered-set aggregates are treated specially in one other way: we
      58              :  *    evaluate any "direct" arguments and pass them to the finalfunc along
      59              :  *    with the transition value.
      60              :  *
      61              :  *    A finalfunc can have additional arguments beyond the transvalue and
      62              :  *    any "direct" arguments, corresponding to the input arguments of the
      63              :  *    aggregate.  These are always just passed as NULL.  Such arguments may be
      64              :  *    needed to allow resolution of a polymorphic aggregate's result type.
      65              :  *
      66              :  *    We compute aggregate input expressions and run the transition functions
      67              :  *    in a temporary econtext (aggstate->tmpcontext).  This is reset at least
      68              :  *    once per input tuple, so when the transvalue datatype is
      69              :  *    pass-by-reference, we have to be careful to copy it into a longer-lived
      70              :  *    memory context, and free the prior value to avoid memory leakage.  We
      71              :  *    store transvalues in another set of econtexts, aggstate->aggcontexts
      72              :  *    (one per grouping set, see below), which are also used for the hashtable
      73              :  *    structures in AGG_HASHED mode.  These econtexts are rescanned, not just
      74              :  *    reset, at group boundaries so that aggregate transition functions can
      75              :  *    register shutdown callbacks via AggRegisterCallback.
      76              :  *
      77              :  *    The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
      78              :  *    run finalize functions and compute the output tuple; this context can be
      79              :  *    reset once per output tuple.
      80              :  *
      81              :  *    The executor's AggState node is passed as the fmgr "context" value in
      82              :  *    all transfunc and finalfunc calls.  It is not recommended that the
      83              :  *    transition functions look at the AggState node directly, but they can
      84              :  *    use AggCheckCallContext() to verify that they are being called by
      85              :  *    nodeAgg.c (and not as ordinary SQL functions).  The main reason a
      86              :  *    transition function might want to know this is so that it can avoid
      87              :  *    palloc'ing a fixed-size pass-by-ref transition value on every call:
      88              :  *    it can instead just scribble on and return its left input.  Ordinarily
      89              :  *    it is completely forbidden for functions to modify pass-by-ref inputs,
      90              :  *    but in the aggregate case we know the left input is either the initial
      91              :  *    transition value or a previous function result, and in either case its
      92              :  *    value need not be preserved.  See int8inc() for an example.  Notice that
      93              :  *    the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
      94              :  *    the previous transition value pointer is returned.  It is also possible
      95              :  *    to avoid repeated data copying when the transition value is an expanded
      96              :  *    object: to do that, the transition function must take care to return
      97              :  *    an expanded object that is in a child context of the memory context
      98              :  *    returned by AggCheckCallContext().  Also, some transition functions want
      99              :  *    to store working state in addition to the nominal transition value; they
     100              :  *    can use the memory context returned by AggCheckCallContext() to do that.
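A hedged sketch of the in-place trick, using a heap-allocated counter as a pass-by-reference transition value. The in_agg_context flag stands in for a successful AggCheckCallContext() check, and Counter, counter_inc and run_count are hypothetical; the driver loop only mimics the idea that no copy is needed when the same pointer comes back.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

// Hypothetical pass-by-reference transition value.
typedef struct Counter
{
    int64_t n;
} Counter;

static int nallocs = 0;                 // allocations made by counter_inc

// transfunc for a count()-like aggregate.  Only when called as an
// aggregate (in_agg_context) may it modify its left input and return it,
// since no one else needs the old value.
static Counter *counter_inc(Counter *state, bool in_agg_context)
{
    Counter *result;

    assert(state != NULL);
    if (in_agg_context)
    {
        state->n++;                     // scribble on the left input
        return state;
    }
    // Otherwise inputs must not be modified, so build a new value.
    result = malloc(sizeof(Counter));
    if (result == NULL)
        abort();
    nallocs++;
    result->n = state->n + 1;
    return result;
}

// Executor-side loop: if the transfunc returns the same pointer, nothing
// is copied or freed; otherwise the old transvalue is released.
static int64_t run_count(int nrows, bool in_agg_context)
{
    Counter *trans = malloc(sizeof(Counter));
    int64_t  result;

    if (trans == NULL)
        abort();
    trans->n = 0;                       // initcond
    for (int i = 0; i < nrows; i++)
    {
        Counter *next = counter_inc(trans, in_agg_context);

        if (next != trans)
        {
            free(trans);
            trans = next;
        }
    }
    result = trans->n;
    free(trans);
    return result;
}
```

With the shortcut, five rows cost no allocations beyond the initial state; with it disabled, every row allocates a new value.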
     101              :  *
     102              :  *    Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
     103              :  *    AggState is available as context in earlier releases (back to 8.1),
     104              :  *    but direct examination of the node is needed to use it before 9.0.
     105              :  *
     106              :  *    As of 9.4, aggregate transition functions can also use AggGetAggref()
     107              :  *    to get hold of the Aggref expression node for their aggregate call.
     108              :  *    This is mainly intended for ordered-set aggregates, which are not
     109              :  *    supported as window functions.  (A regular aggregate function would
     110              :  *    need some fallback logic to use this, since there's no Aggref node
     111              :  *    for a window function.)
     112              :  *
     113              :  *    Grouping sets:
     114              :  *
     115              :  *    A list of grouping sets which is structurally equivalent to a ROLLUP
     116              :  *    clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
     117              :  *    ordered data.  We do this by keeping a separate set of transition values
     118              :  *    for each grouping set being concurrently processed; for each input tuple
     119              :  *    we update them all, and on group boundaries we reset those states
     120              :  *    (starting at the front of the list) whose grouping values have changed
     121              :  *    (the list of grouping sets is ordered from most specific to least
     122              :  *    specific).
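The single-pass rollup can be sketched for ROLLUP(a, b), i.e. grouping sets (a, b), (a) and (), computing sum(v) over input already sorted by (a, b). Row, GroupResult and rollup_sum are hypothetical names, and the sketch assumes non-empty input. The point it illustrates is that the sets needing a reset always form a prefix of the most-to-least-specific list.

```c
#include <assert.h>
#include <stddef.h>

typedef struct Row
{
    int a;
    int b;
    int v;
} Row;

// One emitted group: grouping set (0 = (a,b), 1 = (a), 2 = ()) and sum(v).
typedef struct GroupResult
{
    int  setno;
    long sum;
} GroupResult;

#define NSETS 3

// Number of leading grouping columns in each set, most specific first.
static const int set_ncols[NSETS] = {2, 1, 0};

// Single pass over rows sorted by (a, b).  Writes results to out (3 * nrows
// entries always suffice) and returns how many were written.
static size_t rollup_sum(const Row *rows, size_t nrows, GroupResult *out)
{
    long   trans[NSETS] = {0, 0, 0};    // one transvalue per grouping set
    size_t nout = 0;

    assert(out != NULL || nrows == 0);
    for (size_t i = 0; i < nrows; i++)
    {
        if (i > 0)
        {
            // Index of the first grouping column that changed (2 = none).
            int changed = (rows[i].a != rows[i - 1].a) ? 0 :
                          (rows[i].b != rows[i - 1].b) ? 1 : 2;

            // Every set that groups on a changed column hit a boundary;
            // these form a prefix of the list, so emit and reset them.
            for (int s = 0; s < NSETS && set_ncols[s] > changed; s++)
            {
                out[nout].setno = s;
                out[nout].sum = trans[s];
                nout++;
                trans[s] = 0;
            }
        }
        for (int s = 0; s < NSETS; s++)     // advance every set
            trans[s] += rows[i].v;
    }
    if (nrows > 0)
    {
        for (int s = 0; s < NSETS; s++)     // end of input closes all groups
        {
            out[nout].setno = s;
            out[nout].sum = trans[s];
            nout++;
        }
    }
    return nout;
}
```

For the rows (1,1,10), (1,1,5), (1,2,7), (2,1,3), the (a,b) groups sum to 15, 7 and 3, the (a) groups to 22 and 3, and the grand total is 25, each emitted as soon as its boundary is reached.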
     123              :  *
     124              :  *    Where more complex grouping sets are used, we break them down into
     125              :  *    "phases", where each phase has a different sort order (except phase 0
     126              :  *    which is reserved for hashing).  During each phase but the last, the
     127              :  *    input tuples are additionally stored in a tuplesort which is keyed to the
     128              :  *    next phase's sort order; during each phase but the first, the input
     129              :  *    tuples are drawn from the previously sorted data.  (The sorting of the
     130              :  *    data for the first phase is handled by the planner, as it might be
     131              :  *    satisfied by underlying nodes.)
     132              :  *
     133              :  *    Hashing can be mixed with sorted grouping.  To do this, we have an
     134              :  *    AGG_MIXED strategy that populates the hashtables during the first sorted
     135              :  *    phase, and switches to reading them out after completing all sort phases.
     136              :  *    We can also support AGG_HASHED with multiple hash tables and no sorting
     137              :  *    at all.
     138              :  *
     139              :  *    From the perspective of aggregate transition and final functions, the
     140              :  *    only issue regarding grouping sets is this: a single call site (flinfo)
     141              :  *    of an aggregate function may be used for updating several different
     142              :  *    transition values in turn. So the function must not cache in the flinfo
     143              :  *    anything which logically belongs as part of the transition value (most
     144              :  *    importantly, the memory context in which the transition value exists).
     145              :  *    The support API functions (AggCheckCallContext, AggRegisterCallback) are
     146              :  *    sensitive to the grouping set for which the aggregate function is
     147              :  *    currently being called.
     148              :  *
     149              :  *    Plan structure:
     150              :  *
     151              :  *    What we get from the planner is actually one "real" Agg node which is
     152              :  *    part of the plan tree proper, but which optionally has an additional list
     153              :  *    of Agg nodes hung off the side via the "chain" field.  This is because an
     154              :  *    Agg node happens to be a convenient representation of all the data we
     155              :  *    need for grouping sets.
     156              :  *
     157              :  *    For many purposes, we treat the "real" node as if it were just the first
     158              :  *    node in the chain.  The chain must be ordered such that hashed entries
     159              :  *    come before sorted/plain entries; the real node is marked AGG_MIXED if
     160              :  *    there are both types present (in which case the real node describes one
     161              :  *    of the hashed groupings, other AGG_HASHED nodes may optionally follow in
     162              :  *    the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node).  If
     163              :  *    the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
     164              :  *    nodes must be of the same type; if it is AGG_PLAIN, there can be no
     165              :  *    chained nodes.
     166              :  *
     167              :  *    We collect all hashed nodes into a single "phase", numbered 0, and create
     168              :  *    a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
     169              :  *    Phase 0 is allocated even if there are no hashes, but remains unused in
     170              :  *    that case.
     171              :  *
     172              :  *    AGG_HASHED nodes actually refer to only a single grouping set each,
     173              :  *    because for each hashed grouping we need a separate grpColIdx and
     174              :  *    numGroups estimate.  AGG_SORTED nodes represent a "rollup", a list of
     175              :  *    grouping sets that share a sort order.  Each AGG_SORTED node other than
     176              :  *    the first one has an associated Sort node which describes the sort order
     177              :  *    to be used; the first sorted node takes its input from the outer subtree,
     178              :  *    which the planner has already arranged to provide ordered data.
     179              :  *
     180              :  *    Memory and ExprContext usage:
     181              :  *
     182              :  *    Because we're accumulating aggregate values across input rows, we need to
     183              :  *    use more memory contexts than just simple input/output tuple contexts.
     184              :  *    In fact, for a rollup, we need a separate context for each grouping set
     185              :  *    so that we can reset the inner (finer-grained) aggregates on their group
     186              :  *    boundaries while continuing to accumulate values for outer
     187              :  *    (coarser-grained) groupings.  On top of this, we might be simultaneously
     188              :  *    populating hashtables; however, we only need one context for all the
     189              :  *    hashtables.
     190              :  *
     191              :  *    So we create an array, aggcontexts, with an ExprContext for each grouping
     192              :  *    set in the largest rollup that we're going to process, and use the
     193              :  *    per-tuple memory context of those ExprContexts to store the aggregate
     194              :  *    transition values.  hashcontext is the single context created to support
     195              :  *    all hash tables.
     196              :  *
     197              :  *    Spilling To Disk
     198              :  *
     199              :  *    When performing hash aggregation, if the hash table memory exceeds the
     200              :  *    limit (see hash_agg_check_limits()), we enter "spill mode". In spill
     201              :  *    mode, we advance the transition states only for groups already in the
      202              :  *    hash table. For tuples that would need to create new hash table
     203              :  *    entries (and initialize new transition states), we instead spill them to
     204              :  *    disk to be processed later. The tuples are spilled in a partitioned
     205              :  *    manner, so that subsequent batches are smaller and less likely to exceed
     206              :  *    hash_mem (if a batch does exceed hash_mem, it must be spilled
     207              :  *    recursively).
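A simplified sketch of the first pass in spill mode, for count(*) GROUP BY key. It assumes a fixed group limit (MAX_GROUPS) in place of the real memory accounting, a linear array in place of the hash table, and in-memory arrays in place of logical tapes; all names are hypothetical. Later batches would rerun the same logic on each partition with a fresh table.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define MAX_GROUPS 2                    // hypothetical in-memory group limit
#define PARTITION_BITS 2
#define NPARTITIONS (1 << PARTITION_BITS)
#define MAX_SPILL 64                    // hypothetical per-partition capacity

typedef struct Group
{
    int  key;
    long count;
} Group;

typedef struct SpillState
{
    Group  groups[MAX_GROUPS];          // stand-in for the hash table
    int    ngroups;
    int    spill_mode;                  // set once the limit is reached
    int    spilled[NPARTITIONS][MAX_SPILL];   // stand-in for spill tapes
    size_t nspilled[NPARTITIONS];
} SpillState;

static void spill_init(SpillState *st)
{
    memset(st, 0, sizeof(*st));
}

// Multiplicative hash; any well-mixed 32-bit hash would do here.
static uint32_t hash_int(int key)
{
    return (uint32_t) key * 2654435761u;
}

// Feed one input tuple to count(*) ... GROUP BY key.
static void spill_agg_tuple(SpillState *st, int key)
{
    uint32_t part;

    for (int i = 0; i < st->ngroups; i++)
    {
        if (st->groups[i].key == key)
        {
            st->groups[i].count++;      // existing group: advance in memory
            return;
        }
    }
    if (!st->spill_mode)
    {
        st->groups[st->ngroups].key = key;  // room left: create the group
        st->groups[st->ngroups].count = 1;
        if (++st->ngroups == MAX_GROUPS)
            st->spill_mode = 1;         // limit reached: enter spill mode
        return;
    }
    // Spill mode and a new group: route the tuple to a partition chosen by
    // the high bits of its hash, to be aggregated in a later batch.
    part = hash_int(key) >> (32 - PARTITION_BITS);
    assert(st->nspilled[part] < MAX_SPILL);
    st->spilled[part][st->nspilled[part]++] = key;
}
```

Because partitions are chosen by hash, all spilled tuples of one group land in the same partition, so each later batch sees complete groups.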
     208              :  *
     209              :  *    Spilled data is written to logical tapes. These provide better control
     210              :  *    over memory usage, disk space, and the number of files than if we were
     211              :  *    to use a BufFile for each spill.  We don't know the number of tapes needed
     212              :  *    at the start of the algorithm (because it can recurse), so a tape set is
     213              :  *    allocated at the beginning, and individual tapes are created as needed.
     214              :  *    As a particular tape is read, logtape.c recycles its disk space. When a
     215              :  *    tape is read to completion, it is destroyed entirely.
     216              :  *
     217              :  *    Tapes' buffers can take up substantial memory when many tapes are open at
     218              :  *    once. We only need one tape open at a time in read mode (using a buffer
     219              :  *    that's a multiple of BLCKSZ); but we need one tape open in write mode (each
     220              :  *    requiring a buffer of size BLCKSZ) for each partition.
     221              :  *
     222              :  *    Note that it's possible for transition states to start small but then
     223              :  *    grow very large; for instance in the case of ARRAY_AGG. In such cases,
     224              :  *    it's still possible to significantly exceed hash_mem. We try to avoid
     225              :  *    this situation by estimating what will fit in the available memory, and
     226              :  *    imposing a limit on the number of groups separately from the amount of
     227              :  *    memory consumed.
     228              :  *
     229              :  *    Transition / Combine function invocation:
     230              :  *
     231              :  *    For performance reasons transition functions, including combine
     232              :  *    functions, aren't invoked one-by-one from nodeAgg.c after computing
     233              :  *    arguments using the expression evaluation engine. Instead
     234              :  *    ExecBuildAggTrans() builds one large expression that does both argument
     235              :  *    evaluation and transition function invocation. That avoids performance
     236              :  *    issues due to repeated uses of expression evaluation, complications due
      237              :  *    to filter expressions having to be evaluated early, and allows JIT compilation of
     238              :  *    the entire expression into one native function.
     239              :  *
     240              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
     241              :  * Portions Copyright (c) 1994, Regents of the University of California
     242              :  *
     243              :  * IDENTIFICATION
     244              :  *    src/backend/executor/nodeAgg.c
     245              :  *
     246              :  *-------------------------------------------------------------------------
     247              :  */
     248              : 
     249              : #include "postgres.h"
     250              : 
     251              : #include "access/htup_details.h"
     252              : #include "access/parallel.h"
     253              : #include "catalog/objectaccess.h"
     254              : #include "catalog/pg_aggregate.h"
     255              : #include "catalog/pg_proc.h"
     256              : #include "catalog/pg_type.h"
     257              : #include "common/hashfn.h"
     258              : #include "executor/execExpr.h"
     259              : #include "executor/executor.h"
     260              : #include "executor/instrument.h"
     261              : #include "executor/nodeAgg.h"
     262              : #include "lib/hyperloglog.h"
     263              : #include "miscadmin.h"
     264              : #include "nodes/nodeFuncs.h"
     265              : #include "optimizer/optimizer.h"
     266              : #include "parser/parse_agg.h"
     267              : #include "parser/parse_coerce.h"
     268              : #include "port/pg_bitutils.h"
     269              : #include "utils/acl.h"
     270              : #include "utils/builtins.h"
     271              : #include "utils/datum.h"
     272              : #include "utils/expandeddatum.h"
     273              : #include "utils/injection_point.h"
     274              : #include "utils/logtape.h"
     275              : #include "utils/lsyscache.h"
     276              : #include "utils/memutils.h"
     277              : #include "utils/memutils_memorychunk.h"
     278              : #include "utils/syscache.h"
     279              : #include "utils/tuplesort.h"
     280              : 
     281              : /*
     282              :  * Control how many partitions are created when spilling HashAgg to
     283              :  * disk.
     284              :  *
     285              :  * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of
     286              :  * partitions needed such that each partition will fit in memory. The factor
     287              :  * is set higher than one because there's not a high cost to having a few too
     288              :  * many partitions, and it makes it less likely that a partition will need to
     289              :  * be spilled recursively. Another benefit of having more, smaller partitions
     290              :  * is that small hash tables may perform better than large ones due to memory
     291              :  * caching effects.
     292              :  *
     293              :  * We also specify a min and max number of partitions per spill. Too few might
     294              :  * mean a lot of wasted I/O from repeated spilling of the same tuples. Too
     295              :  * many will result in lots of memory wasted buffering the spill files (which
     296              :  * could instead be spent on a larger hash table).
     297              :  */
     298              : #define HASHAGG_PARTITION_FACTOR 1.50
     299              : #define HASHAGG_MIN_PARTITIONS 4
     300              : #define HASHAGG_MAX_PARTITIONS 1024
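A simplified sketch of how these constants might combine when choosing a partition count. It is not hash_choose_num_partitions(): it omits the cap on write-buffer memory and the used_bits handling, and choose_num_partitions, entry_size and mem_limit are hypothetical names.

```c
#include <assert.h>

#define HASHAGG_PARTITION_FACTOR 1.50
#define HASHAGG_MIN_PARTITIONS 4
#define HASHAGG_MAX_PARTITIONS 1024

// Size partitions so each is expected to fit in mem_limit bytes, pad by the
// partition factor, clamp to [min, max], and round up to a power of two so
// a partition can be picked with hash bits.
static int choose_num_partitions(double input_groups, double entry_size,
                                 double mem_limit)
{
    double wanted;
    int    npartitions = 1;

    assert(mem_limit > 0);
    wanted = HASHAGG_PARTITION_FACTOR * input_groups * entry_size / mem_limit;
    if (wanted < HASHAGG_MIN_PARTITIONS)
        wanted = HASHAGG_MIN_PARTITIONS;
    if (wanted > HASHAGG_MAX_PARTITIONS)
        wanted = HASHAGG_MAX_PARTITIONS;
    while (npartitions < wanted)        // round up to a power of two
        npartitions <<= 1;
    return npartitions;
}
```

For example, 1,000,000 groups of 100 bytes against a 4 MiB limit want about 35.8 partitions, which rounds up to 64; a tiny input is raised to the minimum of 4, and a huge one is capped at 1024.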
     301              : 
     302              : /*
     303              :  * For reading from tapes, the buffer size must be a multiple of
     304              :  * BLCKSZ. Larger values help when reading from multiple tapes concurrently,
     305              :  * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a
     306              :  * tape always uses a buffer of size BLCKSZ.
     307              :  */
     308              : #define HASHAGG_READ_BUFFER_SIZE BLCKSZ
     309              : #define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ
     310              : 
     311              : /*
     312              :  * HyperLogLog is used for estimating the cardinality of the spilled tuples in
     313              :  * a given partition. 5 bits corresponds to a size of about 32 bytes and a
     314              :  * worst-case error of around 18%. That's effective enough to choose a
     315              :  * reasonable number of partitions when recursing.
     316              :  */
     317              : #define HASHAGG_HLL_BIT_WIDTH 5
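The figures in the comment above can be checked with the usual HyperLogLog standard-error approximation, 1.04 divided by the square root of the register count, assuming one byte per register (an assumption about the register layout that the comment does not state):

```latex
m = 2^{5} = 32 \ \text{registers} \;\Rightarrow\; \approx 32 \ \text{bytes},
\qquad
\text{error} \approx \frac{1.04}{\sqrt{m}} = \frac{1.04}{\sqrt{32}} \approx \frac{1.04}{5.657} \approx 0.184
```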
     318              : 
     319              : /*
     320              :  * Assume the palloc overhead always uses sizeof(MemoryChunk) bytes.
     321              :  */
     322              : #define CHUNKHDRSZ sizeof(MemoryChunk)
     323              : 
     324              : /*
     325              :  * Represents partitioned spill data for a single hashtable. Contains the
     326              :  * necessary information to route tuples to the correct partition, and to
     327              :  * transform the spilled data into new batches.
     328              :  *
     329              :  * The high bits are used for partition selection (when recursing, we ignore
     330              :  * the bits that have already been used for partition selection at an earlier
     331              :  * level).
     332              :  */
     333              : typedef struct HashAggSpill
     334              : {
     335              :     int         npartitions;    /* number of partitions */
     336              :     LogicalTape **partitions;   /* spill partition tapes */
     337              :     int64      *ntuples;        /* number of tuples in each partition */
     338              :     uint32      mask;           /* mask to find partition from hash value */
     339              :     int         shift;          /* after masking, shift by this amount */
     340              :     hyperLogLogState *hll_card; /* cardinality estimate for contents */
     341              : } HashAggSpill;
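The mask/shift scheme in HashAggSpill can be sketched as below, assuming selection works as the comment describes: each recursion level takes the next partition_bits bits below the used_bits already consumed. PartitionSelector, make_selector and select_partition are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>

typedef struct PartitionSelector
{
    uint32_t mask;                      // selects this level's hash bits
    int      shift;                     // moves them down to 0..npartitions-1
} PartitionSelector;

static PartitionSelector make_selector(int used_bits, int partition_bits)
{
    PartitionSelector sel;

    assert(used_bits >= 0 && partition_bits >= 0 && partition_bits < 32);
    assert(used_bits + partition_bits <= 32);
    sel.shift = 32 - used_bits - partition_bits;
    // Shifting a 32-bit value by 32 is undefined in C; shift reaches 32 only
    // when no bits are used at all (a single partition).
    if (sel.shift < 32)
        sel.mask = (((uint32_t) 1 << partition_bits) - 1) << sel.shift;
    else
        sel.mask = 0;
    return sel;
}

static int select_partition(PartitionSelector sel, uint32_t hash)
{
    if (sel.mask == 0)
        return 0;                       // single partition; avoids shift by 32
    return (int) ((hash & sel.mask) >> sel.shift);
}
```

For the hash 0xB4000000 and 2 bits per level, successive levels select partitions 2, 3 and 1.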
     342              : 
     343              : /*
     344              :  * Represents work to be done for one pass of hash aggregation (with only one
     345              :  * grouping set).
     346              :  *
     347              :  * Also tracks the bits of the hash already used for partition selection by
     348              :  * earlier iterations, so that this batch can use new bits. If all bits have
     349              :  * already been used, no partitioning will be done (any spilled data will go
     350              :  * to a single output tape).
     351              :  */
     352              : typedef struct HashAggBatch
     353              : {
     354              :     int         setno;          /* grouping set */
     355              :     int         used_bits;      /* number of bits of hash already used */
     356              :     LogicalTape *input_tape;    /* input partition tape */
     357              :     int64       input_tuples;   /* number of tuples in this batch */
     358              :     double      input_card;     /* estimated group cardinality */
     359              : } HashAggBatch;
     360              : 
     361              : /* used to find referenced colnos */
     362              : typedef struct FindColsContext
     363              : {
     364              :     bool        is_aggref;      /* is under an aggref */
     365              :     Bitmapset  *aggregated;     /* column references under an aggref */
     366              :     Bitmapset  *unaggregated;   /* other column references */
     367              : } FindColsContext;
     368              : 
     369              : static void select_current_set(AggState *aggstate, int setno, bool is_hash);
     370              : static void initialize_phase(AggState *aggstate, int newphase);
     371              : static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
     372              : static void initialize_aggregates(AggState *aggstate,
     373              :                                   AggStatePerGroup *pergroups,
     374              :                                   int numReset);
     375              : static void advance_transition_function(AggState *aggstate,
     376              :                                         AggStatePerTrans pertrans,
     377              :                                         AggStatePerGroup pergroupstate);
     378              : static void advance_aggregates(AggState *aggstate);
     379              : static void process_ordered_aggregate_single(AggState *aggstate,
     380              :                                              AggStatePerTrans pertrans,
     381              :                                              AggStatePerGroup pergroupstate);
     382              : static void process_ordered_aggregate_multi(AggState *aggstate,
     383              :                                             AggStatePerTrans pertrans,
     384              :                                             AggStatePerGroup pergroupstate);
     385              : static void finalize_aggregate(AggState *aggstate,
     386              :                                AggStatePerAgg peragg,
     387              :                                AggStatePerGroup pergroupstate,
     388              :                                Datum *resultVal, bool *resultIsNull);
     389              : static void finalize_partialaggregate(AggState *aggstate,
     390              :                                       AggStatePerAgg peragg,
     391              :                                       AggStatePerGroup pergroupstate,
     392              :                                       Datum *resultVal, bool *resultIsNull);
     393              : static inline void prepare_hash_slot(AggStatePerHash perhash,
     394              :                                      TupleTableSlot *inputslot,
     395              :                                      TupleTableSlot *hashslot);
     396              : static void prepare_projection_slot(AggState *aggstate,
     397              :                                     TupleTableSlot *slot,
     398              :                                     int currentSet);
     399              : static void finalize_aggregates(AggState *aggstate,
     400              :                                 AggStatePerAgg peraggs,
     401              :                                 AggStatePerGroup pergroup);
     402              : static TupleTableSlot *project_aggregates(AggState *aggstate);
     403              : static void find_cols(AggState *aggstate, Bitmapset **aggregated,
     404              :                       Bitmapset **unaggregated);
     405              : static bool find_cols_walker(Node *node, FindColsContext *context);
     406              : static void build_hash_tables(AggState *aggstate);
     407              : static void build_hash_table(AggState *aggstate, int setno, double nbuckets);
     408              : static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
     409              :                                           bool nullcheck);
     410              : static void hash_create_memory(AggState *aggstate);
     411              : static double hash_choose_num_buckets(double hashentrysize,
     412              :                                       double ngroups, Size memory);
     413              : static int  hash_choose_num_partitions(double input_groups,
     414              :                                        double hashentrysize,
     415              :                                        int used_bits,
     416              :                                        int *log2_npartitions);
     417              : static void initialize_hash_entry(AggState *aggstate,
     418              :                                   TupleHashTable hashtable,
     419              :                                   TupleHashEntry entry);
     420              : static void lookup_hash_entries(AggState *aggstate);
     421              : static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
     422              : static void agg_fill_hash_table(AggState *aggstate);
     423              : static bool agg_refill_hash_table(AggState *aggstate);
     424              : static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
     425              : static TupleTableSlot *agg_retrieve_hash_table_in_memory(AggState *aggstate);
     426              : static void hash_agg_check_limits(AggState *aggstate);
     427              : static void hash_agg_enter_spill_mode(AggState *aggstate);
     428              : static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
     429              :                                     int npartitions);
     430              : static void hashagg_finish_initial_spills(AggState *aggstate);
     431              : static void hashagg_reset_spill_state(AggState *aggstate);
     432              : static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno,
     433              :                                        int64 input_tuples, double input_card,
     434              :                                        int used_bits);
     435              : static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
     436              : static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset,
     437              :                                int used_bits, double input_groups,
     438              :                                double hashentrysize);
     439              : static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
     440              :                                 TupleTableSlot *inputslot, uint32 hash);
     441              : static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
     442              :                                  int setno);
     443              : static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
     444              : static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
     445              :                                       AggState *aggstate, EState *estate,
     446              :                                       Aggref *aggref, Oid transfn_oid,
     447              :                                       Oid aggtranstype, Oid aggserialfn,
     448              :                                       Oid aggdeserialfn, Datum initValue,
     449              :                                       bool initValueIsNull, Oid *inputTypes,
     450              :                                       int numArguments);
     451              : 
     452              : 
     453              : /*
     454              :  * Select the current grouping set; affects current_set and
     455              :  * curaggcontext.
     456              :  */
     457              : static void
     458      5312483 : select_current_set(AggState *aggstate, int setno, bool is_hash)
     459              : {
     460              :     /*
     461              :      * When changing this, also adapt ExecAggPlainTransByVal() and
     462              :      * ExecAggPlainTransByRef().
     463              :      */
     464      5312483 :     if (is_hash)
     465      4889718 :         aggstate->curaggcontext = aggstate->hashcontext;
     466              :     else
     467       422765 :         aggstate->curaggcontext = aggstate->aggcontexts[setno];
     468              : 
     469      5312483 :     aggstate->current_set = setno;
     470      5312483 : }
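A minimal sketch of the branch in `select_current_set()`. It assumes plain `int` ids stand in for the `ExprContext` pointers. `ToyAggState` and `toy_select_current_set` are hypothetical names, not PostgreSQL API. The sketch shows that all hashed grouping sets share one context, while each sorted set gets its own:

```c
#include <assert.h>

/* Hypothetical stand-in: ints replace ExprContext pointers. */
typedef struct ToyAggState
{
    int         hashcontext;        /* id of the context shared by hashed sets */
    int         aggcontexts[4];     /* one context id per sorted grouping set */
    int         curaggcontext;      /* context currently selected */
    int         current_set;        /* grouping set currently selected */
} ToyAggState;

/* Same decision as select_current_set(): hashed -> shared context,
 * sorted -> the per-set context. */
static void
toy_select_current_set(ToyAggState *st, int setno, int is_hash)
{
    if (is_hash)
        st->curaggcontext = st->hashcontext;
    else
        st->curaggcontext = st->aggcontexts[setno];
    st->current_set = setno;
}
```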
     471              : 
     472              : /*
     473              :  * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
     474              :  * current_phase + 1. Juggle the tuplesorts accordingly.
     475              :  *
     476              :  * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
     477              :  * case, so when entering phase 0, all we need to do is drop open sorts.
     478              :  */
     479              : static void
     480        56180 : initialize_phase(AggState *aggstate, int newphase)
     481              : {
     482              :     Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
     483              : 
     484              :     /*
     485              :      * Whatever the previous state, we're now done with whatever input
     486              :      * tuplesort was in use.
     487              :      */
     488        56180 :     if (aggstate->sort_in)
     489              :     {
     490           28 :         tuplesort_end(aggstate->sort_in);
     491           28 :         aggstate->sort_in = NULL;
     492              :     }
     493              : 
     494        56180 :     if (newphase <= 1)
     495              :     {
     496              :         /*
     497              :          * Discard any existing output tuplesort.
     498              :          */
     499        56032 :         if (aggstate->sort_out)
     500              :         {
     501            4 :             tuplesort_end(aggstate->sort_out);
     502            4 :             aggstate->sort_out = NULL;
     503              :         }
     504              :     }
     505              :     else
     506              :     {
     507              :         /*
     508              :          * The old output tuplesort becomes the new input one, and this is the
     509              :          * right time to actually sort it.
     510              :          */
     511          148 :         aggstate->sort_in = aggstate->sort_out;
     512          148 :         aggstate->sort_out = NULL;
     513              :         Assert(aggstate->sort_in);
     514          148 :         tuplesort_performsort(aggstate->sort_in);
     515              :     }
     516              : 
     517              :     /*
     518              :      * If this isn't the last phase, we need to sort appropriately for the
     519              :      * next phase in sequence.
     520              :      */
     521        56180 :     if (newphase > 0 && newphase < aggstate->numphases - 1)
     522              :     {
     523          196 :         Sort       *sortnode = aggstate->phases[newphase + 1].sortnode;
     524          196 :         PlanState  *outerNode = outerPlanState(aggstate);
     525          196 :         TupleDesc   tupDesc = ExecGetResultType(outerNode);
     526              : 
     527          196 :         aggstate->sort_out = tuplesort_begin_heap(tupDesc,
     528              :                                                   sortnode->numCols,
     529              :                                                   sortnode->sortColIdx,
     530              :                                                   sortnode->sortOperators,
     531              :                                                   sortnode->collations,
     532              :                                                   sortnode->nullsFirst,
     533              :                                                   work_mem,
     534              :                                                   NULL, TUPLESORT_NONE);
     535              :     }
     536              : 
     537        56180 :     aggstate->current_phase = newphase;
     538        56180 :     aggstate->phase = &aggstate->phases[newphase];
     539        56180 : }
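The sorter hand-off in `initialize_phase()` can be modelled with integer ids in place of `Tuplesortstate` pointers. This is a hypothetical sketch: `ToyPhaseState`, `toy_initialize_phase` and the `next_id` counter are illustrative only. It tracks which sorter is being read, which is being filled for the next phase, and when the input sorter gets sorted:

```c
#include <assert.h>
#include <stdbool.h>

typedef struct ToyPhaseState
{
    int         numphases;
    int         current_phase;
    int         sort_in;        /* 0 = none, else id of sorter being read */
    int         sort_out;       /* 0 = none, else id of sorter being filled */
    bool        sort_in_sorted; /* stands in for tuplesort_performsort() */
    int         next_id;        /* hypothetical id generator for new sorters */
} ToyPhaseState;

static void
toy_initialize_phase(ToyPhaseState *st, int newphase)
{
    assert(newphase <= 1 || newphase == st->current_phase + 1);

    /* Whatever happened before, the old input sorter is finished. */
    st->sort_in = 0;
    st->sort_in_sorted = false;

    if (newphase <= 1)
        st->sort_out = 0;       /* reset: discard any output sorter */
    else
    {
        /* Previous phase's output becomes this phase's input; sort it. */
        st->sort_in = st->sort_out;
        st->sort_out = 0;
        assert(st->sort_in != 0);
        st->sort_in_sorted = true;
    }

    /* Not the last phase: start collecting rows for the next one. */
    if (newphase > 0 && newphase < st->numphases - 1)
        st->sort_out = st->next_id++;

    st->current_phase = newphase;
}
```

With three phases, entering phase 1 opens an output sorter, entering phase 2 turns it into the sorted input, and returning to phase 0 (hashing, handled last in the `AGG_MIXED` case) simply drops everything.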
     540              : 
     541              : /*
     542              :  * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
     543              :  * populated by the previous phase.  Copy it to the sorter for the next phase
     544              :  * if any.
     545              :  *
     546              :  * Callers cannot rely on memory for tuple in returned slot remaining valid
     547              :  * past any subsequently fetched tuple.
     548              :  */
     549              : static TupleTableSlot *
     550     18959181 : fetch_input_tuple(AggState *aggstate)
     551              : {
     552              :     TupleTableSlot *slot;
     553              : 
     554     18959181 :     if (aggstate->sort_in)
     555              :     {
     556              :         /* make sure we check for interrupts in either path through here */
     557       236620 :         CHECK_FOR_INTERRUPTS();
     558       236620 :         if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
     559              :                                     aggstate->sort_slot, NULL))
     560          148 :             return NULL;
     561       236472 :         slot = aggstate->sort_slot;
     562              :     }
     563              :     else
     564     18722561 :         slot = ExecProcNode(outerPlanState(aggstate));
     565              : 
     566     18958987 :     if (!TupIsNull(slot) && aggstate->sort_out)
     567       236472 :         tuplesort_puttupleslot(aggstate->sort_out, slot);
     568              : 
     569     18958987 :     return slot;
     570              : }
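`fetch_input_tuple()` reads from the outer plan in the first phase and from `sort_in` afterwards, copying each row into `sort_out` when a later phase needs it. A rough sketch under the assumption that fixed-size `int` buffers replace both the outer plan and the tuplesorts (`ToyBuf`, `ToyFetchState`, `toy_fetch_input_tuple` are hypothetical):

```c
#include <assert.h>
#include <stddef.h>

#define TOY_CAP 16

/* Hypothetical fixed-capacity row buffer standing in for a tuplesort. */
typedef struct ToyBuf
{
    int         rows[TOY_CAP];
    size_t      n;              /* rows stored */
    size_t      pos;            /* next row to read */
} ToyBuf;

typedef struct ToyFetchState
{
    ToyBuf     *outer;          /* stands in for the outer plan */
    ToyBuf     *sort_in;        /* NULL during the first phase */
    ToyBuf     *sort_out;       /* NULL when no later phase needs the rows */
} ToyFetchState;

/* Returns 1 and stores the next row in *row, or 0 at end of input. */
static int
toy_fetch_input_tuple(ToyFetchState *st, int *row)
{
    ToyBuf     *src = st->sort_in ? st->sort_in : st->outer;

    if (src->pos >= src->n)
        return 0;
    *row = src->rows[src->pos++];

    /* Forward the row so the next phase can sort it by its own keys. */
    if (st->sort_out)
    {
        assert(st->sort_out->n < TOY_CAP);
        st->sort_out->rows[st->sort_out->n++] = *row;
    }
    return 1;
}
```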
     571              : 
     572              : /*
     573              :  * (Re)Initialize an individual aggregate.
     574              :  *
     575              :  * This function handles only one grouping set, already set in
     576              :  * aggstate->current_set.
     577              :  *
     578              :  * When called, CurrentMemoryContext should be the per-query context.
     579              :  */
     580              : static void
     581       726423 : initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
     582              :                      AggStatePerGroup pergroupstate)
     583              : {
     584              :     /*
     585              :      * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
     586              :      */
     587       726423 :     if (pertrans->aggsortrequired)
     588              :     {
     589              :         /*
     590              :          * In case of rescan, maybe there could be an uncompleted sort
     591              :          * operation?  Clean it up if so.
     592              :          */
     593        35860 :         if (pertrans->sortstates[aggstate->current_set])
     594            0 :             tuplesort_end(pertrans->sortstates[aggstate->current_set]);
     595              : 
     596              : 
     597              :         /*
     598              :          * We use a plain Datum sorter when there's a single input column;
     599              :          * otherwise sort the full tuple.  (See comments for
     600              :          * process_ordered_aggregate_single.)
     601              :          */
     602        35860 :         if (pertrans->numInputs == 1)
     603              :         {
     604        35804 :             Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
     605              : 
     606        35804 :             pertrans->sortstates[aggstate->current_set] =
     607        35804 :                 tuplesort_begin_datum(attr->atttypid,
     608        35804 :                                       pertrans->sortOperators[0],
     609        35804 :                                       pertrans->sortCollations[0],
     610        35804 :                                       pertrans->sortNullsFirst[0],
     611              :                                       work_mem, NULL, TUPLESORT_NONE);
     612              :         }
     613              :         else
     614           56 :             pertrans->sortstates[aggstate->current_set] =
     615           56 :                 tuplesort_begin_heap(pertrans->sortdesc,
     616              :                                      pertrans->numSortCols,
     617              :                                      pertrans->sortColIdx,
     618              :                                      pertrans->sortOperators,
     619              :                                      pertrans->sortCollations,
     620              :                                      pertrans->sortNullsFirst,
     621              :                                      work_mem, NULL, TUPLESORT_NONE);
     622              :     }
     623              : 
     624              :     /*
     625              :      * (Re)set transValue to the initial value.
     626              :      *
     627              :      * Note that when the initial value is pass-by-ref, we must copy it (into
     628              :      * the aggcontext) since we will pfree the transValue later.
     629              :      */
     630       726423 :     if (pertrans->initValueIsNull)
     631       367723 :         pergroupstate->transValue = pertrans->initValue;
     632              :     else
     633              :     {
     634              :         MemoryContext oldContext;
     635              : 
     636       358700 :         oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
     637       717400 :         pergroupstate->transValue = datumCopy(pertrans->initValue,
     638       358700 :                                               pertrans->transtypeByVal,
     639       358700 :                                               pertrans->transtypeLen);
     640       358700 :         MemoryContextSwitchTo(oldContext);
     641              :     }
     642       726423 :     pergroupstate->transValueIsNull = pertrans->initValueIsNull;
     643              : 
     644              :     /*
     645              :      * If the initial value for the transition state doesn't exist in the
     646              :      * pg_aggregate table then we will let the first non-NULL value returned
     647              :      * from the outer procNode become the initial value. (This is useful for
     648              :      * aggregates like max() and min().) The noTransValue flag signals that we
     649              :      * still need to do this.
     650              :      */
     651       726423 :     pergroupstate->noTransValue = pertrans->initValueIsNull;
     652       726423 : }
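The reset logic in `initialize_aggregate()` reduces to three assignments when the transition state is pass-by-value. A minimal sketch assuming an `int` state (the real code additionally copies a pass-by-ref initial value into the aggcontext with `datumCopy()`); `ToyTrans`, `ToyGroup` and `toy_initialize_aggregate` are hypothetical:

```c
#include <assert.h>
#include <stdbool.h>

typedef struct ToyTrans
{
    int         initValue;
    bool        initValueIsNull;    /* true when pg_aggregate has no initcond */
} ToyTrans;

typedef struct ToyGroup
{
    int         transValue;
    bool        transValueIsNull;
    bool        noTransValue;
} ToyGroup;

static void
toy_initialize_aggregate(const ToyTrans *t, ToyGroup *g)
{
    g->transValue = t->initValue;   /* by-value: a plain copy suffices */
    g->transValueIsNull = t->initValueIsNull;
    /* No initcond: the first non-NULL input will become the state. */
    g->noTransValue = t->initValueIsNull;
}
```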
     653              : 
     654              : /*
     655              :  * Initialize all aggregate transition states for a new group of input values.
     656              :  *
     657              :  * If there are multiple grouping sets, we initialize only the first numReset
     658              :  * of them (the grouping sets are ordered so that the most specific one, which
     659              :  * is reset most often, is first). As a convenience, if numReset is 0, we
     660              :  * reinitialize all sets.
     661              :  *
     662              :  * NB: This cannot be used for hash aggregates, as for those the grouping set
     663              :  * number has to be specified from further up.
     664              :  *
     665              :  * When called, CurrentMemoryContext should be the per-query context.
     666              :  */
     667              : static void
     668       187278 : initialize_aggregates(AggState *aggstate,
     669              :                       AggStatePerGroup *pergroups,
     670              :                       int numReset)
     671              : {
     672              :     int         transno;
     673       187278 :     int         numGroupingSets = Max(aggstate->phase->numsets, 1);
     674       187278 :     int         setno = 0;
     675       187278 :     int         numTrans = aggstate->numtrans;
     676       187278 :     AggStatePerTrans transstates = aggstate->pertrans;
     677              : 
     678       187278 :     if (numReset == 0)
     679            0 :         numReset = numGroupingSets;
     680              : 
     681       384008 :     for (setno = 0; setno < numReset; setno++)
     682              :     {
     683       196730 :         AggStatePerGroup pergroup = pergroups[setno];
     684              : 
     685       196730 :         select_current_set(aggstate, setno, false);
     686              : 
     687       623716 :         for (transno = 0; transno < numTrans; transno++)
     688              :         {
     689       426986 :             AggStatePerTrans pertrans = &transstates[transno];
     690       426986 :             AggStatePerGroup pergroupstate = &pergroup[transno];
     691              : 
     692       426986 :             initialize_aggregate(aggstate, pertrans, pergroupstate);
     693              :         }
     694              :     }
     695       187278 : }
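Because grouping sets are ordered most-specific first, a group boundary only needs to reset a prefix of them. A sketch of the `numReset` convention, assuming one `int` per set stands in for the per-set transition states (`toy_initialize_sets` is a hypothetical name):

```c
#include <assert.h>

/* Reset the first numReset grouping sets; 0 means "all of them". */
static void
toy_initialize_sets(int *state, int numGroupingSets, int numReset)
{
    if (numReset == 0)
        numReset = numGroupingSets;
    for (int setno = 0; setno < numReset; setno++)
        state[setno] = 0;
}
```

For example, with sets `(a, b)`, `(a)`, `()`, a change in `b` alone resets only the first set, while the coarser sets keep accumulating.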
     696              : 
     697              : /*
     698              :  * Given new input value(s), advance the transition function of one aggregate
     699              :  * state within one grouping set only (already set in aggstate->current_set)
     700              :  *
     701              :  * The new values (and null flags) have been preloaded into argument positions
     702              :  * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
     703              :  * pass to the transition function.  We also expect that the static fields of
     704              :  * the fcinfo are already initialized; that was done by ExecInitAgg().
     705              :  *
     706              :  * It doesn't matter which memory context this is called in.
     707              :  */
     708              : static void
     709       479803 : advance_transition_function(AggState *aggstate,
     710              :                             AggStatePerTrans pertrans,
     711              :                             AggStatePerGroup pergroupstate)
     712              : {
     713       479803 :     FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
     714              :     MemoryContext oldContext;
     715              :     Datum       newVal;
     716              : 
     717       479803 :     if (pertrans->transfn.fn_strict)
     718              :     {
     719              :         /*
     720              :          * For a strict transfn, nothing happens when there's a NULL input; we
     721              :          * just keep the prior transValue.
     722              :          */
     723       150000 :         int         numTransInputs = pertrans->numTransInputs;
     724              :         int         i;
     725              : 
     726       300000 :         for (i = 1; i <= numTransInputs; i++)
     727              :         {
     728       150000 :             if (fcinfo->args[i].isnull)
     729            0 :                 return;
     730              :         }
     731       150000 :         if (pergroupstate->noTransValue)
     732              :         {
     733              :             /*
     734              :              * transValue has not been initialized. This is the first non-NULL
     735              :              * input value. We use it as the initial value for transValue. (We
     736              :              * already checked that the agg's input type is binary-compatible
     737              :              * with its transtype, so straight copy here is OK.)
     738              :              *
     739              :              * We must copy the datum into aggcontext if it is pass-by-ref. We
     740              :              * do not need to pfree the old transValue, since it's NULL.
     741              :              */
     742            0 :             oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
     743            0 :             pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
     744            0 :                                                   pertrans->transtypeByVal,
     745            0 :                                                   pertrans->transtypeLen);
     746            0 :             pergroupstate->transValueIsNull = false;
     747            0 :             pergroupstate->noTransValue = false;
     748            0 :             MemoryContextSwitchTo(oldContext);
     749            0 :             return;
     750              :         }
     751       150000 :         if (pergroupstate->transValueIsNull)
     752              :         {
     753              :             /*
     754              :              * Don't call a strict function with NULL inputs.  Note it is
     755              :              * possible to get here despite the above tests, if the transfn is
     756              :              * strict *and* returned a NULL on a prior cycle. If that happens
     757              :              * we will propagate the NULL all the way to the end.
     758              :              */
     759            0 :             return;
     760              :         }
     761              :     }
     762              : 
     763              :     /* We run the transition functions in per-input-tuple memory context */
     764       479803 :     oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
     765              : 
     766              :     /* set up aggstate->curpertrans for AggGetAggref() */
     767       479803 :     aggstate->curpertrans = pertrans;
     768              : 
     769              :     /*
     770              :      * OK to call the transition function
     771              :      */
     772       479803 :     fcinfo->args[0].value = pergroupstate->transValue;
     773       479803 :     fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
     774       479803 :     fcinfo->isnull = false;      /* just in case transfn doesn't set it */
     775              : 
     776       479803 :     newVal = FunctionCallInvoke(fcinfo);
     777              : 
     778       479803 :     aggstate->curpertrans = NULL;
     779              : 
     780              :     /*
     781              :      * If pass-by-ref datatype, must copy the new value into aggcontext and
     782              :      * free the prior transValue.  But if transfn returned a pointer to its
     783              :      * first input, we don't need to do anything.
     784              :      *
     785              :      * It's safe to compare newVal with pergroupstate->transValue without regard
     786              :      * for either being NULL, because ExecAggCopyTransValue takes care to set
     787              :      * transValue to 0 when NULL. Otherwise we could end up accidentally not
     788              :      * reparenting, when the transValue has the same numerical value as
     789              :      * newVal, despite being NULL.  This is a somewhat hot path, making it
     790              :      * undesirable to instead solve this with another branch for the common
     791              :      * case of the transition function returning its (modified) input
     792              :      * argument.
     793              :      */
     794       479803 :     if (!pertrans->transtypeByVal &&
     795            0 :         DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
     796            0 :         newVal = ExecAggCopyTransValue(aggstate, pertrans,
     797            0 :                                        newVal, fcinfo->isnull,
     798              :                                        pergroupstate->transValue,
     799            0 :                                        pergroupstate->transValueIsNull);
     800              : 
     801       479803 :     pergroupstate->transValue = newVal;
     802       479803 :     pergroupstate->transValueIsNull = fcinfo->isnull;
     803              : 
     804       479803 :     MemoryContextSwitchTo(oldContext);
     805              : }
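The strict-transfn rules in `advance_transition_function()` can be illustrated with a `max(int)`-style aggregate that has no initcond. This is a simplified sketch assuming an `int` state and a single input; `ToyGroup`, `toy_int4larger` and `toy_advance_strict` are hypothetical stand-ins:

```c
#include <assert.h>
#include <stdbool.h>

typedef struct ToyGroup
{
    int         transValue;
    bool        transValueIsNull;
    bool        noTransValue;
} ToyGroup;

/* Stand-in for a strict transition function such as max(int)'s. */
static int
toy_int4larger(int a, int b)
{
    return a > b ? a : b;
}

static void
toy_advance_strict(ToyGroup *g, int input, bool inputIsNull)
{
    if (inputIsNull)
        return;                 /* strict: NULL input keeps prior state */
    if (g->noTransValue)
    {
        /* First non-NULL input becomes the transition value. */
        g->transValue = input;
        g->transValueIsNull = false;
        g->noTransValue = false;
        return;
    }
    if (g->transValueIsNull)
        return;                 /* never call a strict fn with a NULL state */
    g->transValue = toy_int4larger(g->transValue, input);
}
```

Seeding the state from the first input, rather than from an arbitrary initcond such as 0, is what keeps `max()` correct when every input is negative.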
     806              : 
     807              : /*
     808              :  * Advance each aggregate transition state for one input tuple.  The input
     809              :  * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
     810              :  * accessible to ExecEvalExpr.
     811              :  *
     812              :  * We have two sets of transition states to handle: one for sorted aggregation
     813              :  * and one for hashed; we do them both here, to avoid multiple evaluation of
     814              :  * the inputs.
     815              :  *
     816              :  * When called, CurrentMemoryContext should be the per-query context.
     817              :  */
     818              : static void
     819     19327140 : advance_aggregates(AggState *aggstate)
     820              : {
     821     19327140 :     ExecEvalExprNoReturnSwitchContext(aggstate->phase->evaltrans,
     822              :                                       aggstate->tmpcontext);
     823     19327088 : }
     824              : 
     825              : /*
     826              :  * Run the transition function for a DISTINCT or ORDER BY aggregate
     827              :  * with only one input.  This is called after we have completed
     828              :  * entering all the input values into the sort object.  We complete the
     829              :  * sort, read out the values in sorted order, and run the transition
     830              :  * function on each value (applying DISTINCT if appropriate).
     831              :  *
     832              :  * Note that the strictness of the transition function was checked when
     833              :  * entering the values into the sort, so we don't check it again here;
     834              :  * we just apply standard SQL DISTINCT logic.
     835              :  *
     836              :  * The one-input case is handled separately from the multi-input case
     837              :  * for performance reasons: for single by-value inputs, such as the
     838              :  * common case of count(distinct id), the tuplesort_getdatum code path
     839              :  * is around 300% faster.  (The speedup for by-reference types is less
     840              :  * but still noticeable.)
     841              :  *
     842              :  * This function handles only one grouping set (already set in
     843              :  * aggstate->current_set).
     844              :  *
     845              :  * When called, CurrentMemoryContext should be the per-query context.
     846              :  */
     847              : static void
     848        35804 : process_ordered_aggregate_single(AggState *aggstate,
     849              :                                  AggStatePerTrans pertrans,
     850              :                                  AggStatePerGroup pergroupstate)
     851              : {
     852        35804 :     Datum       oldVal = (Datum) 0;
     853        35804 :     bool        oldIsNull = true;
     854        35804 :     bool        haveOldVal = false;
     855        35804 :     MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
     856              :     MemoryContext oldContext;
     857        35804 :     bool        isDistinct = (pertrans->numDistinctCols > 0);
     858        35804 :     Datum       newAbbrevVal = (Datum) 0;
     859        35804 :     Datum       oldAbbrevVal = (Datum) 0;
     860        35804 :     FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
     861              :     Datum      *newVal;
     862              :     bool       *isNull;
     863              : 
     864              :     Assert(pertrans->numDistinctCols < 2);
     865              : 
     866        35804 :     tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
     867              : 
     868              :     /* Load the column into argument 1 (arg 0 will be transition value) */
     869        35804 :     newVal = &fcinfo->args[1].value;
     870        35804 :     isNull = &fcinfo->args[1].isnull;
     871              : 
     872              :     /*
     873              :      * Note: if input type is pass-by-ref, the datums returned by the sort are
     874              :      * freshly palloc'd in the per-query context, so we must be careful to
     875              :      * pfree them when they are no longer needed.
     876              :      */
     877              : 
     878       595425 :     while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
     879              :                               true, false, newVal, isNull, &newAbbrevVal))
     880              :     {
     881              :         /*
     882              :          * Clear and select the working context for evaluation of the equality
     883              :          * function and transition function.
     884              :          */
     885       559621 :         MemoryContextReset(workcontext);
     886       559621 :         oldContext = MemoryContextSwitchTo(workcontext);
     887              : 
     888              :         /*
     889              :          * If DISTINCT mode, and not distinct from prior, skip it.
     890              :          */
     891       559621 :         if (isDistinct &&
     892       203613 :             haveOldVal &&
     893            0 :             ((oldIsNull && *isNull) ||
     894       203613 :              (!oldIsNull && !*isNull &&
     895       399746 :               oldAbbrevVal == newAbbrevVal &&
     896       196133 :               DatumGetBool(FunctionCall2Coll(&pertrans->equalfnOne,
     897              :                                              pertrans->aggCollation,
     898              :                                              oldVal, *newVal)))))
     899              :         {
     900        79962 :             MemoryContextSwitchTo(oldContext);
     901        79962 :             continue;
     902              :         }
     903              :         else
     904              :         {
     905       479659 :             advance_transition_function(aggstate, pertrans, pergroupstate);
     906              : 
     907       479659 :             MemoryContextSwitchTo(oldContext);
     908              : 
     909              :             /*
     910              :              * Forget the old value, if any, and remember the new one for
     911              :              * subsequent equality checks.
     912              :              */
     913       479659 :             if (!pertrans->inputtypeByVal)
     914              :             {
     915       350192 :                 if (!oldIsNull)
     916       350072 :                     pfree(DatumGetPointer(oldVal));
     917       350192 :                 if (!*isNull)
     918       350152 :                     oldVal = datumCopy(*newVal, pertrans->inputtypeByVal,
     919       350152 :                                        pertrans->inputtypeLen);
     920              :             }
     921              :             else
     922       129467 :                 oldVal = *newVal;
     923       479659 :             oldAbbrevVal = newAbbrevVal;
     924       479659 :             oldIsNull = *isNull;
     925       479659 :             haveOldVal = true;
     926              :         }
     927              :     }
     928              : 
     929        35804 :     if (!oldIsNull && !pertrans->inputtypeByVal)
     930           80 :         pfree(DatumGetPointer(oldVal));
     931              : 
     932        35804 :     tuplesort_end(pertrans->sortstates[aggstate->current_set]);
     933        35804 :     pertrans->sortstates[aggstate->current_set] = NULL;
     934        35804 : }
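The single-input DISTINCT loop above can be sketched outside the executor. This is a minimal, hypothetical C sketch. Strings stand in for by-reference Datums, `strcmp` stands in for the type's equality function, and NULL inputs are left out. The names `CountState`, `count_transfn`, and `distinct_over_sorted` are illustrative only and are not PostgreSQL APIs. The sketch mirrors the `datumCopy`/`pfree` pattern: it keeps a private copy of the previous value and frees the old copy whenever it remembers a new one. In the executor this copy is presumably needed because the value fetched from the sort is not guaranteed to outlive the next fetch.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical transition state: number of distinct values and their total length. */
typedef struct { long count; size_t total_len; } CountState;

/* Hypothetical transition function applied once per distinct value. */
static void count_transfn(CountState *state, const char *value)
{
    state->count++;
    state->total_len += strlen(value);
}

/*
 * Walk already-sorted values and pass each distinct one to the transition
 * function. Adjacent duplicates are skipped; a private copy of the last
 * value passed on is kept for the equality check, freed when replaced.
 */
static CountState distinct_over_sorted(const char *const *sorted, size_t n)
{
    CountState state = {0, 0};
    char *oldVal = NULL;        /* NULL here plays the role of !haveOldVal */

    for (size_t i = 0; i < n; i++)
    {
        const char *newVal = sorted[i];

        if (oldVal != NULL && strcmp(oldVal, newVal) == 0)
            continue;           /* equal to the previous value: DISTINCT drops it */

        count_transfn(&state, newVal);

        free(oldVal);           /* forget the old value, if any */
        oldVal = malloc(strlen(newVal) + 1);
        assert(oldVal != NULL);
        strcpy(oldVal, newVal); /* remember the new one */
    }
    free(oldVal);               /* like the final pfree after the loop */
    return state;
}
```

For the sorted input `{"a", "a", "bb", "bb", "bb", "c"}`, the transition function runs three times.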
     935              : 
     936              : /*
     937              :  * Run the transition function for a DISTINCT or ORDER BY aggregate
     938              :  * with more than one input.  This is called after we have completed
     939              :  * entering all the input values into the sort object.  We complete the
     940              :  * sort, read out the values in sorted order, and run the transition
     941              :  * function on each value (applying DISTINCT if appropriate).
     942              :  *
     943              :  * This function handles only one grouping set (already set in
     944              :  * aggstate->current_set).
     945              :  *
     946              :  * When called, CurrentMemoryContext should be the per-query context.
     947              :  */
     948              : static void
     949           56 : process_ordered_aggregate_multi(AggState *aggstate,
     950              :                                 AggStatePerTrans pertrans,
     951              :                                 AggStatePerGroup pergroupstate)
     952              : {
     953           56 :     ExprContext *tmpcontext = aggstate->tmpcontext;
     954           56 :     FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
     955           56 :     TupleTableSlot *slot1 = pertrans->sortslot;
     956           56 :     TupleTableSlot *slot2 = pertrans->uniqslot;
     957           56 :     int         numTransInputs = pertrans->numTransInputs;
     958           56 :     int         numDistinctCols = pertrans->numDistinctCols;
     959           56 :     Datum       newAbbrevVal = (Datum) 0;
     960           56 :     Datum       oldAbbrevVal = (Datum) 0;
     961           56 :     bool        haveOldValue = false;
     962           56 :     TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
     963              :     int         i;
     964              : 
     965           56 :     tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
     966              : 
     967           56 :     ExecClearTuple(slot1);
     968           56 :     if (slot2)
     969            0 :         ExecClearTuple(slot2);
     970              : 
     971          200 :     while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
     972              :                                   true, true, slot1, &newAbbrevVal))
     973              :     {
     974          144 :         CHECK_FOR_INTERRUPTS();
     975              : 
     976          144 :         tmpcontext->ecxt_outertuple = slot1;
     977          144 :         tmpcontext->ecxt_innertuple = slot2;
     978              : 
     979          144 :         if (numDistinctCols == 0 ||
     980            0 :             !haveOldValue ||
     981            0 :             newAbbrevVal != oldAbbrevVal ||
     982            0 :             !ExecQual(pertrans->equalfnMulti, tmpcontext))
     983              :         {
     984              :             /*
     985              :              * Extract the first numTransInputs columns as datums to pass to
     986              :              * the transfn.
     987              :              */
     988          144 :             slot_getsomeattrs(slot1, numTransInputs);
     989              : 
     990              :             /* Load values into fcinfo */
     991              :             /* Start from 1, since the 0th arg will be the transition value */
     992          408 :             for (i = 0; i < numTransInputs; i++)
     993              :             {
     994          264 :                 fcinfo->args[i + 1].value = slot1->tts_values[i];
     995          264 :                 fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
     996              :             }
     997              : 
     998          144 :             advance_transition_function(aggstate, pertrans, pergroupstate);
     999              : 
    1000          144 :             if (numDistinctCols > 0)
    1001              :             {
    1002              :                 /* swap the slot pointers to retain the current tuple */
    1003            0 :                 TupleTableSlot *tmpslot = slot2;
    1004              : 
    1005            0 :                 slot2 = slot1;
    1006            0 :                 slot1 = tmpslot;
    1007              :                 /* avoid ExecQual() calls by reusing abbreviated keys */
    1008            0 :                 oldAbbrevVal = newAbbrevVal;
    1009            0 :                 haveOldValue = true;
    1010              :             }
    1011              :         }
    1012              : 
    1013              :         /* Reset context each time */
    1014          144 :         ResetExprContext(tmpcontext);
    1015              : 
    1016          144 :         ExecClearTuple(slot1);
    1017              :     }
    1018              : 
    1019           56 :     if (slot2)
    1020            0 :         ExecClearTuple(slot2);
    1021              : 
    1022           56 :     tuplesort_end(pertrans->sortstates[aggstate->current_set]);
    1023           56 :     pertrans->sortstates[aggstate->current_set] = NULL;
    1024              : 
    1025              :     /* restore previous slot, potentially in use for grouping sets */
    1026           56 :     tmpcontext->ecxt_outertuple = save;
    1027           56 : }
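The slot-swapping trick in process_ordered_aggregate_multi() can be shown with plain structs. This is a hedged sketch with several assumptions. The two integer input columns are also the DISTINCT columns. `qsort()` stands in for the tuplesort, and `row_cmp` stands in for the `equalfnMulti` comparison. The transition function `dot_transfn`, which accumulates `a*b`, is hypothetical. Two buffers play the roles of `sortslot` and `uniqslot`. After a row is consumed, the sketch swaps the pointers instead of copying the row, so the previous tuple survives while the next one is read into the other buffer.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct { int a; int b; } Row;   /* a two-column input tuple */

/* Lexicographic comparison on (a, b); also serves as the equality test. */
static int row_cmp(const void *l, const void *r)
{
    const Row *x = l, *y = r;
    if (x->a != y->a)
        return (x->a > y->a) - (x->a < y->a);
    return (x->b > y->b) - (x->b < y->b);
}

/* Hypothetical two-input transition function: running sum of a*b. */
static long dot_transfn(long state, int a, int b)
{
    return state + (long) a * b;
}

/* Sort the rows, then feed them (optionally de-duplicated) to dot_transfn. */
static long distinct_dot(Row *rows, size_t n, int distinct)
{
    Row buf1, buf2;
    Row *cur = &buf1, *prev = &buf2;   /* like slot1 / slot2 */
    int havePrev = 0;
    long state = 0;

    assert(rows != NULL);
    qsort(rows, n, sizeof(Row), row_cmp);

    for (size_t i = 0; i < n; i++)
    {
        *cur = rows[i];                /* "fetch" the next sorted tuple */
        if (!distinct || !havePrev || row_cmp(cur, prev) != 0)
        {
            state = dot_transfn(state, cur->a, cur->b);
            if (distinct)
            {
                Row *tmp = prev;       /* swap: prev now retains this row */
                prev = cur;
                cur = tmp;
                havePrev = 1;
            }
        }
    }
    return state;
}
```

With distinct enabled, duplicate `(a, b)` pairs contribute once. With it disabled, every row contributes.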
    1028              : 
    1029              : /*
    1030              :  * Compute the final value of one aggregate.
    1031              :  *
    1032              :  * This function handles only one grouping set (already set in
    1033              :  * aggstate->current_set).
    1034              :  *
    1035              :  * The finalfn will be run, and the result delivered, in the
    1036              :  * output-tuple context; caller's CurrentMemoryContext does not matter.
    1037              :  * (But note that in some cases, such as when there is no finalfn, the
    1038              :  * result might be a pointer to or into the agg's transition value.)
    1039              :  *
    1040              :  * The finalfn uses the state as set in the transno.  This also might be
    1041              :  * being used by another aggregate function, so it's important that we do
    1042              :  * nothing destructive here.  Moreover, the aggregate's final value might
    1043              :  * get used in multiple places, so we mustn't return a R/W expanded datum.
    1044              :  */
    1045              : static void
    1046       717209 : finalize_aggregate(AggState *aggstate,
    1047              :                    AggStatePerAgg peragg,
    1048              :                    AggStatePerGroup pergroupstate,
    1049              :                    Datum *resultVal, bool *resultIsNull)
    1050              : {
    1051       717209 :     LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
    1052       717209 :     bool        anynull = false;
    1053              :     MemoryContext oldContext;
    1054              :     int         i;
    1055              :     ListCell   *lc;
    1056       717209 :     AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
    1057              : 
    1058       717209 :     oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
    1059              : 
    1060              :     /*
    1061              :      * Evaluate any direct arguments.  We do this even if there's no finalfn
    1062              :      * (which is unlikely anyway), so that side-effects happen as expected.
    1063              :      * The direct arguments go into arg positions 1 and up, leaving position 0
    1064              :      * for the transition state value.
    1065              :      */
    1066       717209 :     i = 1;
    1067       717851 :     foreach(lc, peragg->aggdirectargs)
    1068              :     {
    1069          642 :         ExprState  *expr = (ExprState *) lfirst(lc);
    1070              : 
    1071          642 :         fcinfo->args[i].value = ExecEvalExpr(expr,
    1072              :                                              aggstate->ss.ps.ps_ExprContext,
    1073              :                                              &fcinfo->args[i].isnull);
    1074          642 :         anynull |= fcinfo->args[i].isnull;
    1075          642 :         i++;
    1076              :     }
    1077              : 
    1078              :     /*
    1079              :      * Apply the agg's finalfn if one is provided, else return transValue.
    1080              :      */
    1081       717209 :     if (OidIsValid(peragg->finalfn_oid))
    1082              :     {
    1083       205480 :         int         numFinalArgs = peragg->numFinalArgs;
    1084              : 
    1085              :         /* set up aggstate->curperagg for AggGetAggref() */
    1086       205480 :         aggstate->curperagg = peragg;
    1087              : 
    1088       205480 :         InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
    1089              :                                  numFinalArgs,
    1090              :                                  pertrans->aggCollation,
    1091              :                                  (Node *) aggstate, NULL);
    1092              : 
    1093              :         /* Fill in the transition state value */
    1094       205480 :         fcinfo->args[0].value =
    1095       205480 :             MakeExpandedObjectReadOnly(pergroupstate->transValue,
    1096              :                                        pergroupstate->transValueIsNull,
    1097              :                                        pertrans->transtypeLen);
    1098       205480 :         fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
    1099       205480 :         anynull |= pergroupstate->transValueIsNull;
    1100              : 
    1101              :         /* Fill any remaining argument positions with nulls */
    1102       287237 :         for (; i < numFinalArgs; i++)
    1103              :         {
    1104        81757 :             fcinfo->args[i].value = (Datum) 0;
    1105        81757 :             fcinfo->args[i].isnull = true;
    1106        81757 :             anynull = true;
    1107              :         }
    1108              : 
    1109       205480 :         if (fcinfo->flinfo->fn_strict && anynull)
    1110              :         {
    1111              :             /* don't call a strict function with NULL inputs */
    1112            0 :             *resultVal = (Datum) 0;
    1113            0 :             *resultIsNull = true;
    1114              :         }
    1115              :         else
    1116              :         {
    1117              :             Datum       result;
    1118              : 
    1119       205480 :             result = FunctionCallInvoke(fcinfo);
    1120       205472 :             *resultIsNull = fcinfo->isnull;
    1121       205472 :             *resultVal = MakeExpandedObjectReadOnly(result,
    1122              :                                                     fcinfo->isnull,
    1123              :                                                     peragg->resulttypeLen);
    1124              :         }
    1125       205472 :         aggstate->curperagg = NULL;
    1126              :     }
    1127              :     else
    1128              :     {
    1129       511729 :         *resultVal =
    1130       511729 :             MakeExpandedObjectReadOnly(pergroupstate->transValue,
    1131              :                                        pergroupstate->transValueIsNull,
    1132              :                                        pertrans->transtypeLen);
    1133       511729 :         *resultIsNull = pergroupstate->transValueIsNull;
    1134              :     }
    1135              : 
    1136       717201 :     MemoryContextSwitchTo(oldContext);
    1137       717201 : }
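The branches in finalize_aggregate() can be summarized in a small sketch. All names here (`AvgState`, `avg_finalfn`, `finalize`) are hypothetical, and an SQL NULL result is modelled as a `false` return. Only the transition state is checked for NULL. The real code also folds evaluated direct arguments and any padded argument positions into `anynull` before applying the strictness check. Without a final function, the sketch hands back the transition value itself, as a sum-like aggregate would.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical transition state for avg(int): running sum and row count. */
typedef struct { long sum; long count; } AvgState;

/* Final function signature: returns false to signal an SQL NULL result. */
typedef bool (*FinalFn)(const AvgState *state, double *result);

static bool avg_finalfn(const AvgState *state, double *result)
{
    if (state == NULL || state->count == 0)
        return false;                  /* avg over no rows is NULL */
    *result = (double) state->sum / state->count;
    return true;
}

static bool finalize(const AvgState *state, bool stateIsNull,
                     FinalFn finalfn, bool strict, double *result)
{
    assert(result != NULL);
    if (finalfn != NULL)
    {
        if (strict && stateIsNull)
            return false;              /* don't call a strict function with NULL input */
        return finalfn(stateIsNull ? NULL : state, result);
    }
    if (stateIsNull)
        return false;                  /* no finalfn: result is the (NULL) transvalue */
    *result = (double) state->sum;     /* no finalfn: hand back the transvalue */
    return true;
}
```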
    1138              : 
    1139              : /*
    1140              :  * Compute the output value of one partial aggregate.
    1141              :  *
    1142              :  * The serialization function will be run, and the result delivered, in the
    1143              :  * output-tuple context; caller's CurrentMemoryContext does not matter.
    1144              :  */
    1145              : static void
    1146        11631 : finalize_partialaggregate(AggState *aggstate,
    1147              :                           AggStatePerAgg peragg,
    1148              :                           AggStatePerGroup pergroupstate,
    1149              :                           Datum *resultVal, bool *resultIsNull)
    1150              : {
    1151        11631 :     AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
    1152              :     MemoryContext oldContext;
    1153              : 
    1154        11631 :     oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
    1155              : 
    1156              :     /*
    1157              :      * serialfn_oid will be set if we must serialize the transvalue before
    1158              :      * returning it
    1159              :      */
    1160        11631 :     if (OidIsValid(pertrans->serialfn_oid))
    1161              :     {
    1162              :         /* Don't call a strict serialization function with NULL input. */
    1163          364 :         if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
    1164              :         {
    1165           60 :             *resultVal = (Datum) 0;
    1166           60 :             *resultIsNull = true;
    1167              :         }
    1168              :         else
    1169              :         {
    1170          304 :             FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;
    1171              :             Datum       result;
    1172              : 
    1173          304 :             fcinfo->args[0].value =
    1174          304 :                 MakeExpandedObjectReadOnly(pergroupstate->transValue,
    1175              :                                            pergroupstate->transValueIsNull,
    1176              :                                            pertrans->transtypeLen);
    1177          304 :             fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
    1178          304 :             fcinfo->isnull = false;
    1179              : 
    1180          304 :             result = FunctionCallInvoke(fcinfo);
    1181          304 :             *resultIsNull = fcinfo->isnull;
    1182          304 :             *resultVal = MakeExpandedObjectReadOnly(result,
    1183              :                                                     fcinfo->isnull,
    1184              :                                                     peragg->resulttypeLen);
    1185              :         }
    1186              :     }
    1187              :     else
    1188              :     {
    1189        11267 :         *resultVal =
    1190        11267 :             MakeExpandedObjectReadOnly(pergroupstate->transValue,
    1191              :                                        pergroupstate->transValueIsNull,
    1192              :                                        pertrans->transtypeLen);
    1193        11267 :         *resultIsNull = pergroupstate->transValueIsNull;
    1194              :     }
    1195              : 
    1196        11631 :     MemoryContextSwitchTo(oldContext);
    1197        11631 : }
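To illustrate what a serialization function does during partial aggregation, the sketch below flattens a hypothetical `AvgState` (sum and count) into 16 big-endian bytes and reads it back. The wire format and all names are assumptions, not PostgreSQL's. The sketch assumes a strict serialization function. A NULL state is therefore passed through as NULL without the function being called, modelled as a return value of 0.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef struct { int64_t sum; int64_t count; } AvgState;

/* Write v as 8 big-endian bytes (hypothetical wire format). */
static void put_i64(unsigned char *buf, int64_t v)
{
    uint64_t u = (uint64_t) v;
    for (int i = 7; i >= 0; i--)
    {
        buf[i] = (unsigned char) (u & 0xFF);
        u >>= 8;
    }
}

static int64_t get_i64(const unsigned char *buf)
{
    uint64_t u = 0;
    for (int i = 0; i < 8; i++)
        u = (u << 8) | buf[i];
    return (int64_t) u;
}

/* Returns bytes written, or 0 to signal an SQL NULL (strict serialfn skipped). */
static size_t serialize_partial(const AvgState *state, bool stateIsNull,
                                unsigned char out[16])
{
    assert(out != NULL);
    if (stateIsNull)
        return 0;
    put_i64(out, state->sum);
    put_i64(out + 8, state->count);
    return 16;
}

/* The combining side would deserialize before merging states. */
static AvgState deserialize_partial(const unsigned char in[16])
{
    AvgState s = { get_i64(in), get_i64(in + 8) };
    return s;
}
```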
    1198              : 
    1199              : /*
    1200              :  * Extract the attributes that make up the grouping key into the
    1201              :  * hashslot. This is necessary to compute the hash or perform a lookup.
    1202              :  */
    1203              : static inline void
    1204      5489971 : prepare_hash_slot(AggStatePerHash perhash,
    1205              :                   TupleTableSlot *inputslot,
    1206              :                   TupleTableSlot *hashslot)
    1207              : {
    1208              :     int         i;
    1209              : 
    1210              :     /* transfer just the needed columns into hashslot */
    1211      5489971 :     slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
    1212      5489971 :     ExecClearTuple(hashslot);
    1213              : 
    1214     13591193 :     for (i = 0; i < perhash->numhashGrpCols; i++)
    1215              :     {
    1216      8101222 :         int         varNumber = perhash->hashGrpColIdxInput[i] - 1;
    1217              : 
    1218      8101222 :         hashslot->tts_values[i] = inputslot->tts_values[varNumber];
    1219      8101222 :         hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
    1220              :     }
    1221      5489971 :     ExecStoreVirtualTuple(hashslot);
    1222      5489971 : }
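prepare_hash_slot() gathers the grouping columns, addressed by 1-based attribute numbers, into a compact key. The following minimal sketch uses a hypothetical `Row` struct in place of a TupleTableSlot's `tts_values`/`tts_isnull` arrays. The index array plays the role of `hashGrpColIdxInput`.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_COLS 8

/* Hypothetical stand-in for a virtual tuple slot. */
typedef struct
{
    int  values[MAX_COLS];
    bool isnull[MAX_COLS];
    int  ncols;
} Row;

/* Copy just the grouping columns of input into key, in grpColIdx order. */
static void prepare_key(const int *grpColIdx, int numGrpCols,
                        const Row *input, Row *key)
{
    assert(numGrpCols <= MAX_COLS);
    for (int i = 0; i < numGrpCols; i++)
    {
        int varNumber = grpColIdx[i] - 1;   /* 1-based attno -> 0-based index */

        assert(varNumber >= 0 && varNumber < input->ncols);
        key->values[i] = input->values[varNumber];
        key->isnull[i] = input->isnull[varNumber];
    }
    key->ncols = numGrpCols;
}
```

The resulting key holds only the columns that the hash and equality functions need to look at.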
    1223              : 
    1224              : /*
    1225              :  * Prepare to finalize and project based on the specified representative tuple
    1226              :  * slot and grouping set.
    1227              :  *
    1228              :  * In the specified tuple slot, force to null all attributes that should be
    1229              :  * read as null in the context of the current grouping set.  Also stash the
    1230              :  * current group bitmap where GroupingExpr can get at it.
    1231              :  *
    1232              :  * This relies on three conditions:
    1233              :  *
    1234              :  * 1) Nothing is ever going to try and extract the whole tuple from this slot,
    1235              :  * only reference it in evaluations, which will only access individual
    1236              :  * attributes.
    1237              :  *
    1238              :  * 2) No system columns are going to need to be nulled. (If a system column is
    1239              :  * referenced in a group clause, it is actually projected in the outer plan
    1240              :  * tlist.)
    1241              :  *
    1242              :  * 3) Within a given phase, we never need to recover the value of an attribute
    1243              :  * once it has been set to null.
    1244              :  *
    1245              :  * Poking into the slot this way is a bit ugly, but the consensus is that the
    1246              :  * alternative was worse.
    1247              :  */
    1248              : static void
    1249       528957 : prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
    1250              : {
    1251       528957 :     if (aggstate->phase->grouped_cols)
    1252              :     {
    1253       352040 :         Bitmapset  *grouped_cols = aggstate->phase->grouped_cols[currentSet];
    1254              : 
    1255       352040 :         aggstate->grouped_cols = grouped_cols;
    1256              : 
    1257       352040 :         if (TTS_EMPTY(slot))
    1258              :         {
    1259              :             /*
    1260              :              * Force all values to be NULL if working on an empty input tuple
    1261              :              * (i.e. an empty grouping set for which no input rows were
    1262              :              * supplied).
    1263              :              */
    1264           40 :             ExecStoreAllNullTuple(slot);
    1265              :         }
    1266       352000 :         else if (aggstate->all_grouped_cols)
    1267              :         {
    1268              :             ListCell   *lc;
    1269              : 
    1270              :             /* all_grouped_cols is arranged in desc order */
    1271       351968 :             slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
    1272              : 
    1273       967640 :             foreach(lc, aggstate->all_grouped_cols)
    1274              :             {
    1275       615672 :                 int         attnum = lfirst_int(lc);
    1276              : 
    1277       615672 :                 if (!bms_is_member(attnum, grouped_cols))
    1278        38668 :                     slot->tts_isnull[attnum - 1] = true;
    1279              :             }
    1280              :         }
    1281              :     }
    1282       528957 : }
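The nulling done by prepare_projection_slot() can be shown with a bitmask in place of a Bitmapset. In this hypothetical sketch, bit `attnum` of `groupedCols` is set when that column belongs to the current grouping set. For example, take `GROUPING SETS ((a, b), (a))` with a = column 1 and b = column 2. While the `(a)` set is output, column 2 is forced to NULL. The empty-input case, which the real function handles with ExecStoreAllNullTuple(), is omitted here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Force to NULL each grouped column (allGroupedCols holds 1-based attnums)
 * that is absent from the current grouping set's mask.
 */
static void null_ungrouped(bool *isnull, const int *allGroupedCols, int nAll,
                           uint32_t groupedCols)
{
    for (int i = 0; i < nAll; i++)
    {
        int attnum = allGroupedCols[i];

        assert(attnum >= 1 && attnum < 32);
        if ((groupedCols & (UINT32_C(1) << attnum)) == 0)
            isnull[attnum - 1] = true;
    }
}
```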
    1283              : 
    1284              : /*
    1285              :  * Compute the final value of all aggregates for one group.
    1286              :  *
    1287              :  * This function handles only one grouping set at a time, which the caller must
    1288              :  * have selected.  It's also the caller's responsibility to adjust the supplied
    1289              :  * pergroup parameter to point to the current set's transvalues.
    1290              :  *
    1291              :  * Results are stored in the output econtext aggvalues/aggnulls.
    1292              :  */
    1293              : static void
    1294       528957 : finalize_aggregates(AggState *aggstate,
    1295              :                     AggStatePerAgg peraggs,
    1296              :                     AggStatePerGroup pergroup)
    1297              : {
    1298       528957 :     ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
    1299       528957 :     Datum      *aggvalues = econtext->ecxt_aggvalues;
    1300       528957 :     bool       *aggnulls = econtext->ecxt_aggnulls;
    1301              :     int         aggno;
    1302              : 
    1303              :     /*
    1304              :      * If there were any DISTINCT and/or ORDER BY aggregates, sort their
    1305              :      * inputs and run the transition functions.
    1306              :      */
    1307      1257609 :     for (int transno = 0; transno < aggstate->numtrans; transno++)
    1308              :     {
    1309       728652 :         AggStatePerTrans pertrans = &aggstate->pertrans[transno];
    1310              :         AggStatePerGroup pergroupstate;
    1311              : 
    1312       728652 :         pergroupstate = &pergroup[transno];
    1313              : 
    1314       728652 :         if (pertrans->aggsortrequired)
    1315              :         {
    1316              :             Assert(aggstate->aggstrategy != AGG_HASHED &&
    1317              :                    aggstate->aggstrategy != AGG_MIXED);
    1318              : 
    1319        35860 :             if (pertrans->numInputs == 1)
    1320        35804 :                 process_ordered_aggregate_single(aggstate,
    1321              :                                                  pertrans,
    1322              :                                                  pergroupstate);
    1323              :             else
    1324           56 :                 process_ordered_aggregate_multi(aggstate,
    1325              :                                                 pertrans,
    1326              :                                                 pergroupstate);
    1327              :         }
    1328       692792 :         else if (pertrans->numDistinctCols > 0 && pertrans->haslast)
    1329              :         {
    1330        12236 :             pertrans->haslast = false;
    1331              : 
    1332        12236 :             if (pertrans->numDistinctCols == 1)
    1333              :             {
    1334        12172 :                 if (!pertrans->inputtypeByVal && !pertrans->lastisnull)
    1335          174 :                     pfree(DatumGetPointer(pertrans->lastdatum));
    1336              : 
    1337        12172 :                 pertrans->lastisnull = false;
    1338        12172 :                 pertrans->lastdatum = (Datum) 0;
    1339              :             }
    1340              :             else
    1341           64 :                 ExecClearTuple(pertrans->uniqslot);
    1342              :         }
    1343              :     }
    1344              : 
    1345              :     /*
    1346              :      * Run the final functions.
    1347              :      */
    1348      1257789 :     for (aggno = 0; aggno < aggstate->numaggs; aggno++)
    1349              :     {
    1350       728840 :         AggStatePerAgg peragg = &peraggs[aggno];
    1351       728840 :         int         transno = peragg->transno;
    1352              :         AggStatePerGroup pergroupstate;
    1353              : 
    1354       728840 :         pergroupstate = &pergroup[transno];
    1355              : 
    1356       728840 :         if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
    1357        11631 :             finalize_partialaggregate(aggstate, peragg, pergroupstate,
    1358        11631 :                                       &aggvalues[aggno], &aggnulls[aggno]);
    1359              :         else
    1360       717209 :             finalize_aggregate(aggstate, peragg, pergroupstate,
    1361       717209 :                                &aggvalues[aggno], &aggnulls[aggno]);
    1362              :     }
    1363       528949 : }
    1364              : 
    1365              : /*
    1366              :  * Project the result of a group (whose aggs have already been calculated by
    1367              :  * finalize_aggregates). Returns the result slot, or NULL if no row is
    1368              :  * projected (suppressed by qual).
    1369              :  */
    1370              : static TupleTableSlot *
    1371       528949 : project_aggregates(AggState *aggstate)
    1372              : {
    1373       528949 :     ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
    1374              : 
    1375              :     /*
    1376              :      * Check the qual (HAVING clause); if the group does not match, ignore it.
    1377              :      */
    1378       528949 :     if (ExecQual(aggstate->ss.ps.qual, econtext))
    1379              :     {
    1380              :         /*
    1381              :          * Form and return projection tuple using the aggregate results and
    1382              :          * the representative input tuple.
    1383              :          */
    1384       458017 :         return ExecProject(aggstate->ss.ps.ps_ProjInfo);
    1385              :     }
    1386              :     else
    1387        70932 :         InstrCountFiltered1(aggstate, 1);
    1388              : 
    1389        70932 :     return NULL;
    1390              : }
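project_aggregates() applies the HAVING qual to each finished group. Groups that pass are projected. Groups that fail are dropped and counted via InstrCountFiltered1. A minimal sketch with hypothetical names, where `having_count_gt_1` models `HAVING count(*) > 1`. A NULL qual passes every group, matching ExecQual's treatment of an absent qual.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct { int key; long count; } GroupResult;

typedef bool (*HavingQual)(const GroupResult *g);

/* Hypothetical qual: HAVING count(*) > 1 */
static bool having_count_gt_1(const GroupResult *g)
{
    return g->count > 1;
}

/* Copy passing groups to out[]; count the rest in *nfiltered. */
static size_t project_groups(const GroupResult *groups, size_t n,
                             HavingQual qual, GroupResult *out,
                             size_t *nfiltered)
{
    size_t nout = 0;

    assert(nfiltered != NULL);
    *nfiltered = 0;
    for (size_t i = 0; i < n; i++)
    {
        if (qual == NULL || qual(&groups[i]))
            out[nout++] = groups[i];
        else
            (*nfiltered)++;
    }
    return nout;
}
```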
    1391              : 
    1392              : /*
    1393              :  * Find input-tuple columns that are needed, dividing them into
    1394              :  * aggregated and unaggregated sets.
    1395              :  */
    1396              : static void
    1397         4563 : find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
    1398              : {
    1399         4563 :     Agg        *agg = (Agg *) aggstate->ss.ps.plan;
    1400              :     FindColsContext context;
    1401              : 
    1402         4563 :     context.is_aggref = false;
    1403         4563 :     context.aggregated = NULL;
    1404         4563 :     context.unaggregated = NULL;
    1405              : 
    1406              :     /* Examine tlist and quals */
    1407         4563 :     (void) find_cols_walker((Node *) agg->plan.targetlist, &context);
    1408         4563 :     (void) find_cols_walker((Node *) agg->plan.qual, &context);
    1409              : 
    1410              :     /* In some cases, grouping columns will not appear in the tlist */
    1411        11471 :     for (int i = 0; i < agg->numCols; i++)
    1412         6908 :         context.unaggregated = bms_add_member(context.unaggregated,
    1413         6908 :                                               agg->grpColIdx[i]);
    1414              : 
    1415         4563 :     *aggregated = context.aggregated;
    1416         4563 :     *unaggregated = context.unaggregated;
    1417         4563 : }
    1418              : 
    1419              : static bool
    1420        54621 : find_cols_walker(Node *node, FindColsContext *context)
    1421              : {
    1422        54621 :     if (node == NULL)
    1423         9933 :         return false;
    1424        44688 :     if (IsA(node, Var))
    1425              :     {
    1426        12170 :         Var        *var = (Var *) node;
    1427              : 
    1428              :         /* setrefs.c should have set the varno to OUTER_VAR */
    1429              :         Assert(var->varno == OUTER_VAR);
    1430              :         Assert(var->varlevelsup == 0);
    1431        12170 :         if (context->is_aggref)
    1432         4022 :             context->aggregated = bms_add_member(context->aggregated,
    1433         4022 :                                                  var->varattno);
    1434              :         else
    1435         8148 :             context->unaggregated = bms_add_member(context->unaggregated,
    1436         8148 :                                                    var->varattno);
    1437        12170 :         return false;
    1438              :     }
    1439        32518 :     if (IsA(node, Aggref))
    1440              :     {
    1441              :         Assert(!context->is_aggref);
    1442         5684 :         context->is_aggref = true;
    1443         5684 :         expression_tree_walker(node, find_cols_walker, context);
    1444         5684 :         context->is_aggref = false;
    1445         5684 :         return false;
    1446              :     }
    1447        26834 :     return expression_tree_walker(node, find_cols_walker, context);
    1448              : }
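find_cols_walker() classifies each column reference by whether it appears inside an Aggref's arguments. The sketch below models this with a tiny hypothetical expression tree (`Node`, `NODE_VAR`, `NODE_AGGREF`, `NODE_OP`) and uses 32-bit masks in place of Bitmapsets. Unlike the real walker, it returns `void` rather than following the `expression_tree_walker` boolean convention. For `col1 + sum(col2)`, column 1 is classified as unaggregated and column 2 as aggregated.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef enum { NODE_VAR, NODE_AGGREF, NODE_OP } NodeKind;   /* hypothetical */

typedef struct Node
{
    NodeKind kind;
    int attno;                 /* NODE_VAR only: 1-based column number */
    struct Node *args[2];      /* children of NODE_AGGREF / NODE_OP */
} Node;

typedef struct
{
    bool is_aggref;            /* currently inside an aggregate's arguments? */
    uint32_t aggregated;       /* bit attno: column used inside an aggregate */
    uint32_t unaggregated;     /* bit attno: column used outside aggregates */
} FindColsCtx;

static void find_cols_walk(const Node *node, FindColsCtx *ctx)
{
    if (node == NULL)
        return;
    if (node->kind == NODE_VAR)
    {
        assert(node->attno >= 1 && node->attno < 32);
        if (ctx->is_aggref)
            ctx->aggregated |= UINT32_C(1) << node->attno;
        else
            ctx->unaggregated |= UINT32_C(1) << node->attno;
        return;
    }
    if (node->kind == NODE_AGGREF)
    {
        assert(!ctx->is_aggref);   /* no nested aggregates at this level */
        ctx->is_aggref = true;
        for (int i = 0; i < 2; i++)
            find_cols_walk(node->args[i], ctx);
        ctx->is_aggref = false;
        return;
    }
    for (int i = 0; i < 2; i++)    /* any other node: just recurse */
        find_cols_walk(node->args[i], ctx);
}
```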
    1449              : 
    1450              : /*
    1451              :  * (Re-)initialize the hash table(s) to empty.
    1452              :  *
    1453              :  * To implement hashed aggregation, we need a hashtable that stores a
    1454              :  * representative tuple and an array of AggStatePerGroup structs for each
    1455              :  * distinct set of GROUP BY column values.  We compute the hash key from the
    1456              :  * GROUP BY columns.  The per-group data is allocated in initialize_hash_entry(),
    1457              :  * for each entry.
    1458              :  *
    1459              :  * We have a separate hashtable and associated perhash data structure for each
    1460              :  * grouping set for which we're doing hashing.
    1461              :  *
    1462              :  * The contents of the hash tables live in the aggstate's hash_tuplescxt
    1463              :  * memory context (there is only one of these for all tables together, since
    1464              :  * they are all reset at the same time).
    1465              :  */
    1466              : static void
    1467        10001 : build_hash_tables(AggState *aggstate)
    1468              : {
    1469              :     int         setno;
    1470              : 
    1471        20237 :     for (setno = 0; setno < aggstate->num_hashes; ++setno)
    1472              :     {
    1473        10236 :         AggStatePerHash perhash = &aggstate->perhash[setno];
    1474              :         double      nbuckets;
    1475              :         Size        memory;
    1476              : 
    1477        10236 :         if (perhash->hashtable != NULL)
    1478              :         {
    1479         6728 :             ResetTupleHashTable(perhash->hashtable);
    1480         6728 :             continue;
    1481              :         }
    1482              : 
    1483         3508 :         memory = aggstate->hash_mem_limit / aggstate->num_hashes;
    1484              : 
    1485              :         /* choose reasonable number of buckets per hashtable */
    1486         3508 :         nbuckets = hash_choose_num_buckets(aggstate->hashentrysize,
    1487         3508 :                                            perhash->aggnode->numGroups,
    1488              :                                            memory);
    1489              : 
    1490              : #ifdef USE_INJECTION_POINTS
    1491         3508 :         if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-oversize-table"))
    1492              :         {
    1493            0 :             nbuckets = memory / TupleHashEntrySize();
    1494            0 :             INJECTION_POINT_CACHED("hash-aggregate-oversize-table", NULL);
    1495              :         }
    1496              : #endif
    1497              : 
    1498         3508 :         build_hash_table(aggstate, setno, nbuckets);
    1499              :     }
    1500              : 
    1501        10001 :     aggstate->hash_ngroups_current = 0;
    1502        10001 : }
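The reset-or-build loop in `build_hash_tables()` can be illustrated with a minimal sketch. It uses a toy `SketchPerHash` struct (hypothetical, not a PostgreSQL type) in place of `AggStatePerHash` and models only two points from the code above. Tables that already exist are reset rather than rebuilt. Each newly built table gets an equal share (`hash_mem_limit / num_hashes`) of the memory limit. Bucket sizing through `hash_choose_num_buckets()` is deliberately left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for one AggStatePerHash entry. */
typedef struct SketchPerHash
{
    bool   built;    /* has a hash table been created for this grouping set? */
    size_t memory;   /* memory budget handed over when the table was built */
    int    resets;   /* how many times an existing table was reset */
} SketchPerHash;

/*
 * Mirrors the loop shape of build_hash_tables(): existing tables are reset,
 * and missing ones are "built" with an equal share of the overall limit.
 */
static void
sketch_build_hash_tables(SketchPerHash *perhash, int num_hashes, size_t mem_limit)
{
    assert(num_hashes > 0);

    for (int setno = 0; setno < num_hashes; ++setno)
    {
        SketchPerHash *ph = &perhash[setno];

        if (ph->built)
        {
            ph->resets++;       /* stands in for ResetTupleHashTable() */
            continue;
        }

        ph->memory = mem_limit / num_hashes;    /* equal split per table */
        ph->built = true;       /* stands in for build_hash_table() */
    }
}
```

On a rescan, a second call with the same array only increments `resets`. This matches the coverage counts above, where most calls (6728 of 10236) take the reset path.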
    1503              : 
    1504              : /*
    1505              :  * Build a single hashtable for this grouping set.
    1506              :  */
    1507              : static void
    1508         3508 : build_hash_table(AggState *aggstate, int setno, double nbuckets)
    1509              : {
    1510         3508 :     AggStatePerHash perhash = &aggstate->perhash[setno];
    1511         3508 :     MemoryContext metacxt = aggstate->hash_metacxt;
    1512         3508 :     MemoryContext tuplescxt = aggstate->hash_tuplescxt;
    1513         3508 :     MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
    1514              :     Size        additionalsize;
    1515              : 
    1516              :     Assert(aggstate->aggstrategy == AGG_HASHED ||
    1517              :            aggstate->aggstrategy == AGG_MIXED);
    1518              : 
    1519              :     /*
    1520              :      * Used to make sure initial hash table allocation does not exceed
    1521              :      * hash_mem. Note that the estimate does not include space for
    1522              :      * pass-by-reference transition data values, nor for the representative
    1523              :      * tuple of each group.
    1524              :      */
    1525         3508 :     additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
    1526              : 
    1527         7016 :     perhash->hashtable = BuildTupleHashTable(&aggstate->ss.ps,
    1528         3508 :                                              perhash->hashslot->tts_tupleDescriptor,
    1529         3508 :                                              perhash->hashslot->tts_ops,
    1530              :                                              perhash->numCols,
    1531              :                                              perhash->hashGrpColIdxHash,
    1532         3508 :                                              perhash->eqfuncoids,
    1533              :                                              perhash->hashfunctions,
    1534         3508 :                                              perhash->aggnode->grpCollations,
    1535              :                                              nbuckets,
    1536              :                                              additionalsize,
    1537              :                                              metacxt,
    1538              :                                              tuplescxt,
    1539              :                                              tmpcxt,
    1540         3508 :                                              DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
    1541         3508 : }
    1542              : 
    1543              : /*
    1544              :  * Compute columns that actually need to be stored in hashtable entries.  The
    1545              :  * incoming tuples from the child plan node will contain grouping columns,
    1546              :  * other columns referenced in our targetlist and qual, columns used to
    1547              :  * compute the aggregate functions, and perhaps just junk columns we don't use
    1548              :  * at all.  Only columns of the first two types need to be stored in the
    1549              :  * hashtable, and getting rid of the others can make the table entries
    1550              :  * significantly smaller.  The hashtable only contains the relevant columns,
    1551              :  * and is packed/unpacked in lookup_hash_entries() / agg_retrieve_hash_table()
    1552              :  * into the format of the normal input descriptor.
    1553              :  *
    1554              :  * Columns stored beyond the grouped-by columns come from two sources:
    1555              :  * first, functionally dependent columns that we don't need to group by
    1556              :  * themselves, and second, ctids for row-marks.
    1557              :  *
    1558              :  * To eliminate duplicates, we build a bitmapset of the needed columns, and
    1559              :  * then build an array of the columns included in the hashtable. We might
    1560              :  * still have duplicates if the passed-in grpColIdx has them, which can happen
    1561              :  * in edge cases from semijoins/distinct; these can't always be removed,
    1562              :  * because it's not certain that the duplicate cols will be using the same
    1563              :  * hash function.
    1564              :  *
    1565              :  * Note that the array is preserved over ExecReScanAgg, so we allocate it in
    1566              :  * the per-query context (unlike the hash table itself).
    1567              :  */
    1568              : static void
    1569         4563 : find_hash_columns(AggState *aggstate)
    1570              : {
    1571              :     Bitmapset  *base_colnos;
    1572              :     Bitmapset  *aggregated_colnos;
    1573         4563 :     TupleDesc   scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
    1574         4563 :     List       *outerTlist = outerPlanState(aggstate)->plan->targetlist;
    1575         4563 :     int         numHashes = aggstate->num_hashes;
    1576         4563 :     EState     *estate = aggstate->ss.ps.state;
    1577              :     int         j;
    1578              : 
    1579              :     /* Find Vars that will be needed in tlist and qual */
    1580         4563 :     find_cols(aggstate, &aggregated_colnos, &base_colnos);
    1581         4563 :     aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
    1582         4563 :     aggstate->max_colno_needed = 0;
    1583         4563 :     aggstate->all_cols_needed = true;
    1584              : 
    1585        19492 :     for (int i = 0; i < scanDesc->natts; i++)
    1586              :     {
    1587        14929 :         int         colno = i + 1;
    1588              : 
    1589        14929 :         if (bms_is_member(colno, aggstate->colnos_needed))
    1590        10402 :             aggstate->max_colno_needed = colno;
    1591              :         else
    1592         4527 :             aggstate->all_cols_needed = false;
    1593              :     }
    1594              : 
    1595         9486 :     for (j = 0; j < numHashes; ++j)
    1596              :     {
    1597         4923 :         AggStatePerHash perhash = &aggstate->perhash[j];
    1598         4923 :         Bitmapset  *colnos = bms_copy(base_colnos);
    1599         4923 :         AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
    1600         4923 :         List       *hashTlist = NIL;
    1601              :         TupleDesc   hashDesc;
    1602              :         int         maxCols;
    1603              :         int         i;
    1604              : 
    1605         4923 :         perhash->largestGrpColIdx = 0;
    1606              : 
    1607              :         /*
    1608              :          * If we're doing grouping sets, then some Vars might be referenced in
    1609              :          * tlist/qual for the benefit of other grouping sets, but not needed
    1610              :          * when hashing; i.e. prepare_projection_slot will null them out, so
    1611              :          * there'd be no point storing them.  Use prepare_projection_slot's
    1612              :          * logic to determine which.
    1613              :          */
    1614         4923 :         if (aggstate->phases[0].grouped_cols)
    1615              :         {
    1616         4923 :             Bitmapset  *grouped_cols = aggstate->phases[0].grouped_cols[j];
    1617              :             ListCell   *lc;
    1618              : 
    1619        13250 :             foreach(lc, aggstate->all_grouped_cols)
    1620              :             {
    1621         8327 :                 int         attnum = lfirst_int(lc);
    1622              : 
    1623         8327 :                 if (!bms_is_member(attnum, grouped_cols))
    1624          988 :                     colnos = bms_del_member(colnos, attnum);
    1625              :             }
    1626              :         }
    1627              : 
    1628              :         /*
    1629              :          * Compute maximum number of input columns accounting for possible
    1630              :          * duplications in the grpColIdx array, which can happen in some edge
    1631              :          * cases where HashAggregate was generated as part of a semijoin or a
    1632              :          * DISTINCT.
    1633              :          */
    1634         4923 :         maxCols = bms_num_members(colnos) + perhash->numCols;
    1635              : 
    1636         4923 :         perhash->hashGrpColIdxInput =
    1637         4923 :             palloc(maxCols * sizeof(AttrNumber));
    1638         4923 :         perhash->hashGrpColIdxHash =
    1639         4923 :             palloc(perhash->numCols * sizeof(AttrNumber));
    1640              : 
    1641              :         /* Add all the grouping columns to colnos */
    1642        12262 :         for (i = 0; i < perhash->numCols; i++)
    1643         7339 :             colnos = bms_add_member(colnos, grpColIdx[i]);
    1644              : 
    1645              :         /*
    1646              :          * First build mapping for columns directly hashed. These are the
    1647              :          * first, because they'll be accessed when computing hash values and
    1648              :          * comparing tuples for exact matches. We also build simple mapping
    1649              :          * for execGrouping, so it knows where to find the to-be-hashed /
    1650              :          * compared columns in the input.
    1651              :          */
    1652        12262 :         for (i = 0; i < perhash->numCols; i++)
    1653              :         {
    1654         7339 :             perhash->hashGrpColIdxInput[i] = grpColIdx[i];
    1655         7339 :             perhash->hashGrpColIdxHash[i] = i + 1;
    1656         7339 :             perhash->numhashGrpCols++;
    1657              :             /* delete already mapped columns */
    1658         7339 :             colnos = bms_del_member(colnos, grpColIdx[i]);
    1659              :         }
    1660              : 
    1661              :         /* and add the remaining columns */
    1662         4923 :         i = -1;
    1663         5671 :         while ((i = bms_next_member(colnos, i)) >= 0)
    1664              :         {
    1665          748 :             perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
    1666          748 :             perhash->numhashGrpCols++;
    1667              :         }
    1668              : 
    1669              :         /* and build a tuple descriptor for the hashtable */
    1670        13010 :         for (i = 0; i < perhash->numhashGrpCols; i++)
    1671              :         {
    1672         8087 :             int         varNumber = perhash->hashGrpColIdxInput[i] - 1;
    1673              : 
    1674         8087 :             hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
    1675         8087 :             perhash->largestGrpColIdx =
    1676         8087 :                 Max(varNumber + 1, perhash->largestGrpColIdx);
    1677              :         }
    1678              : 
    1679         4923 :         hashDesc = ExecTypeFromTL(hashTlist);
    1680              : 
    1681         4923 :         execTuplesHashPrepare(perhash->numCols,
    1682         4923 :                               perhash->aggnode->grpOperators,
    1683              :                               &perhash->eqfuncoids,
    1684              :                               &perhash->hashfunctions);
    1685         4923 :         perhash->hashslot =
    1686         4923 :             ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
    1687              :                                &TTSOpsMinimalTuple, 0);
    1688              : 
    1689         4923 :         list_free(hashTlist);
    1690         4923 :         bms_free(colnos);
    1691              :     }
    1692              : 
    1693         4563 :     bms_free(base_colnos);
    1694         4563 : }
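The mapping built by `find_hash_columns()` puts the grouping columns first, in `grpColIdx` order, and then appends the other needed columns in ascending order. The sketch below reproduces that ordering under one simplifying assumption: a `uint64_t` bitmask stands in for `Bitmapset`, so only attribute numbers 1 to 63 are handled. The function name `sketch_map_hash_columns` is hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Simplified column-mapping sketch. 'needed' has bit n set when attribute n
 * must be stored. input_map needs room for popcount(needed) + num_cols
 * entries (like maxCols); hash_map needs num_cols entries.
 * Returns the number of entries written to input_map.
 */
static int
sketch_map_hash_columns(uint64_t needed, const int *grp_col_idx, int num_cols,
                        int *input_map, int *hash_map)
{
    int n = 0;

    /* grouping columns come first, in grpColIdx order (duplicates kept) */
    for (int i = 0; i < num_cols; i++)
    {
        assert(grp_col_idx[i] > 0 && grp_col_idx[i] < 64);
        input_map[n++] = grp_col_idx[i];
        hash_map[i] = i + 1;    /* position within the hashtable tuple */
        needed &= ~(UINT64_C(1) << grp_col_idx[i]);    /* already mapped */
    }

    /* then every remaining needed column, in ascending attribute order */
    for (int colno = 1; colno < 64; colno++)
    {
        if (needed & (UINT64_C(1) << colno))
            input_map[n++] = colno;
    }
    return n;
}
```

For example, with needed columns {2, 5, 7} and `grpColIdx = {5, 3}`, the input map comes out as 5, 3, 2, 7, and the hash-side map as 1, 2.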
    1695              : 
    1696              : /*
    1697              :  * Estimate per-hash-table-entry overhead.
    1698              :  */
    1699              : Size
    1700        33197 : hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
    1701              : {
    1702              :     Size        tupleChunkSize;
    1703              :     Size        pergroupChunkSize;
    1704              :     Size        transitionChunkSize;
    1705        33197 :     Size        tupleSize = (MAXALIGN(SizeofMinimalTupleHeader) +
    1706              :                              tupleWidth);
    1707        33197 :     Size        pergroupSize = numTrans * sizeof(AggStatePerGroupData);
    1708              : 
    1709              :     /*
    1710              :      * Entries use the Bump allocator, so the chunk sizes are the same as the
    1711              :      * requested sizes.
    1712              :      */
    1713        33197 :     tupleChunkSize = MAXALIGN(tupleSize);
    1714        33197 :     pergroupChunkSize = pergroupSize;
    1715              : 
    1716              :     /*
    1717              :      * Transition values use AllocSet, which has a chunk header and also uses
    1718              :      * power-of-two allocations.
    1719              :      */
    1720        33197 :     if (transitionSpace > 0)
    1721         4170 :         transitionChunkSize = CHUNKHDRSZ + pg_nextpower2_size_t(transitionSpace);
    1722              :     else
    1723        29027 :         transitionChunkSize = 0;
    1724              : 
    1725              :     return
    1726        33197 :         TupleHashEntrySize() +
    1727        33197 :         tupleChunkSize +
    1728        33197 :         pergroupChunkSize +
    1729              :         transitionChunkSize;
    1730              : }
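The estimate in `hash_agg_entry_size()` is plain arithmetic, so it can be checked by hand. The sketch below follows the same four terms. Every size constant in it (alignment, minimal-tuple header, chunk header, `TupleHashEntrySize()`, `sizeof(AggStatePerGroupData)`) is a hypothetical placeholder chosen for illustration, not the value on any particular build.

```c
#include <assert.h>
#include <stddef.h>

/* Placeholder constants; real values depend on platform and build. */
#define SKETCH_MAXALIGN_OF   8
#define SKETCH_MINTUP_HDR    16   /* hypothetical SizeofMinimalTupleHeader */
#define SKETCH_CHUNKHDRSZ    8    /* hypothetical AllocSet chunk header */
#define SKETCH_HASH_ENTRY    24   /* hypothetical TupleHashEntrySize() */
#define SKETCH_PERGROUP      16   /* hypothetical sizeof(AggStatePerGroupData) */

#define SKETCH_MAXALIGN(x) \
    (((x) + (SKETCH_MAXALIGN_OF - 1)) & ~((size_t) (SKETCH_MAXALIGN_OF - 1)))

/* Smallest power of two >= x, for x > 0 (like pg_nextpower2_size_t). */
static size_t
sketch_nextpower2(size_t x)
{
    size_t p = 1;

    assert(x > 0);
    while (p < x)
        p <<= 1;
    return p;
}

static size_t
sketch_entry_size(int num_trans, size_t tuple_width, size_t transition_space)
{
    /* bump-allocated parts: chunk size equals the (aligned) request */
    size_t tuple_chunk =
        SKETCH_MAXALIGN(SKETCH_MAXALIGN(SKETCH_MINTUP_HDR) + tuple_width);
    size_t pergroup_chunk = (size_t) num_trans * SKETCH_PERGROUP;

    /* AllocSet part: chunk header plus power-of-two rounding */
    size_t trans_chunk = transition_space > 0
        ? SKETCH_CHUNKHDRSZ + sketch_nextpower2(transition_space)
        : 0;

    return SKETCH_HASH_ENTRY + tuple_chunk + pergroup_chunk + trans_chunk;
}
```

With these placeholders, two transition states and a 20-byte tuple give 24 + 40 + 32 = 96 bytes. Adding 100 bytes of transition space adds a 136-byte AllocSet chunk (an 8-byte header plus 100 rounded up to 128), for 232 bytes in total.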
    1731              : 
    1732              : /*
    1733              :  * hashagg_recompile_expressions()
    1734              :  *
    1735              :  * Identifies the right phase, compiles the right expression given the
    1736              :  * arguments, and then sets phase->evaltrans to that expression.
    1737              :  *
    1738              :  * Different versions of the compiled expression are needed depending on
    1739              :  * whether hash aggregation has spilled or not, and whether it's reading from
    1740              :  * the outer plan or a tape. Before spilling to disk, the expression reads
    1741              :  * from the outer plan and does not need to perform a NULL check. After
    1742              :  * HashAgg begins to spill, new groups will not be created in the hash table,
    1743              :  * and the AggStatePerGroup array may be NULL; therefore we need to add a null
    1744              :  * pointer check to the expression. Then, when reading spilled data from a
    1745              :  * tape, we change the outer slot type to be a fixed minimal tuple slot.
    1746              :  *
    1747              :  * It would be wasteful to recompile every time, so cache the compiled
    1748              :  * expressions in the AggStatePerPhase, and reuse when appropriate.
    1749              :  */
    1750              : static void
    1751        42281 : hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
    1752              : {
    1753              :     AggStatePerPhase phase;
    1754        42281 :     int         i = minslot ? 1 : 0;
    1755        42281 :     int         j = nullcheck ? 1 : 0;
    1756              : 
    1757              :     Assert(aggstate->aggstrategy == AGG_HASHED ||
    1758              :            aggstate->aggstrategy == AGG_MIXED);
    1759              : 
    1760        42281 :     if (aggstate->aggstrategy == AGG_HASHED)
    1761         7233 :         phase = &aggstate->phases[0];
    1762              :     else                        /* AGG_MIXED */
    1763        35048 :         phase = &aggstate->phases[1];
    1764              : 
    1765        42281 :     if (phase->evaltrans_cache[i][j] == NULL)
    1766              :     {
    1767           56 :         const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops;
    1768           56 :         bool        outerfixed = aggstate->ss.ps.outeropsfixed;
    1769           56 :         bool        dohash = true;
    1770           56 :         bool        dosort = false;
    1771              : 
    1772              :         /*
    1773              :          * If minslot is true, that means we are processing a spilled batch
    1774              :          * (inside agg_refill_hash_table()), and we must not advance the
    1775              :          * sorted grouping sets.
    1776              :          */
    1777           56 :         if (aggstate->aggstrategy == AGG_MIXED && !minslot)
    1778            8 :             dosort = true;
    1779              : 
    1780              :         /* temporarily change the outerops while compiling the expression */
    1781           56 :         if (minslot)
    1782              :         {
    1783           28 :             aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
    1784           28 :             aggstate->ss.ps.outeropsfixed = true;
    1785              :         }
    1786              : 
    1787           56 :         phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase,
    1788              :                                                          dosort, dohash,
    1789              :                                                          nullcheck);
    1790              : 
    1791              :         /* change back */
    1792           56 :         aggstate->ss.ps.outerops = outerops;
    1793           56 :         aggstate->ss.ps.outeropsfixed = outerfixed;
    1794              :     }
    1795              : 
    1796        42281 :     phase->evaltrans = phase->evaltrans_cache[i][j];
    1797        42281 : }
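The two booleans select one of four cached expressions. The following minimal sketch shows that 2x2 cache. A hypothetical `SketchExpr` stands in for the compiled expression, and a counter takes the place of the `ExecBuildAggTrans()` call.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a compiled transition expression. */
typedef struct SketchExpr
{
    bool minslot;
    bool nullcheck;
} SketchExpr;

typedef struct SketchPhase
{
    SketchExpr  storage[2][2];  /* backing storage for this sketch only */
    SketchExpr *cache[2][2];    /* like evaltrans_cache[i][j] */
    SketchExpr *evaltrans;      /* the expression currently in use */
    int         ncompiled;      /* how many "compilations" happened */
} SketchPhase;

static void
sketch_recompile(SketchPhase *phase, bool minslot, bool nullcheck)
{
    int i = minslot ? 1 : 0;
    int j = nullcheck ? 1 : 0;

    if (phase->cache[i][j] == NULL)
    {
        /* stands in for the expensive ExecBuildAggTrans() call */
        phase->storage[i][j].minslot = minslot;
        phase->storage[i][j].nullcheck = nullcheck;
        phase->cache[i][j] = &phase->storage[i][j];
        phase->ncompiled++;
    }

    phase->evaltrans = phase->cache[i][j];
    assert(phase->ncompiled <= 4);
}
```

Calling it repeatedly with the same flags reuses the cached entry, so each phase compiles at most four variants. The coverage counts above reflect this: 42281 calls led to only 56 compilations.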
    1798              : 
    1799              : /*
    1800              :  * Set limits that trigger spilling to avoid exceeding hash_mem. Consider the
    1801              :  * number of partitions we expect to create (if we do spill).
    1802              :  *
    1803              :  * There are two limits: a memory limit, and also an ngroups limit. The
    1804              :  * ngroups limit becomes important when we expect transition values to grow
    1805              :  * substantially larger than the initial value.
    1806              :  */
    1807              : void
    1808        48960 : hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
    1809              :                     Size *mem_limit, uint64 *ngroups_limit,
    1810              :                     int *num_partitions)
    1811              : {
    1812              :     int         npartitions;
    1813              :     Size        partition_mem;
    1814        48960 :     Size        hash_mem_limit = get_hash_memory_limit();
    1815              : 
    1816              :     /* if not expected to spill, use all of hash_mem */
    1817        48960 :     if (input_groups * hashentrysize <= hash_mem_limit)
    1818              :     {
    1819        47329 :         if (num_partitions != NULL)
    1820        30928 :             *num_partitions = 0;
    1821        47329 :         *mem_limit = hash_mem_limit;
    1822        47329 :         *ngroups_limit = hash_mem_limit / hashentrysize;
    1823        47329 :         return;
    1824              :     }
    1825              : 
    1826              :     /*
    1827              :      * Calculate expected memory requirements for spilling, which is the size
    1828              :      * of the buffers needed for all the tapes that need to be open at once.
    1829              :      * Then, subtract that from the memory available for holding hash tables.
    1830              :      */
    1831         1631 :     npartitions = hash_choose_num_partitions(input_groups,
    1832              :                                              hashentrysize,
    1833              :                                              used_bits,
    1834              :                                              NULL);
    1835         1631 :     if (num_partitions != NULL)
    1836           75 :         *num_partitions = npartitions;
    1837              : 
    1838         1631 :     partition_mem =
    1839         1631 :         HASHAGG_READ_BUFFER_SIZE +
    1840              :         HASHAGG_WRITE_BUFFER_SIZE * npartitions;
    1841              : 
    1842              :     /*
    1843              :      * Don't set the limit below 3/4 of hash_mem. When the partition buffers
    1844              :      * would need a quarter of hash_mem or more, we are at the minimum number
    1845              :      * of partitions, so we aren't going to dramatically exceed hash_mem anyway.
    1846              :      */
    1847         1631 :     if (hash_mem_limit > 4 * partition_mem)
    1848            0 :         *mem_limit = hash_mem_limit - partition_mem;
    1849              :     else
    1850         1631 :         *mem_limit = hash_mem_limit * 0.75;
    1851              : 
    1852         1631 :     if (*mem_limit > hashentrysize)
    1853         1631 :         *ngroups_limit = *mem_limit / hashentrysize;
    1854              :     else
    1855            0 :         *ngroups_limit = 1;
    1856              : }
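The limit calculation can be expressed as a small standalone function. This is a sketch under two stated assumptions. First, the partition count is passed in rather than computed by `hash_choose_num_partitions()`. Second, both tape buffer sizes are assumed to be 8192 bytes. `sketch_set_limits` is a hypothetical name.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed buffer sizes; placeholders of one 8192-byte block each. */
#define SKETCH_READ_BUF   8192
#define SKETCH_WRITE_BUF  8192

/*
 * Mirrors the decision structure of hash_agg_set_limits(), except that the
 * partition count is an input instead of being chosen internally.
 */
static void
sketch_set_limits(double entry_size, double input_groups, size_t hash_mem,
                  int npartitions, size_t *mem_limit, uint64_t *ngroups_limit)
{
    assert(entry_size > 0);

    /* not expected to spill: use all of hash_mem */
    if (input_groups * entry_size <= hash_mem)
    {
        *mem_limit = hash_mem;
        *ngroups_limit = (uint64_t) (hash_mem / entry_size);
        return;
    }

    /* reserve one read buffer plus one write buffer per partition */
    size_t partition_mem = SKETCH_READ_BUF + (size_t) SKETCH_WRITE_BUF * npartitions;

    /* never drop below 3/4 of hash_mem */
    if (hash_mem > 4 * partition_mem)
        *mem_limit = hash_mem - partition_mem;
    else
        *mem_limit = (size_t) (hash_mem * 0.75);

    *ngroups_limit = *mem_limit > entry_size
        ? (uint64_t) (*mem_limit / entry_size)
        : 1;
}
```

For instance, take a 4194304-byte hash_mem, 64-byte entries, and 4 partitions. An input expected to spill reserves 8192 + 4 * 8192 = 40960 bytes for buffers. That leaves a memory limit of 4153344 bytes and a group limit of 64896.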
    1857              : 
    1858              : /*
    1859              :  * hash_agg_check_limits
    1860              :  *
    1861              :  * After adding a new group to the hash table, check whether we need to enter
    1862              :  * spill mode. Allocations may happen without adding new groups (for instance,
    1863              :  * if the transition state size grows), so this check is imperfect.
    1864              :  */
    1865              : static void
    1866       330997 : hash_agg_check_limits(AggState *aggstate)
    1867              : {
    1868       330997 :     uint64      ngroups = aggstate->hash_ngroups_current;
    1869       330997 :     Size        meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt,
    1870              :                                                      true);
    1871       330997 :     Size        entry_mem = MemoryContextMemAllocated(aggstate->hash_tuplescxt,
    1872              :                                                       true);
    1873       330997 :     Size        tval_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
    1874              :                                                      true);
    1875       330997 :     Size        total_mem = meta_mem + entry_mem + tval_mem;
    1876       330997 :     bool        do_spill = false;
    1877              : 
    1878              : #ifdef USE_INJECTION_POINTS
    1879       330997 :     if (ngroups >= 1000)
    1880              :     {
    1881        58757 :         if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-spill-1000"))
    1882              :         {
    1883            5 :             do_spill = true;
    1884            5 :             INJECTION_POINT_CACHED("hash-aggregate-spill-1000", NULL);
    1885              :         }
    1886              :     }
    1887              : #endif
    1888              : 
    1889              :     /*
    1890              :      * Don't spill unless there's at least one group in the hash table so we
    1891              :      * can be sure to make progress even in edge cases.
    1892              :      */
    1893       330997 :     if (aggstate->hash_ngroups_current > 0 &&
    1894       330997 :         (total_mem > aggstate->hash_mem_limit ||
    1895       313389 :          ngroups > aggstate->hash_ngroups_limit))
    1896              :     {
    1897        17631 :         do_spill = true;
    1898              :     }
    1899              : 
    1900       330997 :     if (do_spill)
    1901        17636 :         hash_agg_enter_spill_mode(aggstate);
    1902       330997 : }
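The spill decision reduces to a predicate over the current counters. In the sketch below, the three memory figures are plain parameters that stand in for the `MemoryContextMemAllocated()` results, and the injection-point path is omitted. `sketch_should_spill` is a hypothetical name.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Returns true when the caller should enter spill mode. meta_mem, entry_mem
 * and tval_mem stand in for the allocated sizes of the meta, entry, and
 * transition-value memory contexts.
 */
static bool
sketch_should_spill(uint64_t ngroups, size_t meta_mem, size_t entry_mem,
                    size_t tval_mem, size_t mem_limit, uint64_t ngroups_limit)
{
    size_t total_mem = meta_mem + entry_mem + tval_mem;

    assert(mem_limit > 0);

    /* require at least one group so that progress is guaranteed */
    if (ngroups == 0)
        return false;

    return total_mem > mem_limit || ngroups > ngroups_limit;
}
```

Either limit alone is enough to trigger spilling. An empty table never spills, even when memory is already over the limit.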
    1903              : 
    1904              : /*
    1905              :  * Enter "spill mode", meaning that no new groups are added to any of the hash
    1906              :  * tables. Tuples that would create a new group are instead spilled, and
    1907              :  * processed later.
    1908              :  */
    1909              : static void
    1910        17636 : hash_agg_enter_spill_mode(AggState *aggstate)
    1911              : {
    1912        17636 :     INJECTION_POINT("hash-aggregate-enter-spill-mode", NULL);
    1913        17636 :     aggstate->hash_spill_mode = true;
    1914        17636 :     hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
    1915              : 
    1916        17636 :     if (!aggstate->hash_ever_spilled)
    1917              :     {
    1918              :         Assert(aggstate->hash_tapeset == NULL);
    1919              :         Assert(aggstate->hash_spills == NULL);
    1920              : 
    1921           40 :         aggstate->hash_ever_spilled = true;
    1922              : 
    1923           40 :         aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1);
    1924              : 
    1925           40 :         aggstate->hash_spills = palloc_array(HashAggSpill, aggstate->num_hashes);
    1926              : 
    1927          120 :         for (int setno = 0; setno < aggstate->num_hashes; setno++)
    1928              :         {
    1929           80 :             AggStatePerHash perhash = &aggstate->perhash[setno];
    1930           80 :             HashAggSpill *spill = &aggstate->hash_spills[setno];
    1931              : 
    1932           80 :             hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
    1933           80 :                                perhash->aggnode->numGroups,
    1934              :                                aggstate->hashentrysize);
    1935              :         }
    1936              :     }
    1937        17636 : }
    1938              : 
    1939              : /*
    1940              :  * Update metrics after filling the hash table.
    1941              :  *
    1942              :  * If reading from the outer plan, from_tape should be false; if reading from
    1943              :  * another tape, from_tape should be true.
    1944              :  */
    1945              : static void
    1946        27790 : hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
    1947              : {
    1948              :     Size        meta_mem;
    1949              :     Size        entry_mem;
    1950              :     Size        hashkey_mem;
    1951              :     Size        buffer_mem;
    1952              :     Size        total_mem;
    1953              : 
    1954        27790 :     if (aggstate->aggstrategy != AGG_MIXED &&
    1955        10178 :         aggstate->aggstrategy != AGG_HASHED)
    1956            0 :         return;
    1957              : 
    1958              :     /* memory for the hash table itself */
    1959        27790 :     meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
    1960              : 
    1961              :     /* memory for hash entries */
    1962        27790 :     entry_mem = MemoryContextMemAllocated(aggstate->hash_tuplescxt, true);
    1963              : 
    1964              :     /* memory for byref transition states */
    1965        27790 :     hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true);
    1966              : 
    1967              :     /* memory for read/write tape buffers, if spilled */
    1968        27790 :     buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
    1969        27790 :     if (from_tape)
    1970        17957 :         buffer_mem += HASHAGG_READ_BUFFER_SIZE;
    1971              : 
    1972              :     /* update peak mem */
    1973        27790 :     total_mem = meta_mem + entry_mem + hashkey_mem + buffer_mem;
    1974        27790 :     if (total_mem > aggstate->hash_mem_peak)
    1975         3177 :         aggstate->hash_mem_peak = total_mem;
    1976              : 
    1977              :     /* update disk usage */
    1978        27790 :     if (aggstate->hash_tapeset != NULL)
    1979              :     {
    1980        17997 :         uint64      disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024);
    1981              : 
    1982        17997 :         if (aggstate->hash_disk_used < disk_used)
    1983           33 :             aggstate->hash_disk_used = disk_used;
    1984              :     }
    1985              : 
    1986              :     /* update hashentrysize estimate based on contents */
    1987        27790 :     if (aggstate->hash_ngroups_current > 0)
    1988              :     {
    1989        27446 :         aggstate->hashentrysize =
    1990        27446 :             TupleHashEntrySize() +
    1991        27446 :             (hashkey_mem / (double) aggstate->hash_ngroups_current);
    1992              :     }
    1993              : }
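The bookkeeping in `hash_agg_update_metrics()` keeps running peaks and refines the per-entry size estimate. The sketch below assumes an 8192-byte `BLCKSZ` and takes the already-summed memory total as a parameter. `SketchMetrics` is a hypothetical struct, not the actual `AggState` fields. As in the code above, the refined estimate adds only transition-value memory per group on top of the fixed entry overhead.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_BLCKSZ 8192   /* assumed block size */

typedef struct SketchMetrics
{
    size_t   mem_peak;       /* like hash_mem_peak */
    uint64_t disk_used_kb;   /* like hash_disk_used, in kB */
    double   entry_size;     /* like hashentrysize */
} SketchMetrics;

static void
sketch_update_metrics(SketchMetrics *m, size_t total_mem, uint64_t tape_blocks,
                      size_t tval_mem, uint64_t ngroups, double entry_overhead)
{
    uint64_t disk_kb = tape_blocks * (SKETCH_BLCKSZ / 1024);

    /* peaks only ever move upward */
    if (total_mem > m->mem_peak)
        m->mem_peak = total_mem;
    if (disk_kb > m->disk_used_kb)
        m->disk_used_kb = disk_kb;

    /* refine the per-entry estimate from observed transition-value memory */
    if (ngroups > 0)
        m->entry_size = entry_overhead + (double) tval_mem / (double) ngroups;

    assert(m->entry_size >= 0.0);
}
```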
    1994              : 
    1995              : /*
    1996              :  * Create memory contexts used for hash aggregation.
    1997              :  */
    1998              : static void
    1999         4563 : hash_create_memory(AggState *aggstate)
    2000              : {
    2001         4563 :     Size        maxBlockSize = ALLOCSET_DEFAULT_MAXSIZE;
    2002              : 
    2003              :     /*
    2004              :      * The hashcontext's per-tuple memory will be used for byref transition
    2005              :      * values and returned by AggCheckCallContext().
    2006              :      */
    2007         4563 :     aggstate->hashcontext = CreateWorkExprContext(aggstate->ss.ps.state);
    2008              : 
    2009              :     /*
    2010              :      * The meta context will be used for the bucket array of
    2011              :      * TupleHashEntryData (or arrays, in the case of grouping sets). As the
    2012              :      * hash table grows, the bucket array will double in size and the old one
    2013              :      * will be freed, so an AllocSet is appropriate. For large bucket arrays,
    2014              :      * the large allocation path will be used, so it's not worth worrying
    2015              :      * about wasting space due to power-of-two allocations.
    2016              :      */
    2017         4563 :     aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
    2018              :                                                    "HashAgg meta context",
    2019              :                                                    ALLOCSET_DEFAULT_SIZES);
    2020              : 
    2021              :     /*
    2022              :      * The hash entries themselves, which include the grouping key
    2023              :      * (firstTuple) and pergroup data, are stored in the table context. The
    2024              :      * bump allocator can be used because the entries are not freed until the
    2025              :      * entire hash table is reset. The bump allocator is faster for
    2026              :      * allocations and avoids wasting space on the chunk header or
    2027              :      * power-of-two allocations.
    2031              :      */
    2032              : 
    2033              :     /*
    2034              :      * Like CreateWorkExprContext(), use smaller sizings for smaller work_mem,
    2035              :      * to avoid large jumps in memory usage.
    2036              :      */
    2037         4563 :     maxBlockSize = pg_prevpower2_size_t(work_mem * (Size) 1024 / 16);
    2038              : 
    2039              :     /* But no bigger than ALLOCSET_DEFAULT_MAXSIZE */
    2040         4563 :     maxBlockSize = Min(maxBlockSize, ALLOCSET_DEFAULT_MAXSIZE);
    2041              : 
    2042              :     /* and no smaller than ALLOCSET_DEFAULT_INITSIZE */
    2043         4563 :     maxBlockSize = Max(maxBlockSize, ALLOCSET_DEFAULT_INITSIZE);
    2044              : 
    2045         4563 :     aggstate->hash_tuplescxt = BumpContextCreate(aggstate->ss.ps.state->es_query_cxt,
    2046              :                                                  "HashAgg hashed tuples",
    2047              :                                                  ALLOCSET_DEFAULT_MINSIZE,
    2048              :                                                  ALLOCSET_DEFAULT_INITSIZE,
    2049              :                                                  maxBlockSize);
    2050              : 
    2051         4563 : }
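The block-size rule in hash_create_memory() can be checked by hand. Take work_mem (in kB), multiply by 1024, divide by 16, and round down to a power of two. Then cap the result at ALLOCSET_DEFAULT_MAXSIZE and raise it to at least ALLOCSET_DEFAULT_INITSIZE. This standalone sketch assumes the stock memutils.h values of 8 kB and 8 MB for those two limits. It uses a simple loop in place of pg_prevpower2_size_t(). The sketch_* names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Assumed to match memutils.h: ALLOCSET_DEFAULT_INITSIZE and _MAXSIZE. */
#define SKETCH_INITSIZE ((size_t) 8 * 1024)
#define SKETCH_MAXSIZE  ((size_t) 8 * 1024 * 1024)

/* Largest power of two <= x (x nonzero); stands in for pg_prevpower2_size_t(). */
static size_t
sketch_prevpower2(size_t x)
{
    size_t      p = 1;

    assert(x > 0);
    while (p <= x / 2)
        p <<= 1;
    return p;
}

/* Max block size for the "HashAgg hashed tuples" context, given work_mem in kB. */
static size_t
sketch_max_block_size(int work_mem_kb)
{
    size_t      size = sketch_prevpower2((size_t) work_mem_kb * 1024 / 16);

    if (size > SKETCH_MAXSIZE)      /* no bigger than the default max size */
        size = SKETCH_MAXSIZE;
    if (size < SKETCH_INITSIZE)     /* and no smaller than the default init size */
        size = SKETCH_INITSIZE;
    return size;
}
```

With the default work_mem of 4 MB (4096 kB), this yields 256 kB blocks. A 64 kB work_mem is raised to the 8 kB floor, and a 1 GB work_mem is capped at 8 MB. Scaling the block size with work_mem avoids the large jumps in memory usage that the comment warns about.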
    2052              : 
    2053              : /*
    2054              :  * Choose a reasonable number of buckets for the initial hash table size.
    2055              :  */
    2056              : static double
    2057         3508 : hash_choose_num_buckets(double hashentrysize, double ngroups, Size memory)
    2058              : {
    2059              :     double      max_nbuckets;
    2060         3508 :     double      nbuckets = ngroups;
    2061              : 
    2062         3508 :     max_nbuckets = memory / hashentrysize;
    2063              : 
    2064              :     /*
    2065              :      * Underestimating is better than overestimating. Too many buckets crowd
    2066              :      * out space for group keys and transition state values.
    2067              :      */
    2068         3508 :     max_nbuckets /= 2;
    2069              : 
    2070         3508 :     if (nbuckets > max_nbuckets)
    2071           48 :         nbuckets = max_nbuckets;
    2072              : 
    2073              :     /*
    2074              :      * BuildTupleHashTable will clamp any obviously-insane result, so we don't
    2075              :      * need to be too careful here.
    2076              :      */
    2077         3508 :     return nbuckets;
    2078              : }
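hash_choose_num_buckets() starts from the group estimate and caps it at half the number of entries that would fit in memory, deliberately erring low. The sketch below mirrors that arithmetic. The sample numbers are illustrative only: with 64-byte entries and 4 MB of memory, at most 4194304 / 64 / 2 = 32768 buckets are requested, however many groups are estimated. The function name is hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Initial bucket count: the group estimate, capped at half of memory / entry size. */
static double
sketch_num_buckets(double hashentrysize, double ngroups, size_t memory)
{
    double      max_nbuckets;

    assert(hashentrysize > 0);
    max_nbuckets = (double) memory / hashentrysize;
    max_nbuckets /= 2;          /* leave room for group keys and transition states */
    return (ngroups > max_nbuckets) ? max_nbuckets : ngroups;
}
```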
    2079              : 
    2080              : /*
    2081              :  * Determine the number of partitions to create when spilling, which will
    2082              :  * always be a power of two. If log2_npartitions is non-NULL, set
    2083              :  * *log2_npartitions to the log2() of the number of partitions.
    2084              :  */
    2085              : static int
    2086        10043 : hash_choose_num_partitions(double input_groups, double hashentrysize,
    2087              :                            int used_bits, int *log2_npartitions)
    2088              : {
    2089        10043 :     Size        hash_mem_limit = get_hash_memory_limit();
    2090              :     double      partition_limit;
    2091              :     double      mem_wanted;
    2092              :     double      dpartitions;
    2093              :     int         npartitions;
    2094              :     int         partition_bits;
    2095              : 
    2096              :     /*
    2097              :      * Avoid creating so many partitions that the memory requirements of the
    2098              :      * open partition files are greater than 1/4 of hash_mem.
    2099              :      */
    2100        10043 :     partition_limit =
    2101        10043 :         (hash_mem_limit * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
    2102              :         HASHAGG_WRITE_BUFFER_SIZE;
    2103              : 
    2104        10043 :     mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
    2105              : 
    2106              :     /* make enough partitions so that each one is likely to fit in memory */
    2107        10043 :     dpartitions = 1 + (mem_wanted / hash_mem_limit);
    2108              : 
    2109        10043 :     if (dpartitions > partition_limit)
    2110        10001 :         dpartitions = partition_limit;
    2111              : 
    2112        10043 :     if (dpartitions < HASHAGG_MIN_PARTITIONS)
    2113        10043 :         dpartitions = HASHAGG_MIN_PARTITIONS;
    2114        10043 :     if (dpartitions > HASHAGG_MAX_PARTITIONS)
    2115            0 :         dpartitions = HASHAGG_MAX_PARTITIONS;
    2116              : 
    2117              :     /* HASHAGG_MAX_PARTITIONS limit makes this safe */
    2118        10043 :     npartitions = (int) dpartitions;
    2119              : 
    2120              :     /* ceil(log2(npartitions)) */
    2121        10043 :     partition_bits = pg_ceil_log2_32(npartitions);
    2122              : 
    2123              :     /* make sure that we don't exhaust the hash bits */
    2124        10043 :     if (partition_bits + used_bits >= 32)
    2125            0 :         partition_bits = 32 - used_bits;
    2126              : 
    2127        10043 :     if (log2_npartitions != NULL)
    2128         8412 :         *log2_npartitions = partition_bits;
    2129              : 
    2130              :     /* number of partitions will be a power of two */
    2131        10043 :     npartitions = 1 << partition_bits;
    2132              : 
    2133        10043 :     return npartitions;
    2134              : }
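The clamps in hash_choose_num_partitions() interact in ways that are easier to see with numbers, so here is a standalone C sketch of the same computation. The HASHAGG_* constants are defined outside this excerpt, so the values below are assumptions: a partition factor of 1.5, between 4 and 1024 partitions, and read and write buffers of one 8 kB block each. hash_mem_limit is passed in rather than obtained from get_hash_memory_limit(), and a loop replaces pg_ceil_log2_32(). Under these assumptions, 4 MB of hash memory and a million 64-byte groups give 1 + 1.5 * 10^6 * 64 / 4194304 ≈ 23.9 wanted partitions. That value truncates to 23 and rounds up to 32.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed values for the HASHAGG_* constants (defined outside this excerpt). */
#define SK_PARTITION_FACTOR   1.50
#define SK_MIN_PARTITIONS     4
#define SK_MAX_PARTITIONS     1024
#define SK_READ_BUFFER_SIZE   8192
#define SK_WRITE_BUFFER_SIZE  8192

/* Smallest b with (1 << b) >= n, for n >= 1; stands in for pg_ceil_log2_32(). */
static int
sketch_ceil_log2(uint32_t n)
{
    int         b = 0;

    while (((uint32_t) 1 << b) < n)
        b++;
    return b;
}

static int
sketch_num_partitions(double input_groups, double hashentrysize, int used_bits,
                      size_t hash_mem_limit, int *log2_npartitions)
{
    /* open partition files may use at most 1/4 of hash memory */
    double      partition_limit =
        (hash_mem_limit * 0.25 - SK_READ_BUFFER_SIZE) / SK_WRITE_BUFFER_SIZE;
    double      mem_wanted = SK_PARTITION_FACTOR * input_groups * hashentrysize;
    double      dpartitions = 1 + (mem_wanted / hash_mem_limit);
    int         partition_bits;

    if (dpartitions > partition_limit)
        dpartitions = partition_limit;
    if (dpartitions < SK_MIN_PARTITIONS)
        dpartitions = SK_MIN_PARTITIONS;
    if (dpartitions > SK_MAX_PARTITIONS)
        dpartitions = SK_MAX_PARTITIONS;

    partition_bits = sketch_ceil_log2((uint32_t) dpartitions);

    /* don't use more hash bits than the 32 available */
    if (partition_bits + used_bits >= 32)
        partition_bits = 32 - used_bits;

    if (log2_npartitions != NULL)
        *log2_npartitions = partition_bits;
    return 1 << partition_bits;     /* always a power of two */
}
```

Under the same assumed constants, only 64 kB of hash memory leaves a buffer budget of a single partition, and the minimum clamp raises that back to 4. If 30 hash bits are already used, only 2 remain, so the result drops to 4 partitions.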
    2135              : 
    2136              : /*
    2137              :  * Initialize a freshly-created TupleHashEntry.
    2138              :  */
    2139              : static void
    2140       330997 : initialize_hash_entry(AggState *aggstate, TupleHashTable hashtable,
    2141              :                       TupleHashEntry entry)
    2142              : {
    2143              :     AggStatePerGroup pergroup;
    2144              :     int         transno;
    2145              : 
    2146       330997 :     aggstate->hash_ngroups_current++;
    2147       330997 :     hash_agg_check_limits(aggstate);
    2148              : 
    2149              :     /* no need to allocate or initialize per-group state */
    2150       330997 :     if (aggstate->numtrans == 0)
    2151       131111 :         return;
    2152              : 
    2153       199886 :     pergroup = (AggStatePerGroup) TupleHashEntryGetAdditional(hashtable, entry);
    2154              : 
    2155              :     /*
    2156              :      * Initialize aggregates for the new tuple group; lookup_hash_entries()
    2157              :      * has already selected the relevant grouping set.
    2158              :      */
    2159       499323 :     for (transno = 0; transno < aggstate->numtrans; transno++)
    2160              :     {
    2161       299437 :         AggStatePerTrans pertrans = &aggstate->pertrans[transno];
    2162       299437 :         AggStatePerGroup pergroupstate = &pergroup[transno];
    2163              : 
    2164       299437 :         initialize_aggregate(aggstate, pertrans, pergroupstate);
    2165              :     }
    2166              : }
    2167              : 
    2168              : /*
    2169              :  * Look up hash entries for the current tuple in all hashed grouping sets.
    2170              :  *
    2171              :  * Some entries may be left NULL if we are in "spill mode". The same tuple
    2172              :  * will belong to different groups for each grouping set, so it may match a group
    2173              :  * already in memory for one set and match a group not in memory for another
    2174              :  * set. When in "spill mode", the tuple will be spilled for each grouping set
    2175              :  * where it doesn't match a group in memory.
    2176              :  *
    2177              :  * NB: It's possible to spill the same tuple for several different grouping
    2178              :  * sets. This may seem wasteful, but it's actually a trade-off: if we spill
    2179              :  * the tuple multiple times for multiple grouping sets, it can be partitioned
    2180              :  * for each grouping set, making the refilling of the hash table very
    2181              :  * efficient.
    2182              :  */
    2183              : static void
    2184      4682571 : lookup_hash_entries(AggState *aggstate)
    2185              : {
    2186      4682571 :     AggStatePerGroup *pergroup = aggstate->hash_pergroup;
    2187      4682571 :     TupleTableSlot *outerslot = aggstate->tmpcontext->ecxt_outertuple;
    2188              :     int         setno;
    2189              : 
    2190      9454688 :     for (setno = 0; setno < aggstate->num_hashes; setno++)
    2191              :     {
    2192      4772117 :         AggStatePerHash perhash = &aggstate->perhash[setno];
    2193      4772117 :         TupleHashTable hashtable = perhash->hashtable;
    2194      4772117 :         TupleTableSlot *hashslot = perhash->hashslot;
    2195              :         TupleHashEntry entry;
    2196              :         uint32      hash;
    2197      4772117 :         bool        isnew = false;
    2198              :         bool       *p_isnew;
    2199              : 
    2200              :         /* if hash table already spilled, don't create new entries */
    2201      4772117 :         p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
    2202              : 
    2203      4772117 :         select_current_set(aggstate, setno, true);
    2204      4772117 :         prepare_hash_slot(perhash,
    2205              :                           outerslot,
    2206              :                           hashslot);
    2207              : 
    2208      4772117 :         entry = LookupTupleHashEntry(hashtable, hashslot,
    2209              :                                      p_isnew, &hash);
    2210              : 
    2211      4772117 :         if (entry != NULL)
    2212              :         {
    2213      4351823 :             if (isnew)
    2214       237560 :                 initialize_hash_entry(aggstate, hashtable, entry);
    2215      4351823 :             pergroup[setno] = TupleHashEntryGetAdditional(hashtable, entry);
    2216              :         }
    2217              :         else
    2218              :         {
    2219       420294 :             HashAggSpill *spill = &aggstate->hash_spills[setno];
    2220       420294 :             TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
    2221              : 
    2222       420294 :             if (spill->partitions == NULL)
    2223            0 :                 hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
    2224            0 :                                    perhash->aggnode->numGroups,
    2225              :                                    aggstate->hashentrysize);
    2226              : 
    2227       420294 :             hashagg_spill_tuple(aggstate, spill, slot, hash);
    2228       420294 :             pergroup[setno] = NULL;
    2229              :         }
    2230              :     }
    2231      4682571 : }
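The spill-mode behavior described above lookup_hash_entries() can be modeled with a small C toy. Everything in it is hypothetical. There are two grouping sets, each grouping on one integer column. A cap of two groups stands in for the memory limit; in the excerpt, that decision is made by hash_agg_check_limits(), called from initialize_hash_entry(). Linear-scan arrays replace TupleHashTable, and hash % NUM_PARTS is an assumed partition choice. The toy shows the two properties the comment describes. Once spill mode starts, tuples still update groups already in memory. And a single tuple can be spilled separately for each grouping set it misses.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy sizes (hypothetical). Grouping set s groups on column s of each row. */
#define NUM_SETS  2
#define TABLE_CAP 2             /* stand-in for the memory limit, in groups */
#define NUM_PARTS 4

typedef struct
{
    int         keys[TABLE_CAP];
    long        counts[TABLE_CAP];  /* per-group state: a COUNT(*) */
    int         nentries;
} ToyTable;

typedef struct
{
    ToyTable    tables[NUM_SETS];
    long        spilled[NUM_SETS][NUM_PARTS];   /* tuples spilled per partition */
    bool        spill_mode;
} ToyAgg;

static uint32_t
toy_hash(int key)
{
    return (uint32_t) key * 2654435761u;    /* multiplicative hash */
}

/* Find key; insert it only if may_insert. Returns the index or -1. */
static int
toy_lookup(ToyTable *t, int key, bool may_insert, bool *isnew)
{
    *isnew = false;
    for (int i = 0; i < t->nentries; i++)
        if (t->keys[i] == key)
            return i;
    if (!may_insert)
        return -1;
    t->keys[t->nentries] = key;
    t->counts[t->nentries] = 0;     /* initialize the new group's state */
    *isnew = true;
    return t->nentries++;
}

/* Process one row against every grouping set, spilling where it misses. */
static void
toy_lookup_entries(ToyAgg *agg, const int row[NUM_SETS])
{
    for (int s = 0; s < NUM_SETS; s++)
    {
        ToyTable   *t = &agg->tables[s];
        bool        isnew;
        /* in spill mode, existing groups still match but none are created */
        int         idx = toy_lookup(t, row[s], !agg->spill_mode, &isnew);

        if (idx >= 0)
        {
            if (isnew && t->nentries == TABLE_CAP)
                agg->spill_mode = true;     /* limit hit: stop adding groups */
            t->counts[idx]++;               /* advance this set's aggregate */
        }
        else
            agg->spilled[s][toy_hash(row[s]) % NUM_PARTS]++;
    }
}

static long
toy_spilled_total(const ToyAgg *agg, int s)
{
    long        total = 0;

    for (int p = 0; p < NUM_PARTS; p++)
        total += agg->spilled[s][p];
    return total;
}
```

Feeding the rows {1, 10}, {2, 10}, {3, 20}, {1, 30} fills set 0's table on the second row and turns on spill mode. Row {3, 20} is then spilled twice, once per grouping set. Row {1, 30} still updates group 1 in set 0 and is spilled only for set 1. This is the trade-off the NB paragraph accepts in exchange for partitioning each grouping set independently.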
    2232              : 
    2233              : /*
    2234              :  * ExecAgg -
    2235              :  *
    2236              :  *    ExecAgg receives tuples from its outer subplan and aggregates over
    2237              :  *    the appropriate attribute for each aggregate function use (Aggref
    2238              :  *    node) appearing in the targetlist or qual of the node.  The number
    2239              :  *    of tuples to aggregate over depends on whether grouped or plain
    2240              :  *    aggregation is selected.  In grouped aggregation, we produce a result
    2241              :  *    row for each group; in plain aggregation there's a single result row
    2242              :  *    for the whole query.  In either case, the value of each aggregate is
    2243              :  *    stored in the expression context to be used when ExecProject evaluates
    2244              :  *    the result tuple.
    2245              :  */
    2246              : static TupleTableSlot *
    2247       508202 : ExecAgg(PlanState *pstate)
    2248              : {
    2249       508202 :     AggState   *node = castNode(AggState, pstate);
    2250       508202 :     TupleTableSlot *result = NULL;
    2251              : 
    2252       508202 :     CHECK_FOR_INTERRUPTS();
    2253              : 
    2254       508202 :     if (!node->agg_done)
    2255              :     {
    2256              :         /* Dispatch based on strategy */
    2257       469064 :         switch (node->phase->aggstrategy)
    2258              :         {
    2259       300582 :             case AGG_HASHED:
    2260       300582 :                 if (!node->table_filled)
    2261         9729 :                     agg_fill_hash_table(node);
    2262              :                 pg_fallthrough;
    2263              :             case AGG_MIXED:
    2264       318840 :                 result = agg_retrieve_hash_table(node);
    2265       318840 :                 break;
    2266       150224 :             case AGG_PLAIN:
    2267              :             case AGG_SORTED:
    2268       150224 :                 result = agg_retrieve_direct(node);
    2269       150110 :                 break;
    2270              :         }
    2271              : 
    2272       468950 :         if (!TupIsNull(result))
    2273       458009 :             return result;
    2274              :     }
    2275              : 
    2276        50079 :     return NULL;
    2277              : }
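ExecAgg() is a thin dispatcher. It switches on the current phase's strategy. AGG_HASHED builds the hash table on the first call only and then falls through into the AGG_MIXED case. AGG_PLAIN and AGG_SORTED go to agg_retrieve_direct(). The toy below reproduces just that control flow with hypothetical types. Integers stand in for result slots, with -1 playing the role of NULL, and a single counter-based helper stands in for both retrieval routines.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { TOY_PLAIN, TOY_SORTED, TOY_HASHED, TOY_MIXED } ToyStrategy;

typedef struct
{
    ToyStrategy strategy;
    bool        done;           /* like agg_done */
    bool        table_filled;   /* like table_filled */
    int         fills;          /* how many times the table was built */
    int         next;           /* next group to return */
    int         ngroups;        /* groups available to return */
} ToyNode;

static void
toy_fill_table(ToyNode *n)
{
    n->fills++;
    n->table_filled = true;
}

/* Return groups 0..ngroups-1 in turn, then -1 and mark the node done. */
static int
toy_retrieve(ToyNode *n)
{
    if (n->next >= n->ngroups)
    {
        n->done = true;
        return -1;
    }
    return n->next++;
}

static int
toy_exec(ToyNode *n)
{
    int         result = -1;

    if (!n->done)
    {
        switch (n->strategy)
        {
            case TOY_HASHED:
                if (!n->table_filled)
                    toy_fill_table(n);  /* build once, on the first call */
                /* fall through */
            case TOY_MIXED:
                result = toy_retrieve(n);   /* agg_retrieve_hash_table() */
                break;
            case TOY_PLAIN:
            case TOY_SORTED:
                result = toy_retrieve(n);   /* agg_retrieve_direct() */
                break;
        }
        if (result >= 0)
            return result;
    }
    return -1;
}
```

A hashed node with three groups returns 0, 1, 2 and then -1 on every later call, while building its table exactly once.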
    2278              : 
    2279              : /*
    2280              :  * ExecAgg for non-hashed case
    2281              :  */
    2282              : static TupleTableSlot *
    2283       150224 : agg_retrieve_direct(AggState *aggstate)
    2284              : {
    2285       150224 :     Agg        *node = aggstate->phase->aggnode;
    2286              :     ExprContext *econtext;
    2287              :     ExprContext *tmpcontext;
    2288              :     AggStatePerAgg peragg;
    2289              :     AggStatePerGroup *pergroups;
    2290              :     TupleTableSlot *outerslot;
    2291              :     TupleTableSlot *firstSlot;
    2292              :     TupleTableSlot *result;
    2293       150224 :     bool        hasGroupingSets = aggstate->phase->numsets > 0;
    2294       150224 :     int         numGroupingSets = Max(aggstate->phase->numsets, 1);
    2295              :     int         currentSet;
    2296              :     int         nextSetSize;
    2297              :     int         numReset;
    2298              :     int         i;
    2299              : 
    2300              :     /*
    2301              :      * get state info from node
    2302              :      *
    2303              :      * econtext is the per-output-tuple expression context
    2304              :      *
    2305              :      * tmpcontext is the per-input-tuple expression context
    2306              :      */
    2307       150224 :     econtext = aggstate->ss.ps.ps_ExprContext;
    2308       150224 :     tmpcontext = aggstate->tmpcontext;
    2309              : 
    2310       150224 :     peragg = aggstate->peragg;
    2311       150224 :     pergroups = aggstate->pergroups;
    2312       150224 :     firstSlot = aggstate->ss.ss_ScanTupleSlot;
    2313              : 
    2314              :     /*
    2315              :      * We loop retrieving groups until we find one matching
    2316              :      * aggstate->ss.ps.qual
    2317              :      *
    2318              :      * For grouping sets, we have the invariant that aggstate->projected_set
    2319              :      * is either -1 (initial call) or the index (starting from 0) in
    2320              :      * gset_lengths for the group we just completed (either by projecting a
    2321              :      * row or by discarding it in the qual).
    2322              :      */
    2323       197434 :     while (!aggstate->agg_done)
    2324              :     {
    2325              :         /*
    2326              :          * Clear the per-output-tuple context for each group, as well as
    2327              :          * aggcontext (which contains any pass-by-ref transvalues of the old
    2328              :          * group).  Some aggregate functions store working state in child
    2329              :          * contexts; those now get reset automatically without us needing to
    2330              :          * do anything special.
    2331              :          *
    2332              :          * We use ReScanExprContext not just ResetExprContext because we want
    2333              :          * any registered shutdown callbacks to be called.  That allows
    2334              :          * aggregate functions to ensure they've cleaned up any non-memory
    2335              :          * resources.
    2336              :          */
    2337       197297 :         ReScanExprContext(econtext);
    2338              : 
    2339              :         /*
    2340              :          * Determine how many grouping sets need to be reset at this boundary.
    2341              :          */
    2342       197297 :         if (aggstate->projected_set >= 0 &&
    2343       154787 :             aggstate->projected_set < numGroupingSets)
    2344       154775 :             numReset = aggstate->projected_set + 1;
    2345              :         else
    2346        42522 :             numReset = numGroupingSets;
    2347              : 
    2348              :         /*
    2349              :          * numReset can change on a phase boundary, but that's OK; we want to
    2350              :          * reset the contexts used in _this_ phase, and later, after possibly
    2351              :          * changing phase, initialize the right number of aggregates for the
    2352              :          * _new_ phase.
    2353              :          */
    2354              : 
    2355       409443 :         for (i = 0; i < numReset; i++)
    2356              :         {
    2357       212146 :             ReScanExprContext(aggstate->aggcontexts[i]);
    2358              :         }
    2359              : 
    2360              :         /*
    2361              :          * Check if input is complete and there are no more groups to project
    2362              :          * in this phase; move to next phase or mark as done.
    2363              :          */
    2364       197297 :         if (aggstate->input_done == true &&
    2365         1122 :             aggstate->projected_set >= (numGroupingSets - 1))
    2366              :         {
    2367          572 :             if (aggstate->current_phase < aggstate->numphases - 1)
    2368              :             {
    2369          148 :                 initialize_phase(aggstate, aggstate->current_phase + 1);
    2370          148 :                 aggstate->input_done = false;
    2371          148 :                 aggstate->projected_set = -1;
    2372          148 :                 numGroupingSets = Max(aggstate->phase->numsets, 1);
    2373          148 :                 node = aggstate->phase->aggnode;
    2374          148 :                 numReset = numGroupingSets;
    2375              :             }
    2376          424 :             else if (aggstate->aggstrategy == AGG_MIXED)
    2377              :             {
    2378              :                 /*
    2379              :                  * Mixed mode; we've output all the grouped stuff and have
    2380              :                  * full hashtables, so switch to outputting those.
    2381              :                  */
    2382          112 :                 initialize_phase(aggstate, 0);
    2383          112 :                 aggstate->table_filled = true;
    2384          112 :                 ResetTupleHashIterator(aggstate->perhash[0].hashtable,
    2385              :                                        &aggstate->perhash[0].hashiter);
    2386          112 :                 select_current_set(aggstate, 0, true);
    2387          112 :                 return agg_retrieve_hash_table(aggstate);
    2388              :             }
    2389              :             else
    2390              :             {
    2391          312 :                 aggstate->agg_done = true;
    2392          312 :                 break;
    2393              :             }
    2394              :         }
    2395              : 
    2396              :         /*
    2397              :          * Get the number of columns in the next grouping set after the last
    2398              :          * projected one (if any). This is the number of columns to compare to
    2399              :          * see if we reached the boundary of that set too.
    2400              :          */
    2401       196873 :         if (aggstate->projected_set >= 0 &&
    2402       154215 :             aggstate->projected_set < (numGroupingSets - 1))
    2403        18254 :             nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
    2404              :         else
    2405       178619 :             nextSetSize = 0;
    2406              : 
    2407              :         /*----------
    2408              :          * If a subgroup for the current grouping set is present, project it.
    2409              :          *
    2410              :          * We have a new group if:
    2411              :          *  - we're out of input but haven't projected all grouping sets
    2412              :          *    (checked above)
    2413              :          * OR
    2414              :          *    - we already projected a row that wasn't from the last grouping
    2415              :          *      set
    2416              :          *    AND
    2417              :          *    - the next grouping set has at least one grouping column (since
    2418              :          *      empty grouping sets project only once input is exhausted)
    2419              :          *    AND
    2420              :          *    - the previous and pending rows differ on the grouping columns
    2421              :          *      of the next grouping set
    2422              :          *----------
    2423              :          */
    2424       196873 :         tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
    2425       196873 :         if (aggstate->input_done ||
    2426       196323 :             (node->aggstrategy != AGG_PLAIN &&
    2427       155024 :              aggstate->projected_set != -1 &&
    2428       153665 :              aggstate->projected_set < (numGroupingSets - 1) &&
    2429        13294 :              nextSetSize > 0 &&
    2430        13294 :              !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
    2431              :                                tmpcontext)))
    2432              :         {
    2433         9436 :             aggstate->projected_set += 1;
    2434              : 
    2435              :             Assert(aggstate->projected_set < numGroupingSets);
    2436         9436 :             Assert(nextSetSize > 0 || aggstate->input_done);
    2437              :         }
    2438              :         else
    2439              :         {
    2440              :             /*
    2441              :              * We no longer care what group we just projected, the next
    2442              :              * projection will always be the first (or only) grouping set
    2443              :              * (unless the input proves to be empty).
    2444              :              */
    2445       187437 :             aggstate->projected_set = 0;
    2446              : 
    2447              :             /*
    2448              :              * If we don't already have the first tuple of the new group,
    2449              :              * fetch it from the outer plan.
    2450              :              */
    2451       187437 :             if (aggstate->grp_firstTuple == NULL)
    2452              :             {
    2453        42658 :                 outerslot = fetch_input_tuple(aggstate);
    2454        42627 :                 if (!TupIsNull(outerslot))
    2455              :                 {
    2456              :                     /*
    2457              :                      * Make a copy of the first input tuple; we will use this
    2458              :                      * for comparisons (in group mode) and for projection.
    2459              :                      */
    2460        35253 :                     aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
    2461              :                 }
    2462              :                 else
    2463              :                 {
    2464              :                     /* outer plan produced no tuples at all */
    2465         7374 :                     if (hasGroupingSets)
    2466              :                     {
    2467              :                         /*
    2468              :                          * If there was no input at all, we need to project
    2469              :                          * rows only if there are grouping sets of size 0.
    2470              :                          * Note that this implies that there can't be any
    2471              :                          * references to ungrouped Vars, which would otherwise
    2472              :                          * cause issues with the empty output slot.
    2473              :                          *
    2474              :                          * XXX: This is no longer true; we currently deal with
    2475              :                          * this in finalize_aggregates().
    2476              :                          */
    2477           52 :                         aggstate->input_done = true;
    2478              : 
    2479           72 :                         while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
    2480              :                         {
    2481           32 :                             aggstate->projected_set += 1;
    2482           32 :                             if (aggstate->projected_set >= numGroupingSets)
    2483              :                             {
    2484              :                                 /*
    2485              :                                  * We can't set agg_done here because we might
    2486              :                                  * have more phases to do, even though the
    2487              :                                  * input is empty. So we need to restart the
    2488              :                                  * whole outer loop.
    2489              :                                  */
    2490           12 :                                 break;
    2491              :                             }
    2492              :                         }
    2493              : 
    2494           52 :                         if (aggstate->projected_set >= numGroupingSets)
    2495           12 :                             continue;
    2496              :                     }
    2497              :                     else
    2498              :                     {
    2499         7322 :                         aggstate->agg_done = true;
    2500              :                         /* If we are grouping, we should produce no tuples too */
    2501         7322 :                         if (node->aggstrategy != AGG_PLAIN)
    2502          116 :                             return NULL;
    2503              :                     }
    2504              :                 }
    2505              :             }
    2506              : 
    2507              :             /*
    2508              :              * Initialize working state for a new input tuple group.
    2509              :              */
    2510       187278 :             initialize_aggregates(aggstate, pergroups, numReset);
    2511              : 
    2512       187278 :             if (aggstate->grp_firstTuple != NULL)
    2513              :             {
    2514              :                 /*
    2515              :                  * Store the copied first input tuple in the tuple table slot
    2516              :                  * reserved for it.  The tuple will be deleted when it is
    2517              :                  * cleared from the slot.
    2518              :                  */
    2519       180032 :                 ExecForceStoreHeapTuple(aggstate->grp_firstTuple,
    2520              :                                         firstSlot, true);
    2521       180032 :                 aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
    2522              : 
    2523              :                 /* set up for first advance_aggregates call */
    2524       180032 :                 tmpcontext->ecxt_outertuple = firstSlot;
    2525              : 
    2526              :                 /*
    2527              :                  * Process each outer-plan tuple, and then fetch the next one,
    2528              :                  * until we exhaust the outer plan or cross a group boundary.
    2529              :                  */
    2530              :                 for (;;)
    2531              :                 {
    2532              :                     /*
    2533              :                      * During phase 1 only of a mixed agg, we need to update
    2534              :                      * hashtables as well in advance_aggregates.
    2535              :                      */
    2536     14249469 :                     if (aggstate->aggstrategy == AGG_MIXED &&
    2537        25194 :                         aggstate->current_phase == 1)
    2538              :                     {
    2539        25194 :                         lookup_hash_entries(aggstate);
    2540              :                     }
    2541              : 
    2542              :                     /* Advance the aggregates (or combine functions) */
    2543     14249469 :                     advance_aggregates(aggstate);
    2544              : 
    2545              :                     /* Reset per-input-tuple context after each tuple */
    2546     14249417 :                     ResetExprContext(tmpcontext);
    2547              : 
    2548     14249417 :                     outerslot = fetch_input_tuple(aggstate);
    2549     14249402 :                     if (TupIsNull(outerslot))
    2550              :                     {
    2551              :                         /* no more outer-plan tuples available */
    2552              : 
    2553              :                         /* if we built hash tables, finalize any spills */
    2554        35170 :                         if (aggstate->aggstrategy == AGG_MIXED &&
    2555          104 :                             aggstate->current_phase == 1)
    2556          104 :                             hashagg_finish_initial_spills(aggstate);
    2557              : 
    2558        35170 :                         if (hasGroupingSets)
    2559              :                         {
    2560          520 :                             aggstate->input_done = true;
    2561          520 :                             break;
    2562              :                         }
    2563              :                         else
    2564              :                         {
    2565        34650 :                             aggstate->agg_done = true;
    2566        34650 :                             break;
    2567              :                         }
    2568              :                     }
    2569              :                     /* set up for next advance_aggregates call */
    2570     14214232 :                     tmpcontext->ecxt_outertuple = outerslot;
    2571              : 
    2572              :                     /*
    2573              :                      * If we are grouping, check whether we've crossed a group
    2574              :                      * boundary.
    2575              :                      */
    2576     14214232 :                     if (node->aggstrategy != AGG_PLAIN && node->numCols > 0)
    2577              :                     {
    2578      1674974 :                         tmpcontext->ecxt_innertuple = firstSlot;
    2579      1674974 :                         if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
    2580              :                                       tmpcontext))
    2581              :                         {
    2582       144795 :                             aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
    2583       144795 :                             break;
    2584              :                         }
    2585              :                     }
    2586              :                 }
    2587              :             }
    2588              : 
    2589              :             /*
    2590              :              * Use the representative input tuple for any references to
    2591              :              * non-aggregated input columns in aggregate direct args, the node
    2592              :              * qual, and the tlist.  (If we are not grouping, and there are no
    2593              :              * input rows at all, we will come here with an empty firstSlot
    2594              :              * ... but if not grouping, there can't be any references to
    2595              :              * non-aggregated input columns, so no problem.)
    2596              :              */
    2597       187211 :             econtext->ecxt_outertuple = firstSlot;
    2598              :         }
    2599              : 
    2600              :         Assert(aggstate->projected_set >= 0);
    2601              : 
    2602       196647 :         currentSet = aggstate->projected_set;
    2603              : 
    2604       196647 :         prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
    2605              : 
    2606       196647 :         select_current_set(aggstate, currentSet, false);
    2607              : 
    2608       196647 :         finalize_aggregates(aggstate,
    2609              :                             peragg,
    2610       196647 :                             pergroups[currentSet]);
    2611              : 
    2612              :         /*
    2613              :          * If there's no row to project right now, we must continue rather
    2614              :          * than returning a null since there might be more groups.
    2615              :          */
    2616       196639 :         result = project_aggregates(aggstate);
    2617       196631 :         if (result)
    2618       149433 :             return result;
    2619              :     }
    2620              : 
    2621              :     /* No more groups */
    2622          449 :     return NULL;
    2623              : }
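The sorted-input loop above keeps the first row of the current group in firstSlot and advances the aggregates for each input row. It stops at the first row that fails the equality check against firstSlot, and copies that row so it can start the next group. Below is a minimal standalone sketch of that control flow. It assumes integer keys that are already sorted and a plain sum as the only aggregate. The names `GroupSum` and `group_sums` are hypothetical, not executor APIs.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical output record: one group key and its aggregated sum. */
typedef struct GroupSum
{
    int  key;
    long sum;
} GroupSum;

/*
 * Aggregate rows already sorted by key, in the style of the loop above:
 * remember the group's first row, advance the transition value for each
 * row, and close the group when the key changes or input runs out.
 * Returns the number of groups written to out (at most n).
 */
static size_t
group_sums(const int *keys, const int *vals, size_t n, GroupSum *out)
{
    size_t ngroups = 0;
    size_t i = 0;

    while (i < n)
    {
        int  first_key = keys[i];   /* plays the role of firstSlot */
        long trans = 0;             /* transvalue = initcond */

        /* advance, then fetch the next row, until a group boundary */
        for (;;)
        {
            trans += vals[i];       /* transfunc(transvalue, input) */
            i++;
            if (i == n)
                break;              /* input exhausted */
            if (keys[i] != first_key)
                break;              /* crossed a group boundary */
        }

        assert(ngroups < n);
        out[ngroups].key = first_key;
        out[ngroups].sum = trans;   /* no finalfunc: result = transvalue */
        ngroups++;
    }
    return ngroups;
}
```

The row that ends a group is not consumed twice: the inner loop exits with `i` pointing at it, so the outer loop starts the next group from that row.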
    2624              : 
    2625              : /*
    2626              :  * ExecAgg for hashed case: read input and build hash table
    2627              :  */
    2628              : static void
    2629         9729 : agg_fill_hash_table(AggState *aggstate)
    2630              : {
    2631              :     TupleTableSlot *outerslot;
    2632         9729 :     ExprContext *tmpcontext = aggstate->tmpcontext;
    2633              : 
    2634              :     /*
    2635              :      * Process each outer-plan tuple, and then fetch the next one, until we
    2636              :      * exhaust the outer plan.
    2637              :      */
    2638              :     for (;;)
    2639              :     {
    2640      4667106 :         outerslot = fetch_input_tuple(aggstate);
    2641      4667106 :         if (TupIsNull(outerslot))
    2642              :             break;
    2643              : 
    2644              :         /* set up for lookup_hash_entries and advance_aggregates */
    2645      4657377 :         tmpcontext->ecxt_outertuple = outerslot;
    2646              : 
    2647              :         /* Find or build hashtable entries */
    2648      4657377 :         lookup_hash_entries(aggstate);
    2649              : 
    2650              :         /* Advance the aggregates (or combine functions) */
    2651      4657377 :         advance_aggregates(aggstate);
    2652              : 
    2653              :         /*
    2654              :          * Reset per-input-tuple context after each tuple, but note that the
    2655              :          * hash lookups do this too.
    2656              :          */
    2657      4657377 :         ResetExprContext(aggstate->tmpcontext);
    2658              :     }
    2659              : 
    2660              :     /* finalize spills, if any */
    2661         9729 :     hashagg_finish_initial_spills(aggstate);
    2662              : 
    2663         9729 :     aggstate->table_filled = true;
    2664              :     /* Initialize to walk the first hash table */
    2665         9729 :     select_current_set(aggstate, 0, true);
    2666         9729 :     ResetTupleHashIterator(aggstate->perhash[0].hashtable,
    2667              :                            &aggstate->perhash[0].hashiter);
    2668         9729 : }
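agg_fill_hash_table() reads the entire input before it emits any group. For each row, it finds or creates the row's group (lookup_hash_entries) and advances that group's transition state in place. Below is a minimal sketch of that build phase. It assumes integer keys, a fixed 64-bucket table with linear probing, and count/sum as the aggregates. `HashTable`, `lookup_entry` and `fill_hash_table` are hypothetical names, and the real TupleHashTable is a different, growable structure. Unlike the executor, this sketch has no spill path.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define NBUCKETS 64             /* power of two; hypothetical fixed size */

/* Hypothetical hash entry: group key plus per-group transition state. */
typedef struct Entry
{
    bool used;
    int  key;
    long count;                 /* transition state for count(*) */
    long sum;                   /* transition state for sum(val) */
} Entry;

typedef struct HashTable
{
    Entry  buckets[NBUCKETS];
    size_t ngroups;
} HashTable;

/* Simple multiplicative hash, chosen only for illustration. */
static uint32_t
hash_int(int key)
{
    return (uint32_t) key * 2654435761u;
}

/* Find the entry for key, creating it if absent; NULL if the table is full. */
static Entry *
lookup_entry(HashTable *ht, int key)
{
    uint32_t pos = hash_int(key) & (NBUCKETS - 1);

    for (size_t probes = 0; probes < NBUCKETS; probes++)
    {
        Entry *e = &ht->buckets[pos];

        if (!e->used)
        {
            e->used = true;     /* new group: initialize its state */
            e->key = key;
            e->count = 0;
            e->sum = 0;
            ht->ngroups++;
            return e;
        }
        if (e->key == key)
            return e;           /* existing group */
        pos = (pos + 1) & (NBUCKETS - 1);   /* linear probing */
    }
    return NULL;
}

/* Read every input row; afterwards each group's state is complete. */
static void
fill_hash_table(HashTable *ht, const int *keys, const int *vals, size_t n)
{
    for (size_t i = 0; i < n; i++)
    {
        Entry *e = lookup_entry(ht, keys[i]);

        assert(e != NULL);      /* this sketch cannot spill */
        e->count++;
        e->sum += vals[i];
    }
}
```

Input order does not matter here, which is why the hashed strategy needs no sorted input. The cost is that every group's state must stay resident until the input is exhausted.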
    2669              : 
    2670              : /*
    2671              :  * If any data was spilled during hash aggregation, reset the hash table and
    2672              :  * reprocess one batch of spilled data. After reprocessing a batch, the hash
    2673              :  * table will again contain data, ready to be consumed by
    2674              :  * agg_retrieve_hash_table_in_memory().
    2675              :  *
    2676              :  * Should only be called after all in-memory hash table entries have been
    2677              :  * finalized and emitted.
    2678              :  *
    2679              :  * Return false when input is exhausted and there's no more work to be done;
    2680              :  * otherwise return true.
    2681              :  */
    2682              : static bool
    2683        28333 : agg_refill_hash_table(AggState *aggstate)
    2684              : {
    2685              :     HashAggBatch *batch;
    2686              :     AggStatePerHash perhash;
    2687              :     HashAggSpill spill;
    2688        28333 :     LogicalTapeSet *tapeset = aggstate->hash_tapeset;
    2689        28333 :     bool        spill_initialized = false;
    2690              : 
    2691        28333 :     if (aggstate->hash_batches == NIL)
    2692        10376 :         return false;
    2693              : 
    2694              :     /* hash_batches is a stack, with the top item at the end of the list */
    2695        17957 :     batch = llast(aggstate->hash_batches);
    2696        17957 :     aggstate->hash_batches = list_delete_last(aggstate->hash_batches);
    2697              : 
    2698        17957 :     hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
    2699              :                         batch->used_bits, &aggstate->hash_mem_limit,
    2700              :                         &aggstate->hash_ngroups_limit, NULL);
    2701              : 
    2702              :     /*
    2703              :      * Each batch only processes one grouping set; set the rest to NULL so
    2704              :      * that advance_aggregates() knows to ignore them. We don't touch
    2705              :      * pergroups for sorted grouping sets here, because they will be needed if
    2706              :      * we rescan later. The expressions for sorted grouping sets will not be
    2707              :      * evaluated after we recompile anyway.
    2708              :      */
    2709       138274 :     MemSet(aggstate->hash_pergroup, 0,
    2710              :            sizeof(AggStatePerGroup) * aggstate->num_hashes);
    2711              : 
    2712              :     /* free memory and reset hash tables */
    2713        17957 :     ReScanExprContext(aggstate->hashcontext);
    2714       138274 :     for (int setno = 0; setno < aggstate->num_hashes; setno++)
    2715       120317 :         ResetTupleHashTable(aggstate->perhash[setno].hashtable);
    2716              : 
    2717        17957 :     aggstate->hash_ngroups_current = 0;
    2718              : 
    2719              :     /*
    2720              :      * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
    2721              :      * happens in phase 0. So, we switch to phase 1 when processing a batch,
    2722              :      * and back to phase 0 after the batch is done.
    2723              :      */
    2724              :     Assert(aggstate->current_phase == 0);
    2725        17957 :     if (aggstate->phase->aggstrategy == AGG_MIXED)
    2726              :     {
    2727        17508 :         aggstate->current_phase = 1;
    2728        17508 :         aggstate->phase = &aggstate->phases[aggstate->current_phase];
    2729              :     }
    2730              : 
    2731        17957 :     select_current_set(aggstate, batch->setno, true);
    2732              : 
    2733        17957 :     perhash = &aggstate->perhash[aggstate->current_set];
    2734              : 
    2735              :     /*
    2736              :      * Spilled tuples are always read back as MinimalTuples, which may be
    2737              :      * different from the outer plan, so recompile the aggregate expressions.
    2738              :      *
    2739              :      * We still need the NULL check, because we are only processing one
    2740              :      * grouping set at a time and the rest will be NULL.
    2741              :      */
    2742        17957 :     hashagg_recompile_expressions(aggstate, true, true);
    2743              : 
    2744        17957 :     INJECTION_POINT("hash-aggregate-process-batch", NULL);
    2745              :     for (;;)
    2746       717854 :     {
    2747       735811 :         TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
    2748       735811 :         TupleTableSlot *hashslot = perhash->hashslot;
    2749       735811 :         TupleHashTable hashtable = perhash->hashtable;
    2750              :         TupleHashEntry entry;
    2751              :         MinimalTuple tuple;
    2752              :         uint32      hash;
    2753       735811 :         bool        isnew = false;
    2754       735811 :         bool       *p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
    2755              : 
    2756       735811 :         CHECK_FOR_INTERRUPTS();
    2757              : 
    2758       735811 :         tuple = hashagg_batch_read(batch, &hash);
    2759       735811 :         if (tuple == NULL)
    2760        17957 :             break;
    2761              : 
    2762       717854 :         ExecStoreMinimalTuple(tuple, spillslot, true);
    2763       717854 :         aggstate->tmpcontext->ecxt_outertuple = spillslot;
    2764              : 
    2765       717854 :         prepare_hash_slot(perhash,
    2766       717854 :                           aggstate->tmpcontext->ecxt_outertuple,
    2767              :                           hashslot);
    2768       717854 :         entry = LookupTupleHashEntryHash(hashtable, hashslot,
    2769              :                                          p_isnew, hash);
    2770              : 
    2771       717854 :         if (entry != NULL)
    2772              :         {
    2773       420294 :             if (isnew)
    2774        93437 :                 initialize_hash_entry(aggstate, hashtable, entry);
    2775       420294 :             aggstate->hash_pergroup[batch->setno] = TupleHashEntryGetAdditional(hashtable, entry);
    2776       420294 :             advance_aggregates(aggstate);
    2777              :         }
    2778              :         else
    2779              :         {
    2780       297560 :             if (!spill_initialized)
    2781              :             {
    2782              :                 /*
    2783              :                  * Avoid initializing the spill until we actually need it so
    2784              :                  * that we don't assign tapes that will never be used.
    2785              :                  */
    2786         8332 :                 spill_initialized = true;
    2787         8332 :                 hashagg_spill_init(&spill, tapeset, batch->used_bits,
    2788              :                                    batch->input_card, aggstate->hashentrysize);
    2789              :             }
    2790              :             /* no memory for a new group, spill */
    2791       297560 :             hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
    2792              : 
    2793       297560 :             aggstate->hash_pergroup[batch->setno] = NULL;
    2794              :         }
    2795              : 
    2796              :         /*
    2797              :          * Reset per-input-tuple context after each tuple, but note that the
    2798              :          * hash lookups do this too.
    2799              :          */
    2800       717854 :         ResetExprContext(aggstate->tmpcontext);
    2801              :     }
    2802              : 
    2803        17957 :     LogicalTapeClose(batch->input_tape);
    2804              : 
    2805              :     /* change back to phase 0 */
    2806        17957 :     aggstate->current_phase = 0;
    2807        17957 :     aggstate->phase = &aggstate->phases[aggstate->current_phase];
    2808              : 
    2809        17957 :     if (spill_initialized)
    2810              :     {
    2811         8332 :         hashagg_spill_finish(aggstate, &spill, batch->setno);
    2812         8332 :         hash_agg_update_metrics(aggstate, true, spill.npartitions);
    2813              :     }
    2814              :     else
    2815         9625 :         hash_agg_update_metrics(aggstate, true, 0);
    2816              : 
    2817        17957 :     aggstate->hash_spill_mode = false;
    2818              : 
    2819              :     /* prepare to walk the first hash table */
    2820        17957 :     select_current_set(aggstate, batch->setno, true);
    2821        17957 :     ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
    2822              :                            &aggstate->perhash[batch->setno].hashiter);
    2823              : 
    2824        17957 :     pfree(batch);
    2825              : 
    2826        17957 :     return true;
    2827              : }
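agg_refill_hash_table() treats hash_batches as a stack. It pops the last batch and rebuilds the hash table from that batch's tape, still advancing groups that are already in the table. Rows of groups that do not fit are spilled into new partitions, which later become batches themselves. Below is a simplified in-memory sketch of that loop. It assumes:

- a limit of two groups per pass in place of hash_mem;
- plain int arrays in place of tapes;
- one extra hash bit per spill level, taken from the top of the hash;
- the murmur3 fmix32 finalizer as the hash (PostgreSQL uses its own hash functions).

All names are hypothetical. Unlike the executor, which first fills the table in agg_fill_hash_table(), the sketch treats the initial input as the first batch.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

#define MAX_GROUPS_IN_MEMORY 2  /* hypothetical stand-in for hash_mem */
#define MAX_BATCHES 64          /* fixed stack capacity for this sketch */

/* Hypothetical spilled batch: its rows and how many hash bits are used. */
typedef struct Batch
{
    int *keys;
    int  n;
    int  used_bits;
} Batch;

/* murmur3 fmix32 finalizer, used here only as a convenient 32-bit hash */
static uint32_t
hash_key(int key)
{
    uint32_t h = (uint32_t) key;

    h ^= h >> 16;
    h *= 0x85ebca6bu;
    h ^= h >> 13;
    h *= 0xc2b2ae35u;
    h ^= h >> 16;
    return h;
}

/*
 * Count rows per key while holding at most MAX_GROUPS_IN_MEMORY groups at a
 * time.  Writes one (key, count) pair per distinct key, stores the number
 * of batches processed in *passes, and returns the number of distinct keys.
 */
static int
count_with_spill(const int *keys, int n, int *out_keys, long *out_counts,
                 int *passes)
{
    Batch stack[MAX_BATCHES];
    int   top = 1;
    int   nout = 0;

    /* the whole input is the first batch, with no hash bits used yet */
    stack[0].keys = malloc(sizeof(int) * ((size_t) n + 1));
    assert(stack[0].keys != NULL);
    for (int i = 0; i < n; i++)
        stack[0].keys[i] = keys[i];
    stack[0].n = n;
    stack[0].used_bits = 0;
    *passes = 0;

    while (top > 0)
    {
        Batch b = stack[--top];         /* pop: the top is the last element */
        int   gkeys[MAX_GROUPS_IN_MEMORY];
        long  gcounts[MAX_GROUPS_IN_MEMORY];
        int   ngroups = 0;
        int   bit = 31 - b.used_bits;   /* next unused bit, from the top */
        Batch child[2];

        (*passes)++;
        for (int p = 0; p < 2; p++)
        {
            child[p].keys = malloc(sizeof(int) * ((size_t) b.n + 1));
            assert(child[p].keys != NULL);
            child[p].n = 0;
            child[p].used_bits = b.used_bits + 1;
        }

        for (int i = 0; i < b.n; i++)
        {
            int k = b.keys[i];
            int g = 0;

            while (g < ngroups && gkeys[g] != k)
                g++;
            if (g == ngroups && ngroups < MAX_GROUPS_IN_MEMORY)
            {
                gkeys[g] = k;           /* new group fits: initialize it */
                gcounts[g] = 0;
                ngroups++;
            }
            if (g < ngroups)
                gcounts[g]++;           /* advance an in-memory group */
            else
            {
                /* no room for a new group: spill by the next hash bit */
                int p = bit >= 0 ? (int) ((hash_key(k) >> bit) & 1u) : 0;

                child[p].keys[child[p].n++] = k;
            }
        }

        /* emit finished groups, then push non-empty child batches */
        for (int g = 0; g < ngroups; g++, nout++)
        {
            out_keys[nout] = gkeys[g];
            out_counts[nout] = gcounts[g];
        }
        free(b.keys);
        for (int p = 0; p < 2; p++)
        {
            if (child[p].n > 0)
            {
                assert(top < MAX_BATCHES);
                stack[top++] = child[p];
            }
            else
                free(child[p].keys);
        }
    }
    return nout;
}
```

Every row of a given key either lands in an in-memory group or follows that key's hash bit into exactly one child batch, so each key is emitted exactly once. Each pass also finishes at least one group, so the loop terminates.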
    2828              : 
    2829              : /*
    2830              :  * ExecAgg for hashed case: retrieving groups from hash table
    2831              :  *
    2832              :  * After exhausting in-memory tuples, also try refilling the hash table using
    2833              :  * previously-spilled tuples. Only returns NULL after all in-memory and
    2834              :  * spilled tuples are exhausted.
    2835              :  */
    2836              : static TupleTableSlot *
    2837       318952 : agg_retrieve_hash_table(AggState *aggstate)
    2838              : {
    2839       318952 :     TupleTableSlot *result = NULL;
    2840              : 
    2841       645485 :     while (result == NULL)
    2842              :     {
    2843       336909 :         result = agg_retrieve_hash_table_in_memory(aggstate);
    2844       336909 :         if (result == NULL)
    2845              :         {
    2846        28333 :             if (!agg_refill_hash_table(aggstate))
    2847              :             {
    2848        10376 :                 aggstate->agg_done = true;
    2849        10376 :                 break;
    2850              :             }
    2851              :         }
    2852              :     }
    2853              : 
    2854       318952 :     return result;
    2855              : }
    2856              : 
    2857              : /*
    2858              :  * Retrieve the groups from the in-memory hash tables without considering any
    2859              :  * spilled tuples.
    2860              :  */
    2861              : static TupleTableSlot *
    2862       336909 : agg_retrieve_hash_table_in_memory(AggState *aggstate)
    2863              : {
    2864              :     ExprContext *econtext;
    2865              :     AggStatePerAgg peragg;
    2866              :     AggStatePerGroup pergroup;
    2867              :     TupleHashEntry entry;
    2868              :     TupleTableSlot *firstSlot;
    2869              :     TupleTableSlot *result;
    2870              :     AggStatePerHash perhash;
    2871              : 
    2872              :     /*
    2873              :      * get state info from node.
    2874              :      *
    2875              :      * econtext is the per-output-tuple expression context.
    2876              :      */
    2877       336909 :     econtext = aggstate->ss.ps.ps_ExprContext;
    2878       336909 :     peragg = aggstate->peragg;
    2879       336909 :     firstSlot = aggstate->ss.ss_ScanTupleSlot;
    2880              : 
    2881              :     /*
    2882              :      * Note that perhash (and therefore anything accessed through it) can
    2883              :      * change inside the loop, as we change between grouping sets.
    2884              :      */
    2885       336909 :     perhash = &aggstate->perhash[aggstate->current_set];
    2886              : 
    2887              :     /*
    2888              :      * We loop retrieving groups until we find one satisfying
    2889              :      * aggstate->ss.ps.qual.
    2890              :      */
    2891              :     for (;;)
    2892        90601 :     {
    2893       427510 :         TupleTableSlot *hashslot = perhash->hashslot;
    2894       427510 :         TupleHashTable hashtable = perhash->hashtable;
    2895              :         int         i;
    2896              : 
    2897       427510 :         CHECK_FOR_INTERRUPTS();
    2898              : 
    2899              :         /*
    2900              :          * Find the next entry in the hash table
    2901              :          */
    2902       427510 :         entry = ScanTupleHashTable(hashtable, &perhash->hashiter);
    2903       427510 :         if (entry == NULL)
    2904              :         {
    2905        95200 :             int         nextset = aggstate->current_set + 1;
    2906              : 
    2907        95200 :             if (nextset < aggstate->num_hashes)
    2908              :             {
    2909              :                 /*
    2910              :                  * Switch to next grouping set, reinitialize, and restart the
    2911              :                  * loop.
    2912              :                  */
    2913        66867 :                 select_current_set(aggstate, nextset, true);
    2914              : 
    2915        66867 :                 perhash = &aggstate->perhash[aggstate->current_set];
    2916              : 
    2917        66867 :                 ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
    2918              : 
    2919        66867 :                 continue;
    2920              :             }
    2921              :             else
    2922              :             {
    2923        28333 :                 return NULL;
    2924              :             }
    2925              :         }
    2926              : 
    2927              :         /*
    2928              :          * Clear the per-output-tuple context for each group
    2929              :          *
    2930              :          * We intentionally don't use ReScanExprContext here; if any aggs have
    2931              :          * registered shutdown callbacks, they mustn't be called yet, since we
    2932              :          * might not be done with that agg.
    2933              :          */
    2934       332310 :         ResetExprContext(econtext);
    2935              : 
    2936              :         /*
    2937              :          * Transform representative tuple back into one with the right
    2938              :          * columns.
    2939              :          */
    2940       332310 :         ExecStoreMinimalTuple(TupleHashEntryGetTuple(entry), hashslot, false);
    2941       332310 :         slot_getallattrs(hashslot);
    2942              : 
    2943       332310 :         ExecClearTuple(firstSlot);
    2944       332310 :         memset(firstSlot->tts_isnull, true,
    2945       332310 :                firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
    2946              : 
    2947       878456 :         for (i = 0; i < perhash->numhashGrpCols; i++)
    2948              :         {
    2949       546146 :             int         varNumber = perhash->hashGrpColIdxInput[i] - 1;
    2950              : 
    2951       546146 :             firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
    2952       546146 :             firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
    2953              :         }
    2954       332310 :         ExecStoreVirtualTuple(firstSlot);
    2955              : 
    2956       332310 :         pergroup = (AggStatePerGroup) TupleHashEntryGetAdditional(hashtable, entry);
    2957              : 
    2958              :         /*
    2959              :          * Use the representative input tuple for any references to
    2960              :          * non-aggregated input columns in the qual and tlist.
    2961              :          */
    2962       332310 :         econtext->ecxt_outertuple = firstSlot;
    2963              : 
    2964       332310 :         prepare_projection_slot(aggstate,
    2965              :                                 econtext->ecxt_outertuple,
    2966              :                                 aggstate->current_set);
    2967              : 
    2968       332310 :         finalize_aggregates(aggstate, peragg, pergroup);
    2969              : 
    2970       332310 :         result = project_aggregates(aggstate);
    2971       332310 :         if (result)
    2972       308576 :             return result;
    2973              :     }
    2974              : 
    2975              :     /* No more groups */
    2976              :     return NULL;
    2977              : }
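agg_retrieve_hash_table_in_memory() rebuilds a full-width representative row from a hash entry that stores only the grouping columns. Every column of firstSlot starts out NULL. Each grouping column is then copied back to the position given by hashGrpColIdxInput, which holds 1-based input column numbers. Below is a minimal sketch of that remapping. It assumes plain `long` values with a separate null array in place of Datum and tts_isnull, and the function name is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Expand a compact group key (grouping columns only) into a full-width row
 * in which every non-grouping column is NULL.  grp_col_idx holds 1-based
 * input column numbers, like hashGrpColIdxInput.
 */
static void
expand_group_key(const long *key_values, const bool *key_isnull,
                 const int *grp_col_idx, int num_grp_cols,
                 long *row_values, bool *row_isnull, int natts)
{
    /* start with every column NULL (values zeroed for determinism) */
    for (int i = 0; i < natts; i++)
    {
        row_values[i] = 0;
        row_isnull[i] = true;
    }

    /* copy each grouping column back to its original position */
    for (int i = 0; i < num_grp_cols; i++)
    {
        int varNumber = grp_col_idx[i] - 1;

        assert(varNumber >= 0 && varNumber < natts);
        row_values[varNumber] = key_values[i];
        row_isnull[varNumber] = key_isnull[i];
    }
}
```

Leaving the non-grouping columns NULL is safe only because, as the comment in the function above notes, the qual and tlist reference nothing but grouping columns from this representative row.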
    2978              : 
    2979              : /*
    2980              :  * hashagg_spill_init
    2981              :  *
    2982              :  * Called after we determined that spilling is necessary. Chooses the number
    2983              :  * of partitions to create, and initializes them.
    2984              :  */
    2985              : static void
    2986         8412 : hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
    2987              :                    double input_groups, double hashentrysize)
    2988              : {
    2989              :     int         npartitions;
    2990              :     int         partition_bits;
    2991              : 
    2992         8412 :     npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
    2993              :                                              used_bits, &partition_bits);
    2994              : 
    2995              : #ifdef USE_INJECTION_POINTS
    2996         8412 :     if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-single-partition"))
    2997              :     {
    2998            5 :         npartitions = 1;
    2999            5 :         partition_bits = 0;
    3000            5 :         INJECTION_POINT_CACHED("hash-aggregate-single-partition", NULL);
    3001              :     }
    3002              : #endif
    3003              : 
    3004         8412 :     spill->partitions = palloc0_array(LogicalTape *, npartitions);
    3005         8412 :     spill->ntuples = palloc0_array(int64, npartitions);
    3006         8412 :     spill->hll_card = palloc0_array(hyperLogLogState, npartitions);
    3007              : 
    3008        42045 :     for (int i = 0; i < npartitions; i++)
    3009        33633 :         spill->partitions[i] = LogicalTapeCreate(tapeset);
    3010              : 
    3011         8412 :     spill->shift = 32 - used_bits - partition_bits;
    3012         8412 :     if (spill->shift < 32)
    3013         8407 :         spill->mask = (npartitions - 1) << spill->shift;
    3014              :     else
    3015            5 :         spill->mask = 0;
    3016         8412 :     spill->npartitions = npartitions;
    3017              : 
    3018        42045 :     for (int i = 0; i < npartitions; i++)
    3019        33633 :         initHyperLogLog(&spill->hll_card[i], HASHAGG_HLL_BIT_WIDTH);
    3020         8412 : }
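The partition arithmetic in hashagg_spill_init() and hashagg_spill_tuple() takes partitions from the high end of the 32-bit hash. Earlier spill levels have consumed the top used_bits, and the next partition_bits select the partition. A shift of 32 means a single partition. The sketch below reproduces only that arithmetic. It assumes npartitions is `1 << partition_bits`, which the mask computation requires, and uses hypothetical names (`SpillBits`, `spill_bits_init`, `spill_partition`).

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical holder for the values kept in HashAggSpill. */
typedef struct SpillBits
{
    int      npartitions;
    int      shift;
    uint32_t mask;
} SpillBits;

static SpillBits
spill_bits_init(int used_bits, int partition_bits)
{
    SpillBits s;

    assert(used_bits >= 0 && partition_bits >= 0 && partition_bits < 31);
    assert(used_bits + partition_bits <= 32);
    s.npartitions = 1 << partition_bits;
    s.shift = 32 - used_bits - partition_bits;
    if (s.shift < 32)
        s.mask = (uint32_t) (s.npartitions - 1) << s.shift;
    else
        s.mask = 0;             /* one partition, no bits consumed */
    return s;
}

/* Select a partition from the bits just below the already-used ones. */
static int
spill_partition(const SpillBits *s, uint32_t hash)
{
    if (s->shift < 32)
        return (int) ((hash & s->mask) >> s->shift);
    return 0;
}
```

For example, with two bits already used and three partition bits, the shift is 27 and the mask is 0x38000000. Each recursion level therefore looks at fresh hash bits.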
    3021              : 
    3022              : /*
    3023              :  * hashagg_spill_tuple
    3024              :  *
    3025              :  * No room for new groups in the hash table. Save for later in the appropriate
    3026              :  * partition.
    3027              :  */
    3028              : static Size
    3029       717854 : hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
    3030              :                     TupleTableSlot *inputslot, uint32 hash)
    3031              : {
    3032              :     TupleTableSlot *spillslot;
    3033              :     int         partition;
    3034              :     MinimalTuple tuple;
    3035              :     LogicalTape *tape;
    3036       717854 :     int         total_written = 0;
    3037              :     bool        shouldFree;
    3038              : 
    3039              :     Assert(spill->partitions != NULL);
    3040              : 
    3041              :     /* spill only attributes that we actually need */
    3042       717854 :     if (!aggstate->all_cols_needed)
    3043              :     {
    3044         1048 :         spillslot = aggstate->hash_spill_wslot;
    3045         1048 :         slot_getsomeattrs(inputslot, aggstate->max_colno_needed);
    3046         1048 :         ExecClearTuple(spillslot);
    3047         3144 :         for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++)
    3048              :         {
    3049         2096 :             if (bms_is_member(i + 1, aggstate->colnos_needed))
    3050              :             {
    3051         1048 :                 spillslot->tts_values[i] = inputslot->tts_values[i];
    3052         1048 :                 spillslot->tts_isnull[i] = inputslot->tts_isnull[i];
    3053              :             }
    3054              :             else
    3055         1048 :                 spillslot->tts_isnull[i] = true;
    3056              :         }
    3057         1048 :         ExecStoreVirtualTuple(spillslot);
    3058              :     }
    3059              :     else
    3060       716806 :         spillslot = inputslot;
    3061              : 
    3062       717854 :     tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
    3063              : 
    3064       717854 :     if (spill->shift < 32)
    3065       707354 :         partition = (hash & spill->mask) >> spill->shift;
    3066              :     else
    3067        10500 :         partition = 0;
    3068              : 
    3069       717854 :     spill->ntuples[partition]++;
    3070              : 
    3071              :     /*
    3072              :      * All hash values destined for a given partition have some bits in
    3073              :      * common, which causes bad HLL cardinality estimates. Hash the hash to
    3074              :      * get a more uniform distribution.
    3075              :      */
    3076       717854 :     addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
    3077              : 
    3078       717854 :     tape = spill->partitions[partition];
    3079              : 
    3080       717854 :     LogicalTapeWrite(tape, &hash, sizeof(uint32));
    3081       717854 :     total_written += sizeof(uint32);
    3082              : 
    3083       717854 :     LogicalTapeWrite(tape, tuple, tuple->t_len);
    3084       717854 :     total_written += tuple->t_len;
    3085              : 
    3086       717854 :     if (shouldFree)
    3087       420294 :         pfree(tuple);
    3088              : 
    3089       717854 :     return total_written;
    3090              : }
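The HLL comment in hashagg_spill_tuple() can be illustrated directly. Rows routed to one partition share their partition bits. If a HyperLogLog register is chosen from the top bits of the value it is given, unmixed hashes crowd into a few registers. (That register choice is an assumption about the estimator, typical of HyperLogLog implementations.) This sketch counts the registers touched by a set of hashes, with and without a re-hash. Multiplication by an odd constant stands in for hash_bytes_uint32(), which is a different function, and all names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define REGISTER_BITS 4                 /* 16 registers, illustrative */
#define NREGISTERS (1 << REGISTER_BITS)

/* HLL-style register choice: the top REGISTER_BITS bits of the value. */
static int
register_index(uint32_t hash)
{
    return (int) (hash >> (32 - REGISTER_BITS));
}

/* Stand-in re-hash: multiply by an odd constant (Fibonacci hashing). */
static uint32_t
rehash(uint32_t hash)
{
    return hash * 0x9E3779B1u;
}

/* Count how many distinct registers a set of hashes would update. */
static int
registers_touched(const uint32_t *hashes, int n, bool mix)
{
    bool seen[NREGISTERS] = {false};
    int  count = 0;

    for (int i = 0; i < n; i++)
    {
        int r = register_index(mix ? rehash(hashes[i]) : hashes[i]);

        assert(r >= 0 && r < NREGISTERS);
        if (!seen[r])
        {
            seen[r] = true;
            count++;
        }
    }
    return count;
}
```

Hashes that all carry partition bits `11` in the top two positions can reach at most four of the sixteen registers without mixing. Re-hashing moves the varying low bits into the top bits.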
    3091              : 
    3092              : /*
    3093              :  * hashagg_batch_new
    3094              :  *
    3095              :  * Construct a HashAggBatch item, which represents one iteration of HashAgg to
    3096              :  * be done.
    3097              :  */
    3098              : static HashAggBatch *
    3099        17957 : hashagg_batch_new(LogicalTape *input_tape, int setno,
    3100              :                   int64 input_tuples, double input_card, int used_bits)
    3101              : {
    3102        17957 :     HashAggBatch *batch = palloc0_object(HashAggBatch);
    3103              : 
    3104        17957 :     batch->setno = setno;
    3105        17957 :     batch->used_bits = used_bits;
    3106        17957 :     batch->input_tape = input_tape;
    3107        17957 :     batch->input_tuples = input_tuples;
    3108        17957 :     batch->input_card = input_card;
    3109              : 
    3110        17957 :     return batch;
    3111              : }
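hashagg_spill_tuple() writes each spilled row as a uint32 hash followed by the whole MinimalTuple, whose leading t_len counts its own four bytes. hashagg_batch_read() therefore reads three things in order:

- the hash;
- t_len;
- the remaining t_len - sizeof(uint32) bytes.

A zero-byte read of the hash is a clean end of tape, and any other short read is an error. Below is a minimal sketch of that layout over a hypothetical fixed-size in-memory `Tape`. A byte string stands in for the tuple body, and abort() stands in for ereport(ERROR).

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical in-memory stand-in for a LogicalTape. */
typedef struct Tape
{
    unsigned char buf[1024];
    size_t        len;          /* bytes written so far */
    size_t        pos;          /* current read position */
} Tape;

static void
tape_write(Tape *t, const void *p, size_t n)
{
    assert(t->len + n <= sizeof(t->buf));
    memcpy(t->buf + t->len, p, n);
    t->len += n;
}

/* Returns how many bytes were actually available, like a tape read. */
static size_t
tape_read(Tape *t, void *p, size_t n)
{
    size_t avail = t->len - t->pos;

    if (n > avail)
        n = avail;
    memcpy(p, t->buf + t->pos, n);
    t->pos += n;
    return n;
}

/* Spill one row: the hash, then a length word that counts itself, then data. */
static void
spill_record(Tape *t, uint32_t hash, const void *data, uint32_t data_len)
{
    uint32_t t_len = (uint32_t) sizeof(uint32_t) + data_len;

    tape_write(t, &hash, sizeof(hash));
    tape_write(t, &t_len, sizeof(t_len));
    tape_write(t, data, data_len);
}

/*
 * Read one row back into a malloc'd buffer that starts with its length word,
 * like a MinimalTuple.  Returns NULL at a clean end of tape.
 */
static unsigned char *
read_record(Tape *t, uint32_t *hashp)
{
    uint32_t       hash;
    uint32_t       t_len;
    unsigned char *rec;
    size_t         nread = tape_read(t, &hash, sizeof(hash));

    if (nread == 0)
        return NULL;            /* no more records */
    if (nread != sizeof(hash))
        abort();                /* truncated hash */
    *hashp = hash;

    if (tape_read(t, &t_len, sizeof(t_len)) != sizeof(t_len) ||
        t_len < sizeof(t_len))
        abort();                /* truncated or corrupt length word */

    rec = malloc(t_len);
    assert(rec != NULL);
    memcpy(rec, &t_len, sizeof(t_len));
    if (tape_read(t, rec + sizeof(t_len), t_len - sizeof(t_len)) !=
        t_len - sizeof(t_len))
        abort();                /* truncated body */
    return rec;
}
```

Writing the length separately, as here, produces the same bytes on tape as writing a MinimalTuple whose first field is t_len.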
    3112              : 
    3113              : /*
    3114              :  * hashagg_batch_read
    3115              :  *      read the next tuple from a batch's tape.  Return NULL if no more.
    3116              :  */
    3117              : static MinimalTuple
    3118       735811 : hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
    3119              : {
    3120       735811 :     LogicalTape *tape = batch->input_tape;
    3121              :     MinimalTuple tuple;
    3122              :     uint32      t_len;
    3123              :     size_t      nread;
    3124              :     uint32      hash;
    3125              : 
    3126       735811 :     nread = LogicalTapeRead(tape, &hash, sizeof(uint32));
    3127       735811 :     if (nread == 0)
    3128        17957 :         return NULL;
    3129       717854 :     if (nread != sizeof(uint32))
    3130            0 :         ereport(ERROR,
    3131              :                 (errcode_for_file_access(),
    3132              :                  errmsg_internal("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
    3133              :                                  tape, sizeof(uint32), nread)));
    3134       717854 :     if (hashp != NULL)
    3135       717854 :         *hashp = hash;
    3136              : 
    3137       717854 :     nread = LogicalTapeRead(tape, &t_len, sizeof(t_len));
    3138       717854 :     if (nread != sizeof(uint32))
    3139            0 :         ereport(ERROR,
    3140              :                 (errcode_for_file_access(),
    3141              :                  errmsg_internal("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
    3142              :                                  tape, sizeof(uint32), nread)));
    3143              : 
    3144       717854 :     tuple = (MinimalTuple) palloc(t_len);
    3145       717854 :     tuple->t_len = t_len;
    3146              : 
    3147       717854 :     nread = LogicalTapeRead(tape,
    3148              :                             (char *) tuple + sizeof(uint32),
    3149              :                             t_len - sizeof(uint32));
    3150       717854 :     if (nread != t_len - sizeof(uint32))
    3151            0 :         ereport(ERROR,
    3152              :                 (errcode_for_file_access(),
    3153              :                  errmsg_internal("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
    3154              :                                  tape, t_len - sizeof(uint32), nread)));
    3155              : 
    3156       717854 :     return tuple;
    3157              : }
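The spill writer above and hashagg_batch_read agree on a simple framing: a 4-byte hash, followed by the MinimalTuple itself. The tuple's first field, t_len, counts its own 4 bytes. The reader therefore consumes t_len on its own, then reads only t_len - sizeof(uint32) further bytes into the buffer just past the length word. The sketch below reproduces that framing with standard stdio against tmpfile(). `Record`, `spill_write`, `spill_read`, and `make_record` are hypothetical stand-ins, and abort() takes the place of ereport(ERROR). A real MinimalTuple also carries more header fields than the length word.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for MinimalTuple: the length word comes first and
 * counts itself, like t_len in the real struct. */
typedef struct Record
{
    uint32_t t_len;        /* total size in bytes, including this field */
    char     data[];       /* payload */
} Record;

/* Write one spilled tuple: the hash first, then the whole tuple. */
static size_t
spill_write(FILE *tape, uint32_t hash, const Record *rec)
{
    size_t total = 0;

    total += fwrite(&hash, 1, sizeof(uint32_t), tape);
    total += fwrite(rec, 1, rec->t_len, tape);
    return total;
}

/* Read the next tuple; NULL at a clean EOF, abort on a torn record. */
static Record *
spill_read(FILE *tape, uint32_t *hashp)
{
    uint32_t hash, t_len;
    Record  *rec;
    size_t   nread = fread(&hash, 1, sizeof(uint32_t), tape);

    if (nread == 0)
        return NULL;                          /* no more tuples */
    if (nread != sizeof(uint32_t))
        abort();                              /* stands in for ereport(ERROR) */
    if (hashp != NULL)
        *hashp = hash;

    if (fread(&t_len, 1, sizeof(t_len), tape) != sizeof(t_len))
        abort();
    assert(t_len >= sizeof(uint32_t));

    rec = malloc(t_len);
    if (rec == NULL)
        abort();
    rec->t_len = t_len;
    /* the length word is already consumed; read only the remainder */
    if (fread((char *) rec + sizeof(uint32_t), 1, t_len - sizeof(uint32_t), tape)
        != t_len - sizeof(uint32_t))
        abort();
    return rec;
}

/* Example helper: a Record holding a NUL-terminated string. */
static Record *
make_record(const char *s)
{
    size_t  len = strlen(s) + 1;
    Record *rec = malloc(sizeof(Record) + len);

    if (rec == NULL)
        abort();
    rec->t_len = (uint32_t) (sizeof(Record) + len);
    memcpy(rec->data, s, len);
    return rec;
}
```

Because the payload length is recoverable from t_len, the tape needs no separate record separator. A zero-byte read of the hash is the only clean end-of-batch signal.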
    3158              : 
    3159              : /*
    3160              :  * hashagg_finish_initial_spills
    3161              :  *
    3162              :  * After a HashAggBatch has been processed, it may have spilled tuples to
    3163              :  * disk. If so, turn the spilled partitions into new batches that must later
    3164              :  * be executed.
    3165              :  */
    3166              : static void
    3167         9833 : hashagg_finish_initial_spills(AggState *aggstate)
    3168              : {
    3169              :     int         setno;
    3170         9833 :     int         total_npartitions = 0;
    3171              : 
    3172         9833 :     if (aggstate->hash_spills != NULL)
    3173              :     {
    3174          120 :         for (setno = 0; setno < aggstate->num_hashes; setno++)
    3175              :         {
    3176           80 :             HashAggSpill *spill = &aggstate->hash_spills[setno];
    3177              : 
    3178           80 :             total_npartitions += spill->npartitions;
    3179           80 :             hashagg_spill_finish(aggstate, spill, setno);
    3180              :         }
    3181              : 
    3182              :         /*
    3183              :          * We're not processing tuples from outer plan any more; only
    3184              :          * processing batches of spilled tuples. The initial spill structures
    3185              :          * are no longer needed.
    3186              :          */
    3187           40 :         pfree(aggstate->hash_spills);
    3188           40 :         aggstate->hash_spills = NULL;
    3189              :     }
    3190              : 
    3191         9833 :     hash_agg_update_metrics(aggstate, false, total_npartitions);
    3192         9833 :     aggstate->hash_spill_mode = false;
    3193         9833 : }
    3194              : 
    3195              : /*
    3196              :  * hashagg_spill_finish
    3197              :  *
    3198              :  * Transform spill partitions into new batches.
    3199              :  */
    3200              : static void
    3201         8412 : hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
    3202              : {
    3203              :     int         i;
    3204         8412 :     int         used_bits = 32 - spill->shift;
    3205              : 
    3206         8412 :     if (spill->npartitions == 0)
    3207            0 :         return;                 /* didn't spill */
    3208              : 
    3209        42045 :     for (i = 0; i < spill->npartitions; i++)
    3210              :     {
    3211        33633 :         LogicalTape *tape = spill->partitions[i];
    3212              :         HashAggBatch *new_batch;
    3213              :         double      cardinality;
    3214              : 
    3215              :         /* if the partition is empty, don't create a new batch of work */
    3216        33633 :         if (spill->ntuples[i] == 0)
    3217        15676 :             continue;
    3218              : 
    3219        17957 :         cardinality = estimateHyperLogLog(&spill->hll_card[i]);
    3220        17957 :         freeHyperLogLog(&spill->hll_card[i]);
    3221              : 
    3222              :         /* rewinding frees the buffer while not in use */
    3223        17957 :         LogicalTapeRewindForRead(tape, HASHAGG_READ_BUFFER_SIZE);
    3224              : 
    3225        17957 :         new_batch = hashagg_batch_new(tape, setno,
    3226        17957 :                                       spill->ntuples[i], cardinality,
    3227              :                                       used_bits);
    3228        17957 :         aggstate->hash_batches = lappend(aggstate->hash_batches, new_batch);
    3229        17957 :         aggstate->hash_batches_used++;
    3230              :     }
    3231              : 
    3232         8412 :     pfree(spill->ntuples);
    3233         8412 :     pfree(spill->hll_card);
    3234         8412 :     pfree(spill->partitions);
    3235              : }
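hashagg_spill_finish stores used_bits = 32 - spill->shift in every new batch, so a batch that spills again can partition on hash bits that earlier levels have not consumed. This excerpt does not show how shift itself is chosen. The sketch below assumes that each level takes its partition bits from just below the high-order bits already used. It also mirrors the loop above by creating a batch only for non-empty partitions. `partition_for`, `Batch`, and `make_batches` are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical: pick a partition from the partition_bits hash bits that sit
 * just below the used_bits high-order bits consumed by earlier spill levels. */
static uint32_t
partition_for(uint32_t hash, int used_bits, int partition_bits)
{
    int shift = 32 - used_bits - partition_bits;

    assert(used_bits >= 0 && partition_bits >= 0 && partition_bits < 32);
    assert(shift >= 0);
    if (shift >= 32)
        return 0;                       /* no bits to partition on */

    uint32_t mask = ((1u << partition_bits) - 1) << shift;
    return (hash & mask) >> shift;
}

typedef struct Batch
{
    int  partition;   /* which spill partition this batch re-reads */
    long ntuples;     /* tuples spilled to that partition */
    int  used_bits;   /* hash bits consumed so far, i.e. 32 - shift */
} Batch;

/* One batch per non-empty partition, as in the loop above. */
static int
make_batches(const long *ntuples, int npartitions, int shift, Batch *out)
{
    int used_bits = 32 - shift;
    int nbatches = 0;

    for (int i = 0; i < npartitions; i++)
    {
        if (ntuples[i] == 0)
            continue;                   /* empty partition: no new work */
        out[nbatches].partition = i;
        out[nbatches].ntuples = ntuples[i];
        out[nbatches].used_bits = used_bits;
        nbatches++;
    }
    return nbatches;
}
```

With two partition bits at the first level, a hash of 0xE0000000 goes to partition 3. Once those two bits are consumed, its next two bits (binary 10) send it to partition 2 at the second level. Tuples that collided at one level can therefore separate at the next.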
    3236              : 
    3237              : /*
    3238              :  * Free resources related to a spilled HashAgg.
    3239              :  */
    3240              : static void
    3241        40340 : hashagg_reset_spill_state(AggState *aggstate)
    3242              : {
    3243              :     /* free spills from initial pass */
    3244        40340 :     if (aggstate->hash_spills != NULL)
    3245              :     {
    3246              :         int         setno;
    3247              : 
    3248            0 :         for (setno = 0; setno < aggstate->num_hashes; setno++)
    3249              :         {
    3250            0 :             HashAggSpill *spill = &aggstate->hash_spills[setno];
    3251              : 
    3252            0 :             pfree(spill->ntuples);
    3253            0 :             pfree(spill->partitions);
    3254              :         }
    3255            0 :         pfree(aggstate->hash_spills);
    3256            0 :         aggstate->hash_spills = NULL;
    3257              :     }
    3258              : 
    3259              :     /* free batches */
    3260        40340 :     list_free_deep(aggstate->hash_batches);
    3261        40340 :     aggstate->hash_batches = NIL;
    3262              : 
    3263              :     /* close tape set */
    3264        40340 :     if (aggstate->hash_tapeset != NULL)
    3265              :     {
    3266           40 :         LogicalTapeSetClose(aggstate->hash_tapeset);
    3267           40 :         aggstate->hash_tapeset = NULL;
    3268              :     }
    3269        40340 : }
    3270              : 
    3271              : 
    3272              : /* -----------------
    3273              :  * ExecInitAgg
    3274              :  *
    3275              :  *  Creates the run-time information for the agg node produced by the
    3276              :  *  planner and initializes its outer subtree.
    3277              :  *
    3278              :  * -----------------
    3279              :  */
    3280              : AggState *
    3281        33777 : ExecInitAgg(Agg *node, EState *estate, int eflags)
    3282              : {
    3283              :     AggState   *aggstate;
    3284              :     AggStatePerAgg peraggs;
    3285              :     AggStatePerTrans pertransstates;
    3286              :     AggStatePerGroup *pergroups;
    3287              :     Plan       *outerPlan;
    3288              :     ExprContext *econtext;
    3289              :     TupleDesc   scanDesc;
    3290              :     int         max_aggno;
    3291              :     int         max_transno;
    3292              :     int         numaggrefs;
    3293              :     int         numaggs;
    3294              :     int         numtrans;
    3295              :     int         phase;
    3296              :     int         phaseidx;
    3297              :     ListCell   *l;
    3298        33777 :     Bitmapset  *all_grouped_cols = NULL;
    3299        33777 :     int         numGroupingSets = 1;
    3300              :     int         numPhases;
    3301              :     int         numHashes;
    3302        33777 :     int         i = 0;
    3303        33777 :     int         j = 0;
    3304        63165 :     bool        use_hashing = (node->aggstrategy == AGG_HASHED ||
    3305        29388 :                                node->aggstrategy == AGG_MIXED);
    3306              : 
    3307              :     /* check for unsupported flags */
    3308              :     Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
    3309              : 
    3310              :     /*
    3311              :      * create state structure
    3312              :      */
    3313        33777 :     aggstate = makeNode(AggState);
    3314        33777 :     aggstate->ss.ps.plan = (Plan *) node;
    3315        33777 :     aggstate->ss.ps.state = estate;
    3316        33777 :     aggstate->ss.ps.ExecProcNode = ExecAgg;
    3317              : 
    3318        33777 :     aggstate->aggs = NIL;
    3319        33777 :     aggstate->numaggs = 0;
    3320        33777 :     aggstate->numtrans = 0;
    3321        33777 :     aggstate->aggstrategy = node->aggstrategy;
    3322        33777 :     aggstate->aggsplit = node->aggsplit;
    3323        33777 :     aggstate->maxsets = 0;
    3324        33777 :     aggstate->projected_set = -1;
    3325        33777 :     aggstate->current_set = 0;
    3326        33777 :     aggstate->peragg = NULL;
    3327        33777 :     aggstate->pertrans = NULL;
    3328        33777 :     aggstate->curperagg = NULL;
    3329        33777 :     aggstate->curpertrans = NULL;
    3330        33777 :     aggstate->input_done = false;
    3331        33777 :     aggstate->agg_done = false;
    3332        33777 :     aggstate->pergroups = NULL;
    3333        33777 :     aggstate->grp_firstTuple = NULL;
    3334        33777 :     aggstate->sort_in = NULL;
    3335        33777 :     aggstate->sort_out = NULL;
    3336              : 
    3337              :     /*
    3338              :      * phases[0] always exists, but is dummy in sorted/plain mode
    3339              :      */
    3340        33777 :     numPhases = (use_hashing ? 1 : 2);
    3341        33777 :     numHashes = (use_hashing ? 1 : 0);
    3342              : 
    3343              :     /*
    3344              :      * Calculate the maximum number of grouping sets in any phase; this
    3345              :      * determines the size of some allocations.  Also calculate the number of
    3346              :      * phases, since all hashed/mixed nodes contribute to only a single phase.
    3347              :      */
    3348        33777 :     if (node->groupingSets)
    3349              :     {
    3350          664 :         numGroupingSets = list_length(node->groupingSets);
    3351              : 
    3352         1390 :         foreach(l, node->chain)
    3353              :         {
    3354          726 :             Agg        *agg = lfirst(l);
    3355              : 
    3356          726 :             numGroupingSets = Max(numGroupingSets,
    3357              :                                   list_length(agg->groupingSets));
    3358              : 
    3359              :             /*
    3360              :              * additional AGG_HASHED aggs become part of phase 0, but all
    3361              :              * others add an extra phase.
    3362              :              */
    3363          726 :             if (agg->aggstrategy != AGG_HASHED)
    3364          366 :                 ++numPhases;
    3365              :             else
    3366          360 :                 ++numHashes;
    3367              :         }
    3368              :     }
    3369              : 
    3370        33777 :     aggstate->maxsets = numGroupingSets;
    3371        33777 :     aggstate->numphases = numPhases;
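The counting pass above can be restated on a trimmed-down plan shape. Phase 0 always exists and is a dummy unless hashing is in use. Every non-hashed Agg in the chain adds one sorted phase, and every hashed one adds another hash table to phase 0. `ChainAgg`, `Strategy`, `PhaseCounts`, and `count_phases` are hypothetical simplifications that keep only each Agg's strategy and its number of grouping sets.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum { AGG_PLAIN, AGG_SORTED, AGG_HASHED, AGG_MIXED } Strategy;

typedef struct ChainAgg          /* hypothetical, trimmed-down Agg */
{
    Strategy strategy;
    int      ngroupingsets;      /* 0 when there are no grouping sets */
} ChainAgg;

typedef struct PhaseCounts
{
    int numPhases;
    int numHashes;
    int maxsets;
} PhaseCounts;

static PhaseCounts
count_phases(const ChainAgg *top, const ChainAgg *chain, int nchain)
{
    bool        use_hashing = (top->strategy == AGG_HASHED ||
                               top->strategy == AGG_MIXED);
    PhaseCounts c;

    c.numPhases = use_hashing ? 1 : 2;   /* phase 0 is a dummy unless hashing */
    c.numHashes = use_hashing ? 1 : 0;
    c.maxsets = 1;

    if (top->ngroupingsets > 0)
    {
        c.maxsets = top->ngroupingsets;
        for (int i = 0; i < nchain; i++)
        {
            assert(chain != NULL);
            if (chain[i].ngroupingsets > c.maxsets)
                c.maxsets = chain[i].ngroupingsets;
            if (chain[i].strategy != AGG_HASHED)
                c.numPhases++;           /* each sorted rollup is its own pass */
            else
                c.numHashes++;           /* extra hash tables join phase 0 */
        }
    }
    return c;
}
```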
    3372              : 
    3373        33777 :     aggstate->aggcontexts = palloc0_array(ExprContext *, numGroupingSets);
    3374              : 
    3375              :     /*
    3376              :      * Create expression contexts.  We need three or more, one for
    3377              :      * per-input-tuple processing, one for per-output-tuple processing, one
    3378              :      * for all the hashtables, and one for each grouping set.  The per-tuple
    3379              :      * memory context of the per-grouping-set ExprContexts (aggcontexts)
    3380              :      * replaces the standalone memory context formerly used to hold transition
    3381              :      * values.  We cheat a little by using ExecAssignExprContext() to build
    3382              :      * all of them.
    3383              :      *
    3384              :      * NOTE: the details of what is stored in aggcontexts and what is stored
    3385              :      * in the regular per-query memory context are driven by a simple
    3386              :      * decision: we want to reset the aggcontext at group boundaries (if not
    3387              :      * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
    3388              :      */
    3389        33777 :     ExecAssignExprContext(estate, &aggstate->ss.ps);
    3390        33777 :     aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
    3391              : 
    3392        68136 :     for (i = 0; i < numGroupingSets; ++i)
    3393              :     {
    3394        34359 :         ExecAssignExprContext(estate, &aggstate->ss.ps);
    3395        34359 :         aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
    3396              :     }
    3397              : 
    3398        33777 :     if (use_hashing)
    3399         4563 :         hash_create_memory(aggstate);
    3400              : 
    3401        33777 :     ExecAssignExprContext(estate, &aggstate->ss.ps);
    3402              : 
    3403              :     /*
    3404              :      * Initialize child nodes.
    3405              :      *
    3406              :      * If we are doing a hashed aggregation then the child plan does not need
    3407              :      * to handle REWIND efficiently; see ExecReScanAgg.
    3408              :      */
    3409        33777 :     if (node->aggstrategy == AGG_HASHED)
    3410         4389 :         eflags &= ~EXEC_FLAG_REWIND;
    3411        33777 :     outerPlan = outerPlan(node);
    3412        33777 :     outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
    3413              : 
    3414              :     /*
    3415              :      * initialize source tuple type.
    3416              :      */
    3417        33777 :     aggstate->ss.ps.outerops =
    3418        33777 :         ExecGetResultSlotOps(outerPlanState(&aggstate->ss),
    3419              :                              &aggstate->ss.ps.outeropsfixed);
    3420        33777 :     aggstate->ss.ps.outeropsset = true;
    3421              : 
    3422        33777 :     ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
    3423              :                                     aggstate->ss.ps.outerops);
    3424        33777 :     scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
    3425              : 
    3426              :     /*
    3427              :      * If there are more than two phases (including a potential dummy phase
    3428              :      * 0), input will be resorted using tuplesort. Need a slot for that.
    3429              :      */
    3430        33777 :     if (numPhases > 2)
    3431              :     {
    3432          164 :         aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
    3433              :                                                      &TTSOpsMinimalTuple);
    3434              : 
    3435              :         /*
    3436              :          * The output of the tuplesort, and the output from the outer child
    3437              :          * might not use the same type of slot. In most cases the child will
    3438              :          * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
    3439              :          * input can also be presorted due to an index, in which case it could be
    3440              :          * a different type of slot.
    3441              :          *
    3442              :          * XXX: For efficiency it would be good to instead/additionally
    3443              :          * generate expressions with corresponding settings of outerops* for
    3444              :          * the individual phases - deforming is often a bottleneck for
    3445              :          * aggregations with lots of rows per group. If there are multiple
    3446              :          * sorts, we know that all but the first use TTSOpsMinimalTuple (via
    3447              :          * the nodeAgg.c internal tuplesort).
    3448              :          */
    3449          164 :         if (aggstate->ss.ps.outeropsfixed &&
    3450          164 :             aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
    3451           24 :             aggstate->ss.ps.outeropsfixed = false;
    3452              :     }
    3453              : 
    3454              :     /*
    3455              :      * Initialize result type, slot and projection.
    3456              :      */
    3457        33777 :     ExecInitResultTupleSlotTL(&aggstate->ss.ps, &TTSOpsVirtual);
    3458        33777 :     ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
    3459              : 
    3460              :     /*
    3461              :      * initialize child expressions
    3462              :      *
    3463              :      * We expect the parser to have checked that no aggs contain other agg
    3464              :      * calls in their arguments (and just to be sure, we verify it again while
    3465              :      * initializing the plan node).  This would make no sense under SQL
    3466              :      * semantics, and it's forbidden by the spec.  Because it is true, we
    3467              :      * don't need to worry about evaluating the aggs in any particular order.
    3468              :      *
    3469              :      * Note: execExpr.c finds Aggrefs for us, and adds them to aggstate->aggs.
    3470              :      * Aggrefs in the qual are found here; Aggrefs in the targetlist are found
    3471              :      * during ExecAssignProjectionInfo, above.
    3472              :      */
    3473        33777 :     aggstate->ss.ps.qual =
    3474        33777 :         ExecInitQual(node->plan.qual, (PlanState *) aggstate);
    3475              : 
    3476              :     /*
    3477              :      * We should now have found all Aggrefs in the targetlist and quals.
    3478              :      */
    3479        33777 :     numaggrefs = list_length(aggstate->aggs);
    3480        33777 :     max_aggno = -1;
    3481        33777 :     max_transno = -1;
    3482        71893 :     foreach(l, aggstate->aggs)
    3483              :     {
    3484        38116 :         Aggref     *aggref = (Aggref *) lfirst(l);
    3485              : 
    3486        38116 :         max_aggno = Max(max_aggno, aggref->aggno);
    3487        38116 :         max_transno = Max(max_transno, aggref->aggtransno);
    3488              :     }
    3489        33777 :     aggstate->numaggs = numaggs = max_aggno + 1;
    3490        33777 :     aggstate->numtrans = numtrans = max_transno + 1;
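numaggs and numtrans are sized from the largest aggno and aggtransno rather than from list_length(aggstate->aggs). Several Aggrefs can map to the same aggno when they are duplicates, and several aggregates can share one transition state (the same aggtransno). A minimal sketch with a hypothetical `AggrefIds` struct that holds just those two fields:

```c
#include <assert.h>
#include <stddef.h>

typedef struct AggrefIds          /* hypothetical: only the two ids we need */
{
    int aggno;
    int aggtransno;
} AggrefIds;

/* Size the per-agg and per-trans arrays from the largest ids seen. */
static void
size_per_agg_arrays(const AggrefIds *refs, int nrefs, int *numaggs, int *numtrans)
{
    int max_aggno = -1;
    int max_transno = -1;

    assert(nrefs == 0 || refs != NULL);
    for (int i = 0; i < nrefs; i++)
    {
        if (refs[i].aggno > max_aggno)
            max_aggno = refs[i].aggno;
        if (refs[i].aggtransno > max_transno)
            max_transno = refs[i].aggtransno;
    }
    *numaggs = max_aggno + 1;     /* 0 when there are no Aggrefs */
    *numtrans = max_transno + 1;
}
```

Three Aggrefs where two are identical yield numaggs = 2. Two different aggregates sharing one transition state yield numtrans = 1.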
    3491              : 
    3492              :     /*
    3493              :      * For each phase, prepare grouping set data and fmgr lookup data for
    3494              :      * compare functions.  Accumulate all_grouped_cols in passing.
    3495              :      */
    3496        33777 :     aggstate->phases = palloc0_array(AggStatePerPhaseData, numPhases);
    3497              : 
    3498        33777 :     aggstate->num_hashes = numHashes;
    3499        33777 :     if (numHashes)
    3500              :     {
    3501         4563 :         aggstate->perhash = palloc0_array(AggStatePerHashData, numHashes);
    3502         4563 :         aggstate->phases[0].numsets = 0;
    3503         4563 :         aggstate->phases[0].gset_lengths = palloc_array(int, numHashes);
    3504         4563 :         aggstate->phases[0].grouped_cols = palloc_array(Bitmapset *, numHashes);
    3505              :     }
    3506              : 
    3507        33777 :     phase = 0;
    3508        68280 :     for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
    3509              :     {
    3510              :         Agg        *aggnode;
    3511              :         Sort       *sortnode;
    3512              : 
    3513        34503 :         if (phaseidx > 0)
    3514              :         {
    3515          726 :             aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
    3516          726 :             sortnode = castNode(Sort, outerPlan(aggnode));
    3517              :         }
    3518              :         else
    3519              :         {
    3520        33777 :             aggnode = node;
    3521        33777 :             sortnode = NULL;
    3522              :         }
    3523              : 
    3524              :         Assert(phase <= 1 || sortnode);
    3525              : 
    3526        34503 :         if (aggnode->aggstrategy == AGG_HASHED
    3527        29754 :             || aggnode->aggstrategy == AGG_MIXED)
    3528         4923 :         {
    3529         4923 :             AggStatePerPhase phasedata = &aggstate->phases[0];
    3530              :             AggStatePerHash perhash;
    3531         4923 :             Bitmapset  *cols = NULL;
    3532              : 
    3533              :             Assert(phase == 0);
    3534         4923 :             i = phasedata->numsets++;
    3535         4923 :             perhash = &aggstate->perhash[i];
    3536              : 
    3537              :             /* phase 0 always points to the "real" Agg in the hash case */
    3538         4923 :             phasedata->aggnode = node;
    3539         4923 :             phasedata->aggstrategy = node->aggstrategy;
    3540              : 
    3541              :             /* but the actual Agg node representing this hash is saved here */
    3542         4923 :             perhash->aggnode = aggnode;
    3543              : 
    3544         4923 :             phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
    3545              : 
    3546        12262 :             for (j = 0; j < aggnode->numCols; ++j)
    3547         7339 :                 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
    3548              : 
    3549         4923 :             phasedata->grouped_cols[i] = cols;
    3550              : 
    3551         4923 :             all_grouped_cols = bms_add_members(all_grouped_cols, cols);
    3552         4923 :             continue;
    3553              :         }
    3554              :         else
    3555              :         {
    3556        29580 :             AggStatePerPhase phasedata = &aggstate->phases[++phase];
    3557              :             int         num_sets;
    3558              : 
    3559        29580 :             phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
    3560              : 
    3561        29580 :             if (num_sets)
    3562              :             {
    3563          742 :                 phasedata->gset_lengths = palloc(num_sets * sizeof(int));
    3564          742 :                 phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
    3565              : 
    3566          742 :                 i = 0;
    3567         2114 :                 foreach(l, aggnode->groupingSets)
    3568              :                 {
    3569         1372 :                     int         current_length = list_length(lfirst(l));
    3570         1372 :                     Bitmapset  *cols = NULL;
    3571              : 
    3572              :                     /* planner forces this to be correct */
    3573         2678 :                     for (j = 0; j < current_length; ++j)
    3574         1306 :                         cols = bms_add_member(cols, aggnode->grpColIdx[j]);
    3575              : 
    3576         1372 :                     phasedata->grouped_cols[i] = cols;
    3577         1372 :                     phasedata->gset_lengths[i] = current_length;
    3578              : 
    3579         1372 :                     ++i;
    3580              :                 }
    3581              : 
    3582          742 :                 all_grouped_cols = bms_add_members(all_grouped_cols,
    3583          742 :                                                    phasedata->grouped_cols[0]);
    3584              :             }
    3585              :             else
    3586              :             {
    3587              :                 Assert(phaseidx == 0);
    3588              : 
    3589        28838 :                 phasedata->gset_lengths = NULL;
    3590        28838 :                 phasedata->grouped_cols = NULL;
    3591              :             }
    3592              : 
    3593              :             /*
    3594              :              * If we are grouping, precompute fmgr lookup data for inner loop.
    3595              :              */
    3596        29580 :             if (aggnode->aggstrategy == AGG_SORTED)
    3597              :             {
    3598              :                 /*
    3599              :                  * Build a separate function for each subset of columns that
    3600              :                  * need to be compared.
    3601              :                  */
    3602         1850 :                 phasedata->eqfunctions = palloc0_array(ExprState *, aggnode->numCols);
    3603              : 
    3604              :                 /* for each grouping set */
    3605         3005 :                 for (int k = 0; k < phasedata->numsets; k++)
    3606              :                 {
    3607         1155 :                     int         length = phasedata->gset_lengths[k];
    3608              : 
    3609              :                     /* nothing to do for empty grouping set */
    3610         1155 :                     if (length == 0)
    3611          233 :                         continue;
    3612              : 
    3613              :                     /* if we already had one of this length, it'll do */
    3614          922 :                     if (phasedata->eqfunctions[length - 1] != NULL)
    3615           91 :                         continue;
    3616              : 
    3617          831 :                     phasedata->eqfunctions[length - 1] =
    3618          831 :                         execTuplesMatchPrepare(scanDesc,
    3619              :                                                length,
    3620          831 :                                                aggnode->grpColIdx,
    3621          831 :                                                aggnode->grpOperators,
    3622          831 :                                                aggnode->grpCollations,
    3623              :                                                (PlanState *) aggstate);
    3624              :                 }
    3625              : 
    3626              :                 /* and for all grouped columns, unless already computed */
    3627         1850 :                 if (aggnode->numCols > 0 &&
    3628         1781 :                     phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
    3629              :                 {
    3630         1208 :                     phasedata->eqfunctions[aggnode->numCols - 1] =
    3631         1208 :                         execTuplesMatchPrepare(scanDesc,
    3632              :                                                aggnode->numCols,
    3633         1208 :                                                aggnode->grpColIdx,
    3634         1208 :                                                aggnode->grpOperators,
    3635         1208 :                                                aggnode->grpCollations,
    3636              :                                                (PlanState *) aggstate);
    3637              :                 }
    3638              :             }
    3639              : 
    3640        29580 :             phasedata->aggnode = aggnode;
    3641        29580 :             phasedata->aggstrategy = aggnode->aggstrategy;
    3642        29580 :             phasedata->sortnode = sortnode;
    3643              :         }
    3644              :     }
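In a sorted phase, every grouping set uses a leading prefix of grpColIdx, since the inner loop above reads grpColIdx[0 .. current_length - 1]. Equality comparators can therefore be cached by prefix length alone: eqfunctions[length - 1] is built once and reused. The sketch below reproduces only that bookkeeping. A hypothetical `built[]` array stands in for eqfunctions[k] != NULL, and no real comparator is constructed.

```c
#include <assert.h>
#include <stdbool.h>

/* Mark which prefix lengths need a comparator; built[k] covers the first
 * k + 1 grouping columns. Returns how many comparators would be built. */
static int
plan_comparators(const int *gset_lengths, int numsets, int numCols, bool *built)
{
    int nbuilt = 0;

    for (int k = 0; k < numCols; k++)
        built[k] = false;

    for (int s = 0; s < numsets; s++)
    {
        int length = gset_lengths[s];

        assert(length >= 0 && length <= numCols);
        if (length == 0)              /* empty grouping set: nothing to compare */
            continue;
        if (built[length - 1])        /* same prefix length already covered */
            continue;
        built[length - 1] = true;
        nbuilt++;
    }

    /* one for all grouped columns too, unless already present */
    if (numCols > 0 && !built[numCols - 1])
    {
        built[numCols - 1] = true;
        nbuilt++;
    }
    return nbuilt;
}
```

For a ROLLUP over three columns, with set lengths 3, 2, 1, and 0, this builds three comparators, and the full-width one is already present when the final check runs.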
    3645              : 
    3646              :     /*
    3647              :      * Convert all_grouped_cols to a descending-order list.
    3648              :      */
    3649        33777 :     i = -1;
    3650        41588 :     while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
    3651         7811 :         aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
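bms_next_member walks the bitmapset in ascending order, and lcons_int prepends, so the resulting list comes out in descending order. The sketch below applies the same trick to a 64-bit mask and a hand-rolled singly linked list. `next_member`, `prepend`, and `IntCell` are hypothetical stand-ins for the Bitmapset and List APIs. PostgreSQL's List is array-based, so the sketch shows only the resulting order, not the cost of prepending.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct IntCell
{
    int             value;
    struct IntCell *next;
} IntCell;

/* Next set bit strictly greater than prev, or -1 (cf. bms_next_member). */
static int
next_member(uint64_t bits, int prev)
{
    assert(prev >= -1);
    for (int i = prev + 1; i < 64; i++)
        if (bits & ((uint64_t) 1 << i))
            return i;
    return -1;
}

/* Push a value onto the front of the list (cf. lcons_int). */
static IntCell *
prepend(int value, IntCell *list)
{
    IntCell *cell = malloc(sizeof(IntCell));

    if (cell == NULL)
        abort();
    cell->value = value;
    cell->next = list;
    return cell;
}

/* Ascending iteration plus prepending yields a descending list. */
static IntCell *
to_descending_list(uint64_t bits)
{
    IntCell *list = NULL;
    int      i = -1;

    while ((i = next_member(bits, i)) >= 0)
        list = prepend(i, list);
    return list;
}
```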
    3652              : 
    3653              :     /*
    3654              :      * Set up aggregate-result storage in the output expr context, and also
    3655              :      * allocate my private per-agg working storage
    3656              :      */
    3657        33777 :     econtext = aggstate->ss.ps.ps_ExprContext;
    3658        33777 :     econtext->ecxt_aggvalues = palloc0_array(Datum, numaggs);
    3659        33777 :     econtext->ecxt_aggnulls = palloc0_array(bool, numaggs);
    3660              : 
    3661        33777 :     peraggs = palloc0_array(AggStatePerAggData, numaggs);
    3662        33777 :     pertransstates = palloc0_array(AggStatePerTransData, numtrans);
    3663              : 
    3664        33777 :     aggstate->peragg = peraggs;
    3665        33777 :     aggstate->pertrans = pertransstates;
    3666              : 
    3667              : 
    3668        33777 :     aggstate->all_pergroups = palloc0_array(AggStatePerGroup, numGroupingSets + numHashes);
    3669        33777 :     pergroups = aggstate->all_pergroups;
    3670              : 
    3671        33777 :     if (node->aggstrategy != AGG_HASHED)
    3672              :     {
    3673        59358 :         for (i = 0; i < numGroupingSets; i++)
    3674              :         {
    3675        29970 :             pergroups[i] = palloc0_array(AggStatePerGroupData, numaggs);
    3676              :         }
    3677              : 
    3678        29388 :         aggstate->pergroups = pergroups;
    3679        29388 :         pergroups += numGroupingSets;
    3680              :     }
    3681              : 
    3682              :     /*
    3683              :      * Hashing can only appear in the initial phase.
    3684              :      */
    3685        33777 :     if (use_hashing)
    3686              :     {
    3687         4563 :         Plan       *outerplan = outerPlan(node);
    3688         4563 :         double      totalGroups = 0;
    3689              : 
    3690         4563 :         aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
    3691              :                                                             &TTSOpsMinimalTuple);
    3692         4563 :         aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
    3693              :                                                             &TTSOpsVirtual);
    3694              : 
    3695              :         /* this is an array of pointers, not structures */
    3696         4563 :         aggstate->hash_pergroup = pergroups;
    3697              : 
    3698         9126 :         aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans,
    3699         4563 :                                                       outerplan->plan_width,
    3700              :                                                       node->transitionSpace);
    3701              : 
    3702              :         /*
    3703              :          * Consider all of the grouping sets together when setting the limits
    3704              :          * and estimating the number of partitions. This can be inaccurate
    3705              :          * when there is more than one grouping set, but should still be
    3706              :          * reasonable.
    3707              :          */
    3708         9486 :         for (int k = 0; k < aggstate->num_hashes; k++)
    3709         4923 :             totalGroups += aggstate->perhash[k].aggnode->numGroups;
    3710              : 
    3711         4563 :         hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
    3712              :                             &aggstate->hash_mem_limit,
    3713              :                             &aggstate->hash_ngroups_limit,
    3714              :                             &aggstate->hash_planned_partitions);
    3715         4563 :         find_hash_columns(aggstate);
    3716              : 
    3717              :         /* Skip massive memory allocation if we are just doing EXPLAIN */
    3718         4563 :         if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
    3719         3313 :             build_hash_tables(aggstate);
    3720              : 
    3721         4563 :         aggstate->table_filled = false;
    3722              : 
    3723              :         /* Initialize this to 1, meaning nothing spilled, yet */
    3724         4563 :         aggstate->hash_batches_used = 1;
    3725              :     }
    3726              : 
    3727              :     /*
    3728              :      * Initialize current phase-dependent values to initial phase. The initial
    3729              :      * phase is 1 (first sort pass) for all strategies that use sorting (if
    3730              :      * hashing is being done too, then phase 0 is processed last); but if only
    3731              :      * hashing is being done, then phase 0 is all there is.
    3732              :      */
    3733        33777 :     if (node->aggstrategy == AGG_HASHED)
    3734              :     {
    3735         4389 :         aggstate->current_phase = 0;
    3736         4389 :         initialize_phase(aggstate, 0);
    3737         4389 :         select_current_set(aggstate, 0, true);
    3738              :     }
    3739              :     else
    3740              :     {
    3741        29388 :         aggstate->current_phase = 1;
    3742        29388 :         initialize_phase(aggstate, 1);
    3743        29388 :         select_current_set(aggstate, 0, false);
    3744              :     }
    3745              : 
    3746              :     /*
    3747              :      * Perform lookups of aggregate function info, and initialize the
    3748              :      * unchanging fields of the per-agg and per-trans data.
    3749              :      */
    3750        71889 :     foreach(l, aggstate->aggs)
    3751              :     {
    3752        38116 :         Aggref     *aggref = lfirst(l);
    3753              :         AggStatePerAgg peragg;
    3754              :         AggStatePerTrans pertrans;
    3755              :         Oid         aggTransFnInputTypes[FUNC_MAX_ARGS];
    3756              :         int         numAggTransFnArgs;
    3757              :         int         numDirectArgs;
    3758              :         HeapTuple   aggTuple;
    3759              :         Form_pg_aggregate aggform;
    3760              :         AclResult   aclresult;
    3761              :         Oid         finalfn_oid;
    3762              :         Oid         serialfn_oid,
    3763              :                     deserialfn_oid;
    3764              :         Oid         aggOwner;
    3765              :         Expr       *finalfnexpr;
    3766              :         Oid         aggtranstype;
    3767              : 
    3768              :         /* Planner should have assigned aggregate to correct level */
    3769              :         Assert(aggref->agglevelsup == 0);
    3770              :         /* ... and the split mode should match */
    3771              :         Assert(aggref->aggsplit == aggstate->aggsplit);
    3772              : 
    3773        38116 :         peragg = &peraggs[aggref->aggno];
    3774              : 
    3775              :         /* Check if we initialized the state for this aggregate already. */
    3776        38116 :         if (peragg->aggref != NULL)
    3777          312 :             continue;
    3778              : 
    3779        37804 :         peragg->aggref = aggref;
    3780        37804 :         peragg->transno = aggref->aggtransno;
    3781              : 
    3782              :         /* Fetch the pg_aggregate row */
    3783        37804 :         aggTuple = SearchSysCache1(AGGFNOID,
    3784              :                                    ObjectIdGetDatum(aggref->aggfnoid));
    3785        37804 :         if (!HeapTupleIsValid(aggTuple))
    3786            0 :             elog(ERROR, "cache lookup failed for aggregate %u",
    3787              :                  aggref->aggfnoid);
    3788        37804 :         aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
    3789              : 
    3790              :         /* Check permission to call aggregate function */
    3791        37804 :         aclresult = object_aclcheck(ProcedureRelationId, aggref->aggfnoid, GetUserId(),
    3792              :                                     ACL_EXECUTE);
    3793        37804 :         if (aclresult != ACLCHECK_OK)
    3794            4 :             aclcheck_error(aclresult, OBJECT_AGGREGATE,
    3795            4 :                            get_func_name(aggref->aggfnoid));
    3796        37800 :         InvokeFunctionExecuteHook(aggref->aggfnoid);
    3797              : 
    3798              :         /* planner recorded transition state type in the Aggref itself */
    3799        37800 :         aggtranstype = aggref->aggtranstype;
    3800              :         Assert(OidIsValid(aggtranstype));
    3801              : 
    3802              :         /* Final function only required if we're finalizing the aggregates */
    3803        37800 :         if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
    3804         3695 :             peragg->finalfn_oid = finalfn_oid = InvalidOid;
    3805              :         else
    3806        34105 :             peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
    3807              : 
    3808        37800 :         serialfn_oid = InvalidOid;
    3809        37800 :         deserialfn_oid = InvalidOid;
    3810              : 
    3811              :         /*
    3812              :          * Check if serialization/deserialization is required.  We only do it
    3813              :          * for aggregates that have transtype INTERNAL.
    3814              :          */
    3815        37800 :         if (aggtranstype == INTERNALOID)
    3816              :         {
    3817              :             /*
    3818              :              * The planner should only have generated a serialize agg node if
    3819              :              * every aggregate with an INTERNAL state has a serialization
    3820              :              * function.  Verify that.
    3821              :              */
    3822        15377 :             if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
    3823              :             {
    3824              :                 /* serialization only valid when not running finalfn */
    3825              :                 Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
    3826              : 
    3827          224 :                 if (!OidIsValid(aggform->aggserialfn))
    3828            0 :                     elog(ERROR, "serialfunc not provided for serialization aggregation");
    3829          224 :                 serialfn_oid = aggform->aggserialfn;
    3830              :             }
    3831              : 
    3832              :             /* Likewise for deserialization functions */
    3833        15377 :             if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
    3834              :             {
    3835              :                 /* deserialization only valid when combining states */
    3836              :                 Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
    3837              : 
    3838           80 :                 if (!OidIsValid(aggform->aggdeserialfn))
    3839            0 :                     elog(ERROR, "deserialfunc not provided for deserialization aggregation");
    3840           80 :                 deserialfn_oid = aggform->aggdeserialfn;
    3841              :             }
    3842              :         }
    3843              : 
    3844              :         /* Check that aggregate owner has permission to call component fns */
    3845              :         {
    3846              :             HeapTuple   procTuple;
    3847              : 
    3848        37800 :             procTuple = SearchSysCache1(PROCOID,
    3849              :                                         ObjectIdGetDatum(aggref->aggfnoid));
    3850        37800 :             if (!HeapTupleIsValid(procTuple))
    3851            0 :                 elog(ERROR, "cache lookup failed for function %u",
    3852              :                      aggref->aggfnoid);
    3853        37800 :             aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
    3854        37800 :             ReleaseSysCache(procTuple);
    3855              : 
    3856        37800 :             if (OidIsValid(finalfn_oid))
    3857              :             {
    3858        16447 :                 aclresult = object_aclcheck(ProcedureRelationId, finalfn_oid, aggOwner,
    3859              :                                             ACL_EXECUTE);
    3860        16447 :                 if (aclresult != ACLCHECK_OK)
    3861            0 :                     aclcheck_error(aclresult, OBJECT_FUNCTION,
    3862            0 :                                    get_func_name(finalfn_oid));
    3863        16447 :                 InvokeFunctionExecuteHook(finalfn_oid);
    3864              :             }
    3865        37800 :             if (OidIsValid(serialfn_oid))
    3866              :             {
    3867          224 :                 aclresult = object_aclcheck(ProcedureRelationId, serialfn_oid, aggOwner,
    3868              :                                             ACL_EXECUTE);
    3869          224 :                 if (aclresult != ACLCHECK_OK)
    3870            0 :                     aclcheck_error(aclresult, OBJECT_FUNCTION,
    3871            0 :                                    get_func_name(serialfn_oid));
    3872          224 :                 InvokeFunctionExecuteHook(serialfn_oid);
    3873              :             }
    3874        37800 :             if (OidIsValid(deserialfn_oid))
    3875              :             {
    3876           80 :                 aclresult = object_aclcheck(ProcedureRelationId, deserialfn_oid, aggOwner,
    3877              :                                             ACL_EXECUTE);
    3878           80 :                 if (aclresult != ACLCHECK_OK)
    3879            0 :                     aclcheck_error(aclresult, OBJECT_FUNCTION,
    3880            0 :                                    get_func_name(deserialfn_oid));
    3881           80 :                 InvokeFunctionExecuteHook(deserialfn_oid);
    3882              :             }
    3883              :         }
    3884              : 
    3885              :         /*
    3886              :          * Get actual datatypes of the (nominal) aggregate inputs.  These
    3887              :          * could be different from the agg's declared input types, when the
    3888              :          * agg accepts ANY or a polymorphic type.
    3889              :          */
    3890        37800 :         numAggTransFnArgs = get_aggregate_argtypes(aggref,
    3891              :                                                    aggTransFnInputTypes);
    3892              : 
    3893              :         /* Count the "direct" arguments, if any */
    3894        37800 :         numDirectArgs = list_length(aggref->aggdirectargs);
    3895              : 
    3896              :         /* Detect how many arguments to pass to the finalfn */
    3897        37800 :         if (aggform->aggfinalextra)
    3898        11002 :             peragg->numFinalArgs = numAggTransFnArgs + 1;
    3899              :         else
    3900        26798 :             peragg->numFinalArgs = numDirectArgs + 1;
    3901              : 
    3902              :         /* Initialize any direct-argument expressions */
    3903        37800 :         peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
    3904              :                                                  (PlanState *) aggstate);
    3905              : 
    3906              :         /*
    3907              :          * Build expression trees using actual argument & result types for the
    3908              :          * finalfn, if it exists and is required.
    3909              :          */
    3910        37800 :         if (OidIsValid(finalfn_oid))
    3911              :         {
    3912        16447 :             build_aggregate_finalfn_expr(aggTransFnInputTypes,
    3913              :                                          peragg->numFinalArgs,
    3914              :                                          aggtranstype,
    3915              :                                          aggref->aggtype,
    3916              :                                          aggref->inputcollid,
    3917              :                                          finalfn_oid,
    3918              :                                          &finalfnexpr);
    3919        16447 :             fmgr_info(finalfn_oid, &peragg->finalfn);
    3920        16447 :             fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
    3921              :         }
    3922              : 
    3923              :         /* get info about the output value's datatype */
    3924        37800 :         get_typlenbyval(aggref->aggtype,
    3925              :                         &peragg->resulttypeLen,
    3926              :                         &peragg->resulttypeByVal);
    3927              : 
    3928              :         /*
    3929              :          * Build working state for invoking the transition function, if we
    3930              :          * haven't done it already.
    3931              :          */
    3932        37800 :         pertrans = &pertransstates[aggref->aggtransno];
    3933        37800 :         if (pertrans->aggref == NULL)
    3934              :         {
    3935              :             Datum       textInitVal;
    3936              :             Datum       initValue;
    3937              :             bool        initValueIsNull;
    3938              :             Oid         transfn_oid;
    3939              : 
    3940              :             /*
    3941              :              * If this aggregation is performing state combines, then instead
    3942              :              * of using the transition function, we'll use the combine
    3943              :              * function.
    3944              :              */
    3945        37612 :             if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
    3946              :             {
    3947         1497 :                 transfn_oid = aggform->aggcombinefn;
    3948              : 
    3949              :                 /* If not set then the planner messed up */
    3950         1497 :                 if (!OidIsValid(transfn_oid))
    3951            0 :                     elog(ERROR, "combinefn not set for aggregate function");
    3952              :             }
    3953              :             else
    3954        36115 :                 transfn_oid = aggform->aggtransfn;
    3955              : 
    3956        37612 :             aclresult = object_aclcheck(ProcedureRelationId, transfn_oid, aggOwner, ACL_EXECUTE);
    3957        37612 :             if (aclresult != ACLCHECK_OK)
    3958            0 :                 aclcheck_error(aclresult, OBJECT_FUNCTION,
    3959            0 :                                get_func_name(transfn_oid));
    3960        37612 :             InvokeFunctionExecuteHook(transfn_oid);
    3961              : 
    3962              :             /*
    3963              :              * initval is potentially null, so don't try to access it as a
    3964              :              * struct field. Must do it the hard way with SysCacheGetAttr.
    3965              :              */
    3966        37612 :             textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
    3967              :                                           Anum_pg_aggregate_agginitval,
    3968              :                                           &initValueIsNull);
    3969        37612 :             if (initValueIsNull)
    3970        21115 :                 initValue = (Datum) 0;
    3971              :             else
    3972        16497 :                 initValue = GetAggInitVal(textInitVal, aggtranstype);
    3973              : 
    3974        37612 :             if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
    3975              :             {
    3976         1497 :                 Oid         combineFnInputTypes[] = {aggtranstype,
    3977              :                 aggtranstype};
    3978              : 
    3979              :                 /*
    3980              :                  * When combining there's only one input, the to-be-combined
    3981              :                  * transition value.  The transition value is not counted
    3982              :                  * here.
    3983              :                  */
    3984         1497 :                 pertrans->numTransInputs = 1;
    3985              : 
    3986              :                 /* aggcombinefn always has two arguments of aggtranstype */
    3987         1497 :                 build_pertrans_for_aggref(pertrans, aggstate, estate,
    3988              :                                           aggref, transfn_oid, aggtranstype,
    3989              :                                           serialfn_oid, deserialfn_oid,
    3990              :                                           initValue, initValueIsNull,
    3991              :                                           combineFnInputTypes, 2);
    3992              : 
    3993              :                 /*
    3994              :                  * Ensure that a combine function to combine INTERNAL states
    3995              :                  * is not strict. This should have been checked during CREATE
    3996              :                  * AGGREGATE, but the strict property could have been changed
    3997              :                  * since then.
    3998              :                  */
    3999         1497 :                 if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
    4000            0 :                     ereport(ERROR,
    4001              :                             (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
    4002              :                              errmsg("combine function with transition type %s must not be declared STRICT",
    4003              :                                     format_type_be(aggtranstype))));
    4004              :             }
    4005              :             else
    4006              :             {
    4007              :                 /* Detect how many arguments to pass to the transfn */
    4008        36115 :                 if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
    4009          167 :                     pertrans->numTransInputs = list_length(aggref->args);
    4010              :                 else
    4011        35948 :                     pertrans->numTransInputs = numAggTransFnArgs;
    4012              : 
    4013        36115 :                 build_pertrans_for_aggref(pertrans, aggstate, estate,
    4014              :                                           aggref, transfn_oid, aggtranstype,
    4015              :                                           serialfn_oid, deserialfn_oid,
    4016              :                                           initValue, initValueIsNull,
    4017              :                                           aggTransFnInputTypes,
    4018              :                                           numAggTransFnArgs);
    4019              : 
    4020              :                 /*
    4021              :                  * If the transfn is strict and the initval is NULL, make sure
    4022              :                  * input type and transtype are the same (or at least
    4023              :                  * binary-compatible), so that it's OK to use the first
    4024              :                  * aggregated input value as the initial transValue.  This
    4025              :                  * should have been checked at agg definition time, but we
    4026              :                  * must check again in case the transfn's strictness property
    4027              :                  * has been changed.
    4028              :                  */
    4029        36115 :                 if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
    4030              :                 {
    4031         3029 :                     if (numAggTransFnArgs <= numDirectArgs ||
    4032         3029 :                         !IsBinaryCoercible(aggTransFnInputTypes[numDirectArgs],
    4033              :                                            aggtranstype))
    4034            0 :                         ereport(ERROR,
    4035              :                                 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
    4036              :                                  errmsg("aggregate %u needs to have compatible input type and transition type",
    4037              :                                         aggref->aggfnoid)));
    4038              :                 }
    4039              :             }
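The check above exists because of how a strict transition function with a NULL initial condition behaves. Null inputs are skipped. The first non-null input is copied in as the transition value without calling the transfn, and that copy is only safe if the input type and transition type are binary-compatible. The sketch below illustrates this. It is a minimal sketch with assumptions: TransState, int_larger() and advance_strict() are hypothetical and model a max(int)-style aggregate, not the executor's actual code path.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical running state for a strict aggregate such as max(int). */
typedef struct
{
    int         value;
    bool        isnull;
} TransState;

/* Strict transition function: only ever called with non-null arguments. */
static int
int_larger(int state, int input)
{
    return input > state ? input : state;
}

/*
 * Advance the state by one input under strict-transfn rules: null inputs
 * are ignored, and while the state is still null (because the initial
 * condition was null) the first non-null input becomes the state as-is.
 * That copy is why input type and transtype must be binary-compatible.
 */
static void
advance_strict(TransState *st, int input, bool input_isnull)
{
    if (input_isnull)
        return;
    if (st->isnull)
    {
        st->value = input;
        st->isnull = false;
        return;
    }
    st->value = int_larger(st->value, input);
}
```

Starting from a NULL state rather than, say, 0 lets an all-negative input such as -7 become the maximum, which a zero-initialized state would get wrong.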
    4040              :         }
    4041              :         else
    4042          188 :             pertrans->aggshared = true;
    4043        37800 :         ReleaseSysCache(aggTuple);
    4044              :     }
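In the combine branch of the loop above, the combinefn replaces the transfn and is given a single transition input: the to-be-combined state. The sketch below shows why merging partial states reproduces a single-pass result, using avg(int). It is a sketch under assumptions: AvgState, avg_transfn(), avg_combinefn() and avg_finalfn() are hypothetical, and returning 0.0 for zero rows is a simplification (SQL avg returns NULL there).

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical transition state for avg(int): running sum and row count. */
typedef struct
{
    int64_t     sum;
    int64_t     count;
} AvgState;

/* transfn: fold one raw input row into the state. */
static AvgState
avg_transfn(AvgState state, int input)
{
    state.sum += input;
    state.count += 1;
    return state;
}

/*
 * combinefn: merge another partial state into this one.  Its only
 * "input" is that other state, matching numTransInputs = 1 when combining.
 */
static AvgState
avg_combinefn(AvgState state, AvgState other)
{
    state.sum += other.sum;
    state.count += other.count;
    return state;
}

/* finalfn: produce the result (0.0 for no rows, a simplification). */
static double
avg_finalfn(AvgState state)
{
    return state.count == 0 ? 0.0 : (double) state.sum / (double) state.count;
}
```

Two workers can each run avg_transfn() over half of the rows 1..6, a combining step can merge their states with avg_combinefn(), and only that final step runs avg_finalfn(). This mirrors how a partial Agg node skips the finalfn while the combining node applies it.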
    4045              : 
    4046              :     /*
    4047              :      * Last, check whether any more aggregates got added onto the node while
    4048              :      * we processed the expressions for the aggregate arguments (including not
    4049              :      * only the regular arguments and FILTER expressions handled immediately
    4050              :      * above, but any direct arguments we might've handled earlier).  If so,
    4051              :      * we have nested aggregate functions, which is semantically nonsensical,
    4052              :      * so complain.  (This should have been caught by the parser, so we don't
    4053              :      * need to work hard on a helpful error message; but we defend against it
    4054              :      * here anyway, just to be sure.)
    4055              :      */
    4056        33773 :     if (numaggrefs != list_length(aggstate->aggs))
    4057            0 :         ereport(ERROR,
    4058              :                 (errcode(ERRCODE_GROUPING_ERROR),
    4059              :                  errmsg("aggregate function calls cannot be nested")));
    4060              : 
    4061              :     /*
    4062              :      * Build expressions doing all the transition work at once. We build a
    4063              :      * different one for each phase, as the number of transition function
    4064              :      * invocations can differ between phases. Note this'll work both for
    4065              :      * transition and combination functions (although there'll only be one
    4066              :      * phase in the latter case).
    4067              :      */
    4068        97122 :     for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
    4069              :     {
    4070        63349 :         AggStatePerPhase phase = &aggstate->phases[phaseidx];
    4071        63349 :         bool        dohash = false;
    4072        63349 :         bool        dosort = false;
    4073              : 
    4074              :         /* phase 0 doesn't necessarily exist */
    4075        63349 :         if (!phase->aggnode)
    4076        29210 :             continue;
    4077              : 
    4078        34139 :         if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
    4079              :         {
    4080              :             /*
    4081              :              * Phase one, and only phase one, in a mixed agg performs both
    4082              :              * sorting and aggregation.
    4083              :              */
    4084          174 :             dohash = true;
    4085          174 :             dosort = true;
    4086              :         }
    4087        33965 :         else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
    4088              :         {
    4089              :             /*
    4090              :              * No need to compute a transition function for an AGG_MIXED phase
    4091              :              * 0 - the contents of the hashtables will have been computed
    4092              :              * during phase 1.
    4093              :              */
    4094          174 :             continue;
    4095              :         }
    4096        33791 :         else if (phase->aggstrategy == AGG_PLAIN ||
    4097         6198 :                  phase->aggstrategy == AGG_SORTED)
    4098              :         {
    4099        29402 :             dohash = false;
    4100        29402 :             dosort = true;
    4101              :         }
    4102         4389 :         else if (phase->aggstrategy == AGG_HASHED)
    4103              :         {
    4104         4389 :             dohash = true;
    4105         4389 :             dosort = false;
    4106              :         }
    4107              :         else
    4108              :             Assert(false);
    4109              : 
    4110        33965 :         phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
    4111              :                                              false);
    4112              : 
    4113              :         /* cache compiled expression for outer slot without NULL check */
    4114        33965 :         phase->evaltrans_cache[0][0] = phase->evaltrans;
    4115              :     }
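The phase loop above chooses, per phase, whether the transition expression handles sorted input, hash tables, both, or nothing. The decision table is restated below as a standalone function. It is a sketch with assumptions: the Strategy enum and phase_trans_kind() are hypothetical stand-ins for AggStrategy and the inline logic. As in the loop, the caller is assumed to have already skipped phases with no aggnode.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical copy of the four strategies; values are not PostgreSQL's. */
typedef enum
{
    STRAT_PLAIN,
    STRAT_SORTED,
    STRAT_HASHED,
    STRAT_MIXED
} Strategy;

/*
 * Decide how a phase's transition expression is built.  node_strategy is
 * the node's overall strategy, phase_strategy that of this phase.
 * Returns false when no transition expression is needed.
 */
static bool
phase_trans_kind(Strategy node_strategy, Strategy phase_strategy,
                 int phaseidx, bool *dohash, bool *dosort)
{
    *dohash = false;
    *dosort = false;

    if (node_strategy == STRAT_MIXED && phaseidx == 1)
    {
        /* sorted input is aggregated while the hash tables are filled */
        *dohash = true;
        *dosort = true;
    }
    else if (node_strategy == STRAT_MIXED && phaseidx == 0)
        return false;           /* hash tables were filled in phase 1 */
    else if (phase_strategy == STRAT_PLAIN || phase_strategy == STRAT_SORTED)
        *dosort = true;
    else if (phase_strategy == STRAT_HASHED)
        *dohash = true;
    else
        assert(false);
    return true;
}
```

Only phase 1 of a mixed aggregate sets both flags. Its phase 0 is skipped because its hash tables were already populated during phase 1.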
    4116              : 
    4117        33773 :     return aggstate;
    4118              : }
    4119              : 
    4120              : /*
    4121              :  * Build the state needed to calculate a state value for an aggregate.
    4122              :  *
    4123              :  * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
    4124              :  * to initialize the state for. 'transfn_oid', 'aggtranstype', and the rest
    4125              :  * of the arguments could be calculated from 'aggref', but the caller has
    4126              :  * calculated them already, so might as well pass them.
    4127              :  *
    4128              :  * 'transfn_oid' may be either the Oid of the aggtransfn or the aggcombinefn.
    4129              :  */
    4130              : static void
    4131        37612 : build_pertrans_for_aggref(AggStatePerTrans pertrans,
    4132              :                           AggState *aggstate, EState *estate,
    4133              :                           Aggref *aggref,
    4134              :                           Oid transfn_oid, Oid aggtranstype,
    4135              :                           Oid aggserialfn, Oid aggdeserialfn,
    4136              :                           Datum initValue, bool initValueIsNull,
    4137              :                           Oid *inputTypes, int numArguments)
    4138              : {
    4139        37612 :     int         numGroupingSets = Max(aggstate->maxsets, 1);
    4140              :     Expr       *transfnexpr;
    4141              :     int         numTransArgs;
    4142        37612 :     Expr       *serialfnexpr = NULL;
    4143        37612 :     Expr       *deserialfnexpr = NULL;
    4144              :     ListCell   *lc;
    4145              :     int         numInputs;
    4146              :     int         numDirectArgs;
    4147              :     List       *sortlist;
    4148              :     int         numSortCols;
    4149              :     int         numDistinctCols;
    4150              :     int         i;
    4151              : 
    4152              :     /* Begin filling in the pertrans data */
    4153        37612 :     pertrans->aggref = aggref;
    4154        37612 :     pertrans->aggshared = false;
    4155        37612 :     pertrans->aggCollation = aggref->inputcollid;
    4156        37612 :     pertrans->transfn_oid = transfn_oid;
    4157        37612 :     pertrans->serialfn_oid = aggserialfn;
    4158        37612 :     pertrans->deserialfn_oid = aggdeserialfn;
    4159        37612 :     pertrans->initValue = initValue;
    4160        37612 :     pertrans->initValueIsNull = initValueIsNull;
    4161              : 
    4162              :     /* Count the "direct" arguments, if any */
    4163        37612 :     numDirectArgs = list_length(aggref->aggdirectargs);
    4164              : 
    4165              :     /* Count the number of aggregated input columns */
    4166        37612 :     pertrans->numInputs = numInputs = list_length(aggref->args);
    4167              : 
    4168        37612 :     pertrans->aggtranstype = aggtranstype;
    4169              : 
    4170              :     /* account for the current transition state */
    4171        37612 :     numTransArgs = pertrans->numTransInputs + 1;
    4172              : 
    4173              :     /*
    4174              :      * Set up infrastructure for calling the transfn.  Note that invtransfn is
    4175              :      * not needed here.
    4176              :      */
    4177        37612 :     build_aggregate_transfn_expr(inputTypes,
    4178              :                                  numArguments,
    4179              :                                  numDirectArgs,
    4180        37612 :                                  aggref->aggvariadic,
    4181              :                                  aggtranstype,
    4182              :                                  aggref->inputcollid,
    4183              :                                  transfn_oid,
    4184              :                                  InvalidOid,
    4185              :                                  &transfnexpr,
    4186              :                                  NULL);
    4187              : 
    4188        37612 :     fmgr_info(transfn_oid, &pertrans->transfn);
    4189        37612 :     fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
    4190              : 
    4191        37612 :     pertrans->transfn_fcinfo =
    4192        37612 :         (FunctionCallInfo) palloc(SizeForFunctionCallInfo(numTransArgs));
    4193        37612 :     InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
    4194              :                              &pertrans->transfn,
    4195              :                              numTransArgs,
    4196              :                              pertrans->aggCollation,
    4197              :                              (Node *) aggstate, NULL);
    4198              : 
    4199              :     /* get info about the state value's datatype */
    4200        37612 :     get_typlenbyval(aggtranstype,
    4201              :                     &pertrans->transtypeLen,
    4202              :                     &pertrans->transtypeByVal);
    4203              : 
    4204        37612 :     if (OidIsValid(aggserialfn))
    4205              :     {
    4206          224 :         build_aggregate_serialfn_expr(aggserialfn,
    4207              :                                       &serialfnexpr);
    4208          224 :         fmgr_info(aggserialfn, &pertrans->serialfn);
    4209          224 :         fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
    4210              : 
    4211          224 :         pertrans->serialfn_fcinfo =
    4212          224 :             (FunctionCallInfo) palloc(SizeForFunctionCallInfo(1));
    4213          224 :         InitFunctionCallInfoData(*pertrans->serialfn_fcinfo,
    4214              :                                  &pertrans->serialfn,
    4215              :                                  1,
    4216              :                                  InvalidOid,
    4217              :                                  (Node *) aggstate, NULL);
    4218              :     }
    4219              : 
    4220        37612 :     if (OidIsValid(aggdeserialfn))
    4221              :     {
    4222           80 :         build_aggregate_deserialfn_expr(aggdeserialfn,
    4223              :                                         &deserialfnexpr);
    4224           80 :         fmgr_info(aggdeserialfn, &pertrans->deserialfn);
    4225           80 :         fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
    4226              : 
    4227           80 :         pertrans->deserialfn_fcinfo =
    4228           80 :             (FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
    4229           80 :         InitFunctionCallInfoData(*pertrans->deserialfn_fcinfo,
    4230              :                                  &pertrans->deserialfn,
    4231              :                                  2,
    4232              :                                  InvalidOid,
    4233              :                                  (Node *) aggstate, NULL);
    4234              :     }
    4235              : 
    4236              :     /*
    4237              :      * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
    4238              :      * have a list of SortGroupClause nodes; fish out the data in them and
    4239              :      * stick them into arrays.  We ignore ORDER BY for an ordered-set agg,
    4240              :      * however; the agg's transfn and finalfn are responsible for that.
    4241              :      *
    4242              :      * When the planner has set the aggpresorted flag, the input to the
    4243              :      * aggregate is already correctly sorted.  For ORDER BY aggregates we can
    4244              :      * simply treat these as normal aggregates.  For presorted DISTINCT
    4245              :      * aggregates an extra step must be added to remove duplicate consecutive
    4246              :      * inputs.
    4247              :      *
    4248              :      * Note that by construction, if there is a DISTINCT clause then the ORDER
    4249              :      * BY clause is a prefix of it (see transformDistinctClause).
    4250              :      */
    4251        37612 :     if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
    4252              :     {
    4253          167 :         sortlist = NIL;
    4254          167 :         numSortCols = numDistinctCols = 0;
    4255          167 :         pertrans->aggsortrequired = false;
    4256              :     }
    4257        37445 :     else if (aggref->aggpresorted && aggref->aggdistinct == NIL)
    4258              :     {
    4259         1441 :         sortlist = NIL;
    4260         1441 :         numSortCols = numDistinctCols = 0;
    4261         1441 :         pertrans->aggsortrequired = false;
    4262              :     }
    4263        36004 :     else if (aggref->aggdistinct)
    4264              :     {
    4265          389 :         sortlist = aggref->aggdistinct;
    4266          389 :         numSortCols = numDistinctCols = list_length(sortlist);
    4267              :         Assert(numSortCols >= list_length(aggref->aggorder));
    4268          389 :         pertrans->aggsortrequired = !aggref->aggpresorted;
    4269              :     }
    4270              :     else
    4271              :     {
    4272        35615 :         sortlist = aggref->aggorder;
    4273        35615 :         numSortCols = list_length(sortlist);
    4274        35615 :         numDistinctCols = 0;
    4275        35615 :         pertrans->aggsortrequired = (numSortCols > 0);
    4276              :     }
    4277              : 
    4278        37612 :     pertrans->numSortCols = numSortCols;
    4279        37612 :     pertrans->numDistinctCols = numDistinctCols;
    4280              : 
    4281              :     /*
    4282              :      * If we have either sorting or filtering to do, create a tupledesc and
    4283              :      * slot corresponding to the aggregated inputs (including sort
    4284              :      * expressions) of the agg.
    4285              :      */
    4286        37612 :     if (numSortCols > 0 || aggref->aggfilter)
    4287              :     {
    4288          872 :         pertrans->sortdesc = ExecTypeFromTL(aggref->args);
    4289          872 :         pertrans->sortslot =
    4290          872 :             ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
    4291              :                                    &TTSOpsMinimalTuple);
    4292              :     }
    4293              : 
    4294        37612 :     if (numSortCols > 0)
    4295              :     {
    4296              :         /*
    4297              :          * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
    4298              :          * (yet)
    4299              :          */
    4300              :         Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
    4301              : 
    4302              :         /* ORDER BY aggregates are not supported with partial aggregation */
    4303              :         Assert(!DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
    4304              : 
    4305              :         /* If we have only one input, we need its len/byval info. */
    4306          481 :         if (numInputs == 1)
    4307              :         {
    4308          381 :             get_typlenbyval(inputTypes[numDirectArgs],
    4309              :                             &pertrans->inputtypeLen,
    4310              :                             &pertrans->inputtypeByVal);
    4311              :         }
    4312          100 :         else if (numDistinctCols > 0)
    4313              :         {
    4314              :             /* we will need an extra slot to store prior values */
    4315           72 :             pertrans->uniqslot =
    4316           72 :                 ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
    4317              :                                        &TTSOpsMinimalTuple);
    4318              :         }
    4319              : 
    4320              :         /* Extract the sort information for use later */
    4321          481 :         pertrans->sortColIdx =
    4322          481 :             (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
    4323          481 :         pertrans->sortOperators =
    4324          481 :             (Oid *) palloc(numSortCols * sizeof(Oid));
    4325          481 :         pertrans->sortCollations =
    4326          481 :             (Oid *) palloc(numSortCols * sizeof(Oid));
    4327          481 :         pertrans->sortNullsFirst =
    4328          481 :             (bool *) palloc(numSortCols * sizeof(bool));
    4329              : 
    4330          481 :         i = 0;
    4331         1094 :         foreach(lc, sortlist)
    4332              :         {
    4333          613 :             SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
    4334          613 :             TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
    4335              : 
    4336              :             /* the parser should have made sure of this */
    4337              :             Assert(OidIsValid(sortcl->sortop));
    4338              : 
    4339          613 :             pertrans->sortColIdx[i] = tle->resno;
    4340          613 :             pertrans->sortOperators[i] = sortcl->sortop;
    4341          613 :             pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
    4342          613 :             pertrans->sortNullsFirst[i] = sortcl->nulls_first;
    4343          613 :             i++;
    4344              :         }
    4345              :         Assert(i == numSortCols);
    4346              :     }
    4347              : 
    4348        37612 :     if (aggref->aggdistinct)
    4349              :     {
    4350              :         Oid        *ops;
    4351              : 
    4352              :         Assert(numArguments > 0);
    4353              :         Assert(list_length(aggref->aggdistinct) == numDistinctCols);
    4354              : 
    4355          389 :         ops = palloc(numDistinctCols * sizeof(Oid));
    4356              : 
    4357          389 :         i = 0;
    4358          898 :         foreach(lc, aggref->aggdistinct)
    4359          509 :             ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
    4360              : 
    4361              :         /* lookup / build the necessary comparators */
    4362          389 :         if (numDistinctCols == 1)
    4363          317 :             fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
    4364              :         else
    4365           72 :             pertrans->equalfnMulti =
    4366           72 :                 execTuplesMatchPrepare(pertrans->sortdesc,
    4367              :                                        numDistinctCols,
    4368           72 :                                        pertrans->sortColIdx,
    4369              :                                        ops,
    4370           72 :                                        pertrans->sortCollations,
    4371              :                                        &aggstate->ss.ps);
    4372          389 :         pfree(ops);
    4373              :     }
    4374              : 
    4375        37612 :     pertrans->sortstates = palloc0_array(Tuplesortstate *, numGroupingSets);
    4376        37612 : }
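The comment in build_pertrans_for_aggref describes a four-way branch: ordered-set, presorted, DISTINCT, and plain ORDER BY aggregates. That branch can be modeled on its own. The standalone C sketch below is not PostgreSQL code. The names `AggSortPlan` and `classify_agg_sort` are hypothetical, and plain counts stand in for the `aggdistinct` and `aggorder` lists.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical summary of the sort setup chosen for one transition. */
typedef struct AggSortPlan
{
    int  numSortCols;      /* columns fed to the per-group sort */
    int  numDistinctCols;  /* leading columns used for duplicate removal */
    bool sortRequired;     /* must the executor sort the input itself? */
} AggSortPlan;

/* Models the if/else-if chain on aggkind, aggpresorted, aggdistinct, aggorder. */
static AggSortPlan
classify_agg_sort(bool orderedSet, bool presorted,
                  int numDistinctClauses, int numOrderClauses)
{
    AggSortPlan plan = {0, 0, false};

    /* Ordered-set aggs handle ORDER BY in their own transfn/finalfn. */
    if (orderedSet)
        return plan;

    /* Planner already sorted the input and there is no DISTINCT: plain agg. */
    if (presorted && numDistinctClauses == 0)
        return plan;

    if (numDistinctClauses > 0)
    {
        /* ORDER BY is a prefix of DISTINCT, so DISTINCT drives the sort. */
        assert(numDistinctClauses >= numOrderClauses);
        plan.numSortCols = plan.numDistinctCols = numDistinctClauses;
        /* Presorted DISTINCT skips the sort but still removes duplicates. */
        plan.sortRequired = !presorted;
        return plan;
    }

    /* Plain ORDER BY aggregate (possibly with no ORDER BY at all). */
    plan.numSortCols = numOrderClauses;
    plan.sortRequired = (numOrderClauses > 0);
    return plan;
}
```

In the presorted DISTINCT case, `numDistinctCols` is nonzero but no sort is required. The executor still compares each input with the previous one to drop consecutive duplicates.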
    4377              : 
    4378              : 
    4379              : static Datum
    4380        16497 : GetAggInitVal(Datum textInitVal, Oid transtype)
    4381              : {
    4382              :     Oid         typinput,
    4383              :                 typioparam;
    4384              :     char       *strInitVal;
    4385              :     Datum       initVal;
    4386              : 
    4387        16497 :     getTypeInputInfo(transtype, &typinput, &typioparam);
    4388        16497 :     strInitVal = TextDatumGetCString(textInitVal);
    4389        16497 :     initVal = OidInputFunctionCall(typinput, strInitVal,
    4390              :                                    typioparam, -1);
    4391        16497 :     pfree(strInitVal);
    4392        16497 :     return initVal;
    4393              : }
    4394              : 
    4395              : void
    4396        33652 : ExecEndAgg(AggState *node)
    4397              : {
    4398              :     PlanState  *outerPlan;
    4399              :     int         transno;
    4400        33652 :     int         numGroupingSets = Max(node->maxsets, 1);
    4401              :     int         setno;
    4402              : 
    4403              :     /*
    4404              :      * When ending a parallel worker, copy the statistics gathered by the
    4405              :      * worker back into shared memory so that it can be picked up by the main
    4406              :      * process to report in EXPLAIN ANALYZE.
    4407              :      */
    4408        33652 :     if (node->shared_info && IsParallelWorker())
    4409              :     {
    4410              :         AggregateInstrumentation *si;
    4411              : 
    4412              :         Assert(ParallelWorkerNumber < node->shared_info->num_workers);
    4413          111 :         si = &node->shared_info->sinstrument[ParallelWorkerNumber];
    4414          111 :         si->hash_batches_used = node->hash_batches_used;
    4415          111 :         si->hash_disk_used = node->hash_disk_used;
    4416          111 :         si->hash_mem_peak = node->hash_mem_peak;
    4417              :     }
    4418              : 
    4419              :     /* Make sure we have closed any open tuplesorts */
    4420              : 
    4421        33652 :     if (node->sort_in)
    4422          120 :         tuplesort_end(node->sort_in);
    4423        33652 :     if (node->sort_out)
    4424           44 :         tuplesort_end(node->sort_out);
    4425              : 
    4426        33652 :     hashagg_reset_spill_state(node);
    4427              : 
    4428              :     /* Release hash tables too */
    4429        33652 :     if (node->hash_metacxt != NULL)
    4430              :     {
    4431         4558 :         MemoryContextDelete(node->hash_metacxt);
    4432         4558 :         node->hash_metacxt = NULL;
    4433              :     }
    4434        33652 :     if (node->hash_tuplescxt != NULL)
    4435              :     {
    4436         4558 :         MemoryContextDelete(node->hash_tuplescxt);
    4437         4558 :         node->hash_tuplescxt = NULL;
    4438              :     }
    4439              : 
    4440        71141 :     for (transno = 0; transno < node->numtrans; transno++)
    4441              :     {
    4442        37489 :         AggStatePerTrans pertrans = &node->pertrans[transno];
    4443              : 
    4444        75686 :         for (setno = 0; setno < numGroupingSets; setno++)
    4445              :         {
    4446        38197 :             if (pertrans->sortstates[setno])
    4447            0 :                 tuplesort_end(pertrans->sortstates[setno]);
    4448              :         }
    4449              :     }
    4450              : 
    4451              :     /* And ensure any agg shutdown callbacks have been called */
    4452        67886 :     for (setno = 0; setno < numGroupingSets; setno++)
    4453        34234 :         ReScanExprContext(node->aggcontexts[setno]);
    4454        33652 :     if (node->hashcontext)
    4455         4558 :         ReScanExprContext(node->hashcontext);
    4456              : 
    4457        33652 :     outerPlan = outerPlanState(node);
    4458        33652 :     ExecEndNode(outerPlan);
    4459        33652 : }
    4460              : 
    4461              : void
    4462        29484 : ExecReScanAgg(AggState *node)
    4463              : {
    4464        29484 :     ExprContext *econtext = node->ss.ps.ps_ExprContext;
    4465        29484 :     PlanState  *outerPlan = outerPlanState(node);
    4466        29484 :     Agg        *aggnode = (Agg *) node->ss.ps.plan;
    4467              :     int         transno;
    4468        29484 :     int         numGroupingSets = Max(node->maxsets, 1);
    4469              :     int         setno;
    4470              : 
    4471        29484 :     node->agg_done = false;
    4472              : 
    4473        29484 :     if (node->aggstrategy == AGG_HASHED)
    4474              :     {
    4475              :         /*
    4476              :          * In the hashed case, if we haven't yet built the hash table then we
    4477              :          * can just return; nothing done yet, so nothing to undo. If subnode's
    4478              :          * chgParam is not NULL then it will be re-scanned by ExecProcNode,
    4479              :          * else no reason to re-scan it at all.
    4480              :          */
    4481         7341 :         if (!node->table_filled)
    4482           83 :             return;
    4483              : 
    4484              :         /*
    4485              :          * If we do have the hash table, and it never spilled, and the subplan
    4486              :          * does not have any parameter changes, and none of our own parameter
    4487              :          * changes affect input expressions of the aggregated functions, then
    4488              :          * we can just rescan the existing hash table; no need to build it
    4489              :          * again.
    4490              :          */
    4491         7258 :         if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
    4492          606 :             !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
    4493              :         {
    4494          590 :             ResetTupleHashIterator(node->perhash[0].hashtable,
    4495              :                                    &node->perhash[0].hashiter);
    4496          590 :             select_current_set(node, 0, true);
    4497          590 :             return;
    4498              :         }
    4499              :     }
    4500              : 
    4501              :     /* Make sure we have closed any open tuplesorts */
    4502        64684 :     for (transno = 0; transno < node->numtrans; transno++)
    4503              :     {
    4504        71770 :         for (setno = 0; setno < numGroupingSets; setno++)
    4505              :         {
    4506        35897 :             AggStatePerTrans pertrans = &node->pertrans[transno];
    4507              : 
    4508        35897 :             if (pertrans->sortstates[setno])
    4509              :             {
    4510            0 :                 tuplesort_end(pertrans->sortstates[setno]);
    4511            0 :                 pertrans->sortstates[setno] = NULL;
    4512              :             }
    4513              :         }
    4514              :     }
    4515              : 
    4516              :     /*
    4517              :      * We don't need to ReScanExprContext the output tuple context here;
    4518              :      * ExecReScan already did it. But we do need to reset our per-grouping-set
    4519              :      * contexts, which may have transvalues stored in them. (We use rescan
    4520              :      * rather than just reset because transfns may have registered callbacks
    4521              :      * that need to be run now.) For the AGG_HASHED case, see below.
    4522              :      */
    4523              : 
    4524        57646 :     for (setno = 0; setno < numGroupingSets; setno++)
    4525              :     {
    4526        28835 :         ReScanExprContext(node->aggcontexts[setno]);
    4527              :     }
    4528              : 
    4529              :     /* Release first tuple of group, if we have made a copy */
    4530        28811 :     if (node->grp_firstTuple != NULL)
    4531              :     {
    4532            0 :         heap_freetuple(node->grp_firstTuple);
    4533            0 :         node->grp_firstTuple = NULL;
    4534              :     }
    4535        28811 :     ExecClearTuple(node->ss.ss_ScanTupleSlot);
    4536              : 
    4537              :     /* Forget current agg values */
    4538        64684 :     MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
    4539        28811 :     MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
    4540              : 
    4541              :     /*
    4542              :      * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
    4543              :      * the hashcontext. This used to be an issue, but now, resetting a context
    4544              :      * automatically deletes sub-contexts too.
    4545              :      */
    4546        28811 :     if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
    4547              :     {
    4548         6688 :         hashagg_reset_spill_state(node);
    4549              : 
    4550         6688 :         node->hash_ever_spilled = false;
    4551         6688 :         node->hash_spill_mode = false;
    4552         6688 :         node->hash_ngroups_current = 0;
    4553              : 
    4554         6688 :         ReScanExprContext(node->hashcontext);
    4555              :         /* Rebuild empty hash table(s) */
    4556         6688 :         build_hash_tables(node);
    4557         6688 :         node->table_filled = false;
    4558              :         /* iterator will be reset when the table is filled */
    4559              : 
    4560         6688 :         hashagg_recompile_expressions(node, false, false);
    4561              :     }
    4562              : 
    4563        28811 :     if (node->aggstrategy != AGG_HASHED)
    4564              :     {
    4565              :         /*
    4566              :          * Reset the per-group state (in particular, mark transvalues null)
    4567              :          */
    4568        44310 :         for (setno = 0; setno < numGroupingSets; setno++)
    4569              :         {
    4570        93897 :             MemSet(node->pergroups[setno], 0,
    4571              :                    sizeof(AggStatePerGroupData) * node->numaggs);
    4572              :         }
    4573              : 
    4574              :         /* reset to phase 1 */
    4575        22143 :         initialize_phase(node, 1);
    4576              : 
    4577        22143 :         node->input_done = false;
    4578        22143 :         node->projected_set = -1;
    4579              :     }
    4580              : 
    4581        28811 :     if (outerPlan->chgParam == NULL)
    4582          125 :         ExecReScan(outerPlan);
    4583              : }
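For AGG_HASHED, ExecReScanAgg first chooses among three outcomes: do nothing, rewind the existing hash table, or fall through to the full reset. Below is a minimal C sketch of that decision. The names `HashedRescanAction` and `hashed_rescan_action` are hypothetical. The booleans stand in for `table_filled`, `outerPlan->chgParam != NULL`, `hash_ever_spilled`, and the `bms_overlap()` test against `aggParams`.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical labels for the three outcomes of the AGG_HASHED prologue. */
typedef enum
{
    RESCAN_NOTHING_TO_DO,   /* table never built: nothing to undo */
    RESCAN_REUSE_TABLE,     /* only reset the hash iterator */
    RESCAN_FULL_RESET       /* drop spill state and rebuild empty tables */
} HashedRescanAction;

static HashedRescanAction
hashed_rescan_action(bool tableFilled, bool outerParamsChanged,
                     bool everSpilled, bool aggParamsChanged)
{
    if (!tableFilled)
        return RESCAN_NOTHING_TO_DO;
    /* Reuse is safe only if the table is complete and its inputs are unchanged. */
    if (!outerParamsChanged && !everSpilled && !aggParamsChanged)
        return RESCAN_REUSE_TABLE;
    return RESCAN_FULL_RESET;
}
```

AGG_MIXED and the sorted strategies never take either early exit. They always go through the context rescans and per-group resets.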
    4584              : 
    4585              : 
    4586              : /***********************************************************************
    4587              :  * API exposed to aggregate functions
    4588              :  ***********************************************************************/
    4589              : 
    4590              : 
    4591              : /*
    4592              :  * AggCheckCallContext - test if a SQL function is being called as an aggregate
    4593              :  *
    4594              :  * The transition and/or final functions of an aggregate may want to verify
    4595              :  * that they are being called as aggregates, rather than as plain SQL
    4596              :  * functions.  They should use this function to do so.  The return value
    4597              :  * is nonzero if being called as an aggregate, or zero if not.  (Specific
    4598              :  * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
    4599              :  * values could conceivably appear in future.)
    4600              :  *
    4601              :  * If aggcontext isn't NULL, the function also stores at *aggcontext the
    4602              :  * identity of the memory context that aggregate transition values are being
    4603              :  * stored in.  Note that the same aggregate call site (flinfo) may be called
    4604              :  * interleaved on different transition values in different contexts, so it's
    4605              :  * not kosher to cache aggcontext under fn_extra.  It is, however, kosher to
    4606              :  * cache it in the transvalue itself (for internal-type transvalues).
    4607              :  */
    4608              : int
    4609      3429099 : AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
    4610              : {
    4611      3429099 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4612              :     {
    4613      3421415 :         if (aggcontext)
    4614              :         {
    4615      1643128 :             AggState   *aggstate = ((AggState *) fcinfo->context);
    4616      1643128 :             ExprContext *cxt = aggstate->curaggcontext;
    4617              : 
    4618      1643128 :             *aggcontext = cxt->ecxt_per_tuple_memory;
    4619              :         }
    4620      3421415 :         return AGG_CONTEXT_AGGREGATE;
    4621              :     }
    4622         7684 :     if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
    4623              :     {
    4624         6408 :         if (aggcontext)
    4625          540 :             *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
    4626         6408 :         return AGG_CONTEXT_WINDOW;
    4627              :     }
    4628              : 
    4629              :     /* this is just to prevent "uninitialized variable" warnings */
    4630         1276 :     if (aggcontext)
    4631         1236 :         *aggcontext = NULL;
    4632         1276 :     return 0;
    4633              : }
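A transition or final function calls AggCheckCallContext to confirm that it is running inside an aggregate. The standalone C sketch below models that check, plus the guard a support function usually places in front of its state handling. Every type and name in it is hypothetical: `FakeCallInfo`, `fake_agg_check_call_context`, and `my_transfn_guard`. The two constants are assumed to mirror `AGG_CONTEXT_AGGREGATE` and `AGG_CONTEXT_WINDOW`. The model also simplifies one detail. For an `AggState`, the real function returns the per-tuple memory of `curaggcontext`, which this sketch collapses into a single pointer.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the node type found in fcinfo->context. */
typedef enum { CTX_NONE, CTX_AGG_STATE, CTX_WINDOW_AGG_STATE } CallerKind;

/* Assumed to mirror AGG_CONTEXT_AGGREGATE / AGG_CONTEXT_WINDOW. */
enum { MY_AGG_CONTEXT_AGGREGATE = 1, MY_AGG_CONTEXT_WINDOW = 2 };

typedef struct FakeMemoryContext { const char *name; } FakeMemoryContext;

typedef struct FakeCallInfo
{
    CallerKind         caller;
    FakeMemoryContext *curaggcontext;  /* where transvalues live */
} FakeCallInfo;

/* Nonzero when called as an aggregate; optionally reports the context. */
static int
fake_agg_check_call_context(const FakeCallInfo *fcinfo,
                            FakeMemoryContext **aggcontext)
{
    if (fcinfo->caller == CTX_AGG_STATE ||
        fcinfo->caller == CTX_WINDOW_AGG_STATE)
    {
        if (aggcontext)
            *aggcontext = fcinfo->curaggcontext;
        return fcinfo->caller == CTX_AGG_STATE ?
            MY_AGG_CONTEXT_AGGREGATE : MY_AGG_CONTEXT_WINDOW;
    }
    if (aggcontext)
        *aggcontext = NULL;  /* keep the out-parameter initialized */
    return 0;
}

/* Guard placed at the top of a hypothetical transfn. */
static int
my_transfn_guard(const FakeCallInfo *fcinfo, FakeMemoryContext **stateCxt)
{
    if (!fake_agg_check_call_context(fcinfo, stateCxt))
        return -1;  /* real code would raise an error here */
    return 0;
}
```

In real extension code, the failure branch raises an error instead of returning a status. The guard's purpose is that a transfn never allocates long-lived state unless the caller actually supplied an aggregate memory context.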
    4634              : 
    4635              : /*
    4636              :  * AggGetAggref - allow an aggregate support function to get its Aggref
    4637              :  *
    4638              :  * If the function is being called as an aggregate support function,
    4639              :  * return the Aggref node for the aggregate call.  Otherwise, return NULL.
    4640              :  *
    4641              :  * Aggregates sharing the same inputs and transition functions can get
    4642              :  * merged into a single transition calculation.  If the transition function
    4643              :  * calls AggGetAggref, it will get some one of the Aggrefs for which it is
    4644              :  * executing.  It must therefore not pay attention to the Aggref fields that
    4645              :  * relate to the final function, as those are indeterminate.  But if a final
    4646              :  * function calls AggGetAggref, it will get a precise result.
    4647              :  *
    4648              :  * Note that if an aggregate is being used as a window function, this will
    4649              :  * return NULL.  We could provide a similar function to return the relevant
    4650              :  * WindowFunc node in such cases, but it's not needed yet.
    4651              :  */
    4652              : Aggref *
    4653          163 : AggGetAggref(FunctionCallInfo fcinfo)
    4654              : {
    4655          163 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4656              :     {
    4657          163 :         AggState   *aggstate = (AggState *) fcinfo->context;
    4658              :         AggStatePerAgg curperagg;
    4659              :         AggStatePerTrans curpertrans;
    4660              : 
    4661              :         /* check curperagg (valid when in a final function) */
    4662          163 :         curperagg = aggstate->curperagg;
    4663              : 
    4664          163 :         if (curperagg)
    4665            0 :             return curperagg->aggref;
    4666              : 
    4667              :         /* check curpertrans (valid when in a transition function) */
    4668          163 :         curpertrans = aggstate->curpertrans;
    4669              : 
    4670          163 :         if (curpertrans)
    4671          163 :             return curpertrans->aggref;
    4672              :     }
    4673            0 :     return NULL;
    4674              : }
    4675              : 
    4676              : /*
    4677              :  * AggGetTempMemoryContext - fetch short-term memory context for aggregates
    4678              :  *
    4679              :  * This is useful in agg final functions; the context returned is one that
    4680              :  * the final function can safely reset as desired.  This isn't useful for
    4681              :  * transition functions, since the context returned MAY (we don't promise)
    4682              :  * be the same as the context those are called in.
    4683              :  *
    4684              :  * As above, this is currently not useful for aggs called as window functions.
    4685              :  */
    4686              : MemoryContext
    4687            0 : AggGetTempMemoryContext(FunctionCallInfo fcinfo)
    4688              : {
    4689            0 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4690              :     {
    4691            0 :         AggState   *aggstate = (AggState *) fcinfo->context;
    4692              : 
    4693            0 :         return aggstate->tmpcontext->ecxt_per_tuple_memory;
    4694              :     }
    4695            0 :     return NULL;
    4696              : }
    4697              : 
    4698              : /*
    4699              :  * AggStateIsShared - find out whether transition state is shared
    4700              :  *
    4701              :  * If the function is being called as an aggregate support function,
    4702              :  * return true if the aggregate's transition state is shared across
    4703              :  * multiple aggregates, false if it is not.
    4704              :  *
    4705              :  * Returns true if not called as an aggregate support function.
    4706              :  * This is intended as a conservative answer, ie "no you'd better not
    4707              :  * scribble on your input".  In particular, will return true if the
    4708              :  * aggregate is being used as a window function, which is a scenario
    4709              :  * in which changing the transition state is a bad idea.  We might
    4710              :  * want to refine the behavior for the window case in future.
    4711              :  */
    4712              : bool
    4713          163 : AggStateIsShared(FunctionCallInfo fcinfo)
    4714              : {
    4715          163 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4716              :     {
    4717          163 :         AggState   *aggstate = (AggState *) fcinfo->context;
    4718              :         AggStatePerAgg curperagg;
    4719              :         AggStatePerTrans curpertrans;
    4720              : 
    4721              :         /* check curperagg (valid when in a final function) */
    4722          163 :         curperagg = aggstate->curperagg;
    4723              : 
    4724          163 :         if (curperagg)
    4725            0 :             return aggstate->pertrans[curperagg->transno].aggshared;
    4726              : 
    4727              :         /* check curpertrans (valid when in a transition function) */
    4728          163 :         curpertrans = aggstate->curpertrans;
    4729              : 
    4730          163 :         if (curpertrans)
    4731          163 :             return curpertrans->aggshared;
    4732              :     }
    4733            0 :     return true;
    4734              : }
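`AggStateIsShared` makes its decision in a fixed order. It consults `curperagg` first, which is valid inside a final function. It then consults `curpertrans`, which is valid inside a transition function. Otherwise it falls back to the conservative `true`. The standalone sketch below reproduces that order using hypothetical, trimmed-down structs (`PerTrans`, `PerAgg`, `MockAggState`) and a hypothetical `state_is_shared`. A `NULL` state pointer stands in for "not called as an aggregate support function".

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, trimmed-down mirrors of the per-trans / per-agg structs. */
typedef struct { bool aggshared; } PerTrans;
typedef struct { int transno; } PerAgg;

typedef struct {
    PerTrans *pertrans;     /* array indexed by transno */
    PerAgg   *curperagg;    /* non-NULL while a final function runs */
    PerTrans *curpertrans;  /* non-NULL while a transition function runs */
} MockAggState;

/* Mirrors AggStateIsShared's decision order; agg == NULL means
 * "not called as an aggregate support function". */
static bool state_is_shared(const MockAggState *agg)
{
    if (agg != NULL)
    {
        if (agg->curperagg)
            return agg->pertrans[agg->curperagg->transno].aggshared;
        if (agg->curpertrans)
            return agg->curpertrans->aggshared;
    }
    return true;   /* conservative default: don't scribble on the input */
}
```

In the coverage data above, only the transition-function branch was exercised. The final-function branch (`curperagg` non-NULL) shows zero hits.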
    4735              : 
    4736              : /*
    4737              :  * AggRegisterCallback - register a cleanup callback for an aggregate
    4738              :  *
    4739              :  * This is useful for aggs to register shutdown callbacks, which will ensure
    4740              :  * that non-memory resources are freed.  The callback will occur just before
    4741              :  * the associated aggcontext (as returned by AggCheckCallContext) is reset,
    4742              :  * either between groups or as a result of rescanning the query.  The callback
    4743              :  * will NOT be called on error paths.  Typical uses are freeing tuplestores
    4744              :  * or tuplesorts maintained in aggcontext, or releasing pins held by slots
    4745              :  * created by the agg functions.  (The callback will not be called until after
    4746              :  * the result of the finalfn is no longer needed, so it's safe for the finalfn
    4747              :  * to return data that will be freed by the callback.)
    4748              :  *
    4749              :  * As above, this is currently not useful for aggs called as window functions.
    4750              :  */
    4751              : void
    4752          433 : AggRegisterCallback(FunctionCallInfo fcinfo,
    4753              :                     ExprContextCallbackFunction func,
    4754              :                     Datum arg)
    4755              : {
    4756          433 :     if (fcinfo->context && IsA(fcinfo->context, AggState))
    4757              :     {
    4758          433 :         AggState   *aggstate = (AggState *) fcinfo->context;
    4759          433 :         ExprContext *cxt = aggstate->curaggcontext;
    4760              : 
    4761          433 :         RegisterExprContextCallback(cxt, func, arg);
    4762              : 
    4763          433 :         return;
    4764              :     }
    4765            0 :     elog(ERROR, "aggregate function cannot register a callback in this context");
    4766              : }
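The comment describes callbacks that are registered on an expression context and run once, just before that context is reset. The sketch below models this in standalone C. `MockExprContext`, `register_callback`, `shutdown_context` and `release_resource` are hypothetical, and `void *` stands in for `Datum`. Two details are choices of this sketch, not claims about PostgreSQL's internals: new callbacks are pushed onto the head of the list, and each callback is removed before it runs. Error paths, where the real callbacks are not called, are not modeled.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical model of an expression context's shutdown-callback list. */
typedef void (*CallbackFn)(void *arg);

typedef struct Callback {
    CallbackFn fn;
    void *arg;
    struct Callback *next;
} Callback;

typedef struct { Callback *callbacks; } MockExprContext;

/* Push a callback onto the head of the list (a choice of this sketch). */
static void register_callback(MockExprContext *cxt, CallbackFn fn, void *arg)
{
    Callback *cb = malloc(sizeof *cb);
    assert(cb != NULL);
    cb->fn = fn;
    cb->arg = arg;
    cb->next = cxt->callbacks;
    cxt->callbacks = cb;
}

/* Run each registered callback exactly once and forget it; a reset of the
 * context's memory would follow this step. */
static void shutdown_context(MockExprContext *cxt)
{
    Callback *cb;
    while ((cb = cxt->callbacks) != NULL)
    {
        cxt->callbacks = cb->next;
        cb->fn(cb->arg);
        free(cb);
    }
}

/* Example callback: release a hypothetical non-memory resource. */
static void release_resource(void *arg)
{
    int *open_handles = arg;
    (*open_handles)--;
}
```

Because the list is emptied as it runs, a second shutdown (for example, on the next group or a rescan) does not call the same callback twice.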
    4767              : 
    4768              : 
    4769              : /* ----------------------------------------------------------------
    4770              :  *                      Parallel Query Support
    4771              :  * ----------------------------------------------------------------
    4772              :  */
    4773              : 
    4774              : /* ----------------------------------------------------------------
    4775              :  *      ExecAggEstimate
    4776              :  *
    4777              :  *      Estimate space required to propagate aggregate statistics.
    4778              :  * ----------------------------------------------------------------
    4779              :  */
    4780              : void
    4781          391 : ExecAggEstimate(AggState *node, ParallelContext *pcxt)
    4782              : {
    4783              :     Size        size;
    4784              : 
    4785              :     /* don't need this if not instrumenting or no workers */
    4786          391 :     if (!node->ss.ps.instrument || pcxt->nworkers == 0)
    4787          323 :         return;
    4788              : 
    4789           68 :     size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
    4790           68 :     size = add_size(size, offsetof(SharedAggInfo, sinstrument));
    4791           68 :     shm_toc_estimate_chunk(&pcxt->estimator, size);
    4792           68 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
    4793              : }
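The estimate is one fixed header (everything before the flexible `sinstrument` array) plus one `AggregateInstrumentation` slot per worker. It is computed with overflow-checked multiply and add. The standalone sketch below reproduces that arithmetic. `AggInstr` and `SharedInfo` use a hypothetical field layout, not PostgreSQL's real struct definitions. `checked_mul` and `checked_add` are hypothetical helpers in the spirit of `mul_size()`/`add_size()`: they return `false` on overflow rather than raising an error. The `shm_toc_estimate_chunk`/`shm_toc_estimate_keys` bookkeeping is not modeled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical field layout; the real structs live in PostgreSQL headers. */
typedef struct {
    size_t   hash_mem_peak;
    uint64_t hash_disk_used;
    int      hash_batches_used;
} AggInstr;

typedef struct {
    int      num_workers;
    AggInstr sinstrument[];   /* C99 flexible array member */
} SharedInfo;

/* Overflow-checked helpers; report failure instead of raising an error. */
static bool checked_mul(size_t a, size_t b, size_t *out)
{
    if (a != 0 && b > SIZE_MAX / a)
        return false;
    *out = a * b;
    return true;
}

static bool checked_add(size_t a, size_t b, size_t *out)
{
    if (b > SIZE_MAX - a)
        return false;
    *out = a + b;
    return true;
}

/* Header plus one instrumentation slot per worker. */
static bool shared_info_size(size_t nworkers, size_t *out)
{
    size_t size;

    if (!checked_mul(nworkers, sizeof(AggInstr), &size))
        return false;
    return checked_add(size, offsetof(SharedInfo, sinstrument), out);
}
```

Using `offsetof(..., sinstrument)` rather than `sizeof(SharedInfo)` counts exactly the bytes that precede the array, so no trailing padding is double-counted.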
    4794              : 
    4795              : /* ----------------------------------------------------------------
    4796              :  *      ExecAggInitializeDSM
    4797              :  *
    4798              :  *      Initialize DSM space for aggregate statistics.
    4799              :  * ----------------------------------------------------------------
    4800              :  */
    4801              : void
    4802          391 : ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
    4803              : {
    4804              :     Size        size;
    4805              : 
    4806              :     /* don't need this if not instrumenting or no workers */
    4807          391 :     if (!node->ss.ps.instrument || pcxt->nworkers == 0)
    4808          323 :         return;
    4809              : 
    4810           68 :     size = offsetof(SharedAggInfo, sinstrument)
    4811           68 :         + pcxt->nworkers * sizeof(AggregateInstrumentation);
    4812           68 :     node->shared_info = shm_toc_allocate(pcxt->toc, size);
    4813              :     /* ensure any unfilled slots will contain zeroes */
    4814           68 :     memset(node->shared_info, 0, size);
    4815           68 :     node->shared_info->num_workers = pcxt->nworkers;
    4816           68 :     shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
    4817           68 :                    node->shared_info);
    4818              : }
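On the leader side, `ExecAggInitializeDSM` allocates the block, zeroes it so that slots a worker never fills read as zero, records the worker count, and publishes the block under the plan node's id. The sketch below follows those steps in standalone C. It uses `malloc` in place of `shm_toc_allocate` and a small fixed-size key/address table in place of `shm_toc`. `MockToc`, `toc_insert`, `init_shared_info`, `AggInstr` and `SharedInfo` are all hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical mock of a table of contents mapping integer keys to addresses. */
#define TOC_MAX 8
typedef struct {
    int   nentries;
    int   keys[TOC_MAX];
    void *addrs[TOC_MAX];
} MockToc;

typedef struct { long peak; } AggInstr;   /* hypothetical slot type */

typedef struct {
    int      num_workers;
    AggInstr sinstrument[];
} SharedInfo;

static void toc_insert(MockToc *toc, int key, void *addr)
{
    assert(toc->nentries < TOC_MAX);
    toc->keys[toc->nentries] = key;
    toc->addrs[toc->nentries] = addr;
    toc->nentries++;
}

/* Leader side: allocate, zero, record worker count, publish under plan_node_id. */
static SharedInfo *init_shared_info(MockToc *toc, int plan_node_id, int nworkers)
{
    size_t size = offsetof(SharedInfo, sinstrument)
        + (size_t) nworkers * sizeof(AggInstr);
    SharedInfo *si = malloc(size);   /* stands in for shm_toc_allocate() */

    assert(si != NULL);
    memset(si, 0, size);             /* unfilled worker slots read as zero */
    si->num_workers = nworkers;
    toc_insert(toc, plan_node_id, si);
    return si;
}
```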
    4819              : 
    4820              : /* ----------------------------------------------------------------
    4821              :  *      ExecAggInitializeWorker
    4822              :  *
    4823              :  *      Attach worker to DSM space for aggregate statistics.
    4824              :  * ----------------------------------------------------------------
    4825              :  */
    4826              : void
    4827         1102 : ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
    4828              : {
    4829         1102 :     node->shared_info =
    4830         1102 :         shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
    4831         1102 : }
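The worker passes `true` as the last argument of `shm_toc_lookup`, which requests a `NULL` result rather than an error when the key is absent. Presumably, this is so that a worker whose leader did not allocate the block (no instrumentation, or no workers) simply ends up with `shared_info == NULL`. The sketch below models only that "not found yields NULL" behavior. `MockToc` and `toc_lookup_noerror` are hypothetical and are not the real `shm_toc` API.

```c
#include <stddef.h>

/* Hypothetical key->address table, as seen by a worker after attaching. */
typedef struct {
    int         nentries;
    const int  *keys;
    void      **addrs;
} MockToc;

/* Return the address stored under key, or NULL if it is absent.
 * Models only the "missing is OK" case used by ExecAggInitializeWorker. */
static void *toc_lookup_noerror(const MockToc *toc, int key)
{
    for (int i = 0; i < toc->nentries; i++)
        if (toc->keys[i] == key)
            return toc->addrs[i];
    return NULL;
}
```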
    4832              : 
    4833              : /* ----------------------------------------------------------------
    4834              :  *      ExecAggRetrieveInstrumentation
    4835              :  *
    4836              :  *      Transfer aggregate statistics from DSM to private memory.
    4837              :  * ----------------------------------------------------------------
    4838              :  */
    4839              : void
    4840           68 : ExecAggRetrieveInstrumentation(AggState *node)
    4841              : {
    4842              :     Size        size;
    4843              :     SharedAggInfo *si;
    4844              : 
    4845           68 :     if (node->shared_info == NULL)
    4846            0 :         return;
    4847              : 
    4848           68 :     size = offsetof(SharedAggInfo, sinstrument)
    4849           68 :         + node->shared_info->num_workers * sizeof(AggregateInstrumentation);
    4850           68 :     si = palloc(size);
    4851           68 :     memcpy(si, node->shared_info, size);
    4852           68 :     node->shared_info = si;
    4853              : }
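`ExecAggRetrieveInstrumentation` recomputes the block size from the stored `num_workers` and copies header plus slots into backend-private memory. This keeps the statistics available after the shared segment goes away. A standalone sketch of that copy follows. It uses `malloc` in place of `palloc`, and `AggInstr`, `SharedInfo` and `retrieve_copy` are hypothetical. The early `NULL` return mirrors the function's `shared_info == NULL` guard.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

typedef struct { long peak; } AggInstr;   /* hypothetical slot type */

typedef struct {
    int      num_workers;
    AggInstr sinstrument[];
} SharedInfo;

/* Copy header plus num_workers slots out of (mock) shared memory so the
 * result outlives the shared segment; NULL in means NULL out. */
static SharedInfo *retrieve_copy(const SharedInfo *shared)
{
    if (shared == NULL)
        return NULL;

    size_t size = offsetof(SharedInfo, sinstrument)
        + (size_t) shared->num_workers * sizeof(AggInstr);
    SharedInfo *si = malloc(size);    /* stands in for palloc() */

    assert(si != NULL);
    memcpy(si, shared, size);
    return si;
}
```

The copy reads `num_workers` from the shared block itself, so the leader does not need to remember how many workers were launched.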
        

Generated by: LCOV version 2.0-1