LCOV - code coverage report
Current view: top level - src/backend/utils/sort - tuplesort.c (source / functions)
Test: PostgreSQL 19devel
Test Date: 2026-03-04 06:14:56
Coverage: Lines: 91.5 % (825 of 902 hit), Functions: 100.0 % (57 of 57 hit)

/*-------------------------------------------------------------------------
 *
 * tuplesort.c
 *    Generalized tuple sorting routines.
 *
 * This module provides a generalized facility for tuple sorting, which can be
 * applied to different kinds of sortable objects.  Implementation of
 * the particular sorting variants is given in tuplesortvariants.c.
 * This module works efficiently for both small and large amounts
 * of data.  Small amounts are sorted in-memory.  Large amounts are
 * sorted using temporary files and a standard external sort
 * algorithm.
 *
 * See Knuth, volume 3, for more than you want to know about external
 * sorting algorithms.  The algorithm we use is a balanced k-way merge.
 * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
 * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
 * merge is better.  Knuth assumes that tape drives are expensive
 * beasts, and in particular that there will always be many more runs than
 * tape drives.  The polyphase merge algorithm was good at keeping all the
 * tape drives busy, but in our implementation a "tape drive" doesn't cost
 * much more than a few kB of memory buffers, so we can afford to have
 * lots of them.  In particular, if we can have as many tape drives as
 * sorted runs, we can eliminate repeated I/O entirely.
 *
 * Historically, we divided the input into sorted runs using replacement
 * selection, in the form of a priority tree implemented as a heap
 * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
 * or radix sort for run generation.
 *
 * The approximate amount of memory allowed for any one sort operation
 * is specified in kilobytes by the caller (most pass work_mem).  Initially,
 * we absorb tuples and simply store them in an unsorted array as long as
 * we haven't exceeded workMem.  If we reach the end of the input without
 * exceeding workMem, we sort the array in memory and subsequently return
 * tuples just by scanning the tuple array sequentially.  If we do exceed
 * workMem, we begin to emit tuples into sorted runs on temporary tapes.
 * When tuples are dumped in batch after in-memory sorting, we begin a new run
 * with a new output tape.  If we reach the max number of tapes, we write
 * subsequent runs on the existing tapes in a round-robin fashion.  We will
 * need multiple merge passes to finish the merge in that case.  After the
 * end of the input is reached, we dump out remaining tuples in memory into
 * a final run, then merge the runs.
 *
 * When merging runs, we use a heap containing just the frontmost tuple from
 * each source run; we repeatedly output the smallest tuple and replace it
 * with the next tuple from its source tape (if any).  When the heap empties,
 * the merge is complete.  The basic merge algorithm thus needs very little
 * memory --- only M tuples for an M-way merge, and M is constrained to a
 * small number.  However, we can still make good use of our full workMem
 * allocation by pre-reading additional blocks from each source tape.  Without
 * prereading, our access pattern to the temporary file would be very erratic;
 * on average we'd read one block from each of M source tapes during the same
 * time that we're writing M blocks to the output tape, so there is no
 * sequentiality of access at all, defeating the read-ahead methods used by
 * most Unix kernels.  Worse, the output tape gets written into a very random
 * sequence of blocks of the temp file, ensuring that things will be even
 * worse when it comes time to read that tape.  A straightforward merge pass
 * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
 * by prereading from each source tape sequentially, loading about workMem/M
 * bytes from each tape in turn, and making the sequential blocks immediately
 * available for reuse.  This approach helps to localize both read and write
 * accesses.  The pre-reading is handled by logtape.c; we just tell it how
 * much memory to use for the buffers.
 *
 * In the current code we determine the number of input tapes M on the basis
 * of workMem: we want workMem/M to be large enough that we read a fair
 * amount of data each time we read from a tape, so as to maintain the
 * locality of access described above.  Nonetheless, with large workMem we
 * can have many tapes.  The logical "tapes" are implemented by logtape.c,
 * which avoids space wastage by recycling disk space as soon as each block
 * is read from its "tape".
 *
 * When the caller requests random access to the sort result, we form
 * the final sorted run on a logical tape which is then "frozen", so
 * that we can access it randomly.  When the caller does not need random
 * access, we return from tuplesort_performsort() as soon as we are down
 * to one run per logical tape.  The final merge is then performed
 * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
 * saves one cycle of writing all the data out to disk and reading it in.
 *
 * This module supports parallel sorting.  Parallel sorts involve coordination
 * among one or more worker processes, and a leader process, each with its own
 * tuplesort state.  The leader process (or, more accurately, the
 * Tuplesortstate associated with a leader process) creates a full tapeset
 * consisting of worker tapes, each holding one run to merge --- one run for
 * every worker process.  This tapeset is then merged.  Worker processes are
 * guaranteed to produce exactly one output run from their partial input.
 *
 *
 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/utils/sort/tuplesort.c
 *
 *-------------------------------------------------------------------------
 */
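The balanced k-way merge described above can be sketched in miniature. The following standalone example is illustrative only (names like `MergeSource`, `HeapEntry`, and `kway_merge` are invented for this sketch, not taken from tuplesort.c): a min-heap holds only the frontmost element of each sorted run; we repeatedly pop the smallest and refill the heap from the run it came from, mirroring the replace-top/delete-top pattern that the real `tuplesort_heap_replace_top`/`tuplesort_heap_delete_top` routines implement for SortTuples.

```c
#include <assert.h>
#include <stddef.h>

typedef struct MergeSource
{
    const int  *vals;           /* a sorted input run */
    size_t      pos;            /* next element to read from it */
    size_t      len;
} MergeSource;

typedef struct HeapEntry
{
    int         val;            /* frontmost value of the run */
    MergeSource *src;           /* run the value came from */
} HeapEntry;

/* Sift the root down to restore the min-heap property. */
static void
heap_siftdown(HeapEntry *heap, size_t n)
{
    size_t      i = 0;

    for (;;)
    {
        size_t      l = 2 * i + 1;
        size_t      r = 2 * i + 2;
        size_t      s = i;

        if (l < n && heap[l].val < heap[s].val)
            s = l;
        if (r < n && heap[r].val < heap[s].val)
            s = r;
        if (s == i)
            break;
        HeapEntry   tmp = heap[i];

        heap[i] = heap[s];
        heap[s] = tmp;
        i = s;
    }
}

/* Insert an entry, sifting it up from the bottom. */
static void
heap_insert(HeapEntry *heap, size_t *n, HeapEntry e)
{
    size_t      i = (*n)++;

    while (i > 0 && e.val < heap[(i - 1) / 2].val)
    {
        heap[i] = heap[(i - 1) / 2];
        i = (i - 1) / 2;
    }
    heap[i] = e;
}

/* Merge nruns sorted runs into out[]; returns the number of values written. */
static size_t
kway_merge(MergeSource *runs, size_t nruns, int *out)
{
    HeapEntry   heap[16];       /* assumes nruns <= 16 */
    size_t      n = 0;
    size_t      nout = 0;

    /* prime the heap with the frontmost element of each non-empty run */
    for (size_t i = 0; i < nruns; i++)
        if (runs[i].pos < runs[i].len)
            heap_insert(heap, &n,
                        (HeapEntry) {runs[i].vals[runs[i].pos++], &runs[i]});

    while (n > 0)
    {
        MergeSource *src = heap[0].src;

        out[nout++] = heap[0].val;
        if (src->pos < src->len)
            heap[0].val = src->vals[src->pos++];    /* replace top from same run */
        else
            heap[0] = heap[--n];    /* run exhausted: shrink the heap */
        heap_siftdown(heap, n);
    }
    return nout;
}
```

Note how the memory bound matches the comment above: the heap never holds more than one entry per run, so an M-way merge needs only M entries regardless of input size.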

#include "postgres.h"

#include <limits.h>

#include "commands/tablespace.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "storage/shmem.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/tuplesort.h"

/*
 * Initial size of memtuples array.  This must be more than
 * ALLOCSET_SEPARATE_THRESHOLD; see comments in grow_memtuples().  Clamp at
 * 1024 elements to avoid excessive reallocs.
 */
#define INITIAL_MEMTUPSIZE Max(1024, \
    ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)

/* GUC variables */
bool        trace_sort = false;

#ifdef DEBUG_BOUNDED_SORT
bool        optimize_bounded_sort = true;
#endif


/*
 * During merge, we use a pre-allocated set of fixed-size slots to hold
 * tuples, to avoid palloc/pfree overhead.
 *
 * Merge doesn't require a lot of memory, so we can afford to waste some
 * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
 * palloc() overhead is not significant anymore.
 *
 * 'nextfree' is valid when this chunk is in the free list.  When in use, the
 * slot holds a tuple.
 */
#define SLAB_SLOT_SIZE 1024

typedef union SlabSlot
{
    union SlabSlot *nextfree;
    char        buffer[SLAB_SLOT_SIZE];
} SlabSlot;

/*
 * Possible states of a Tuplesort object.  These denote the states that
 * persist between calls of Tuplesort routines.
 */
typedef enum
{
    TSS_INITIAL,                /* Loading tuples; still within memory limit */
    TSS_BOUNDED,                /* Loading tuples into bounded-size heap */
    TSS_BUILDRUNS,              /* Loading tuples; writing to tape */
    TSS_SORTEDINMEM,            /* Sort completed entirely in memory */
    TSS_SORTEDONTAPE,           /* Sort completed, final run is on tape */
    TSS_FINALMERGE,             /* Performing final merge on-the-fly */
} TupSortStatus;

/*
 * Parameters for calculation of number of tapes to use --- see inittapes()
 * and tuplesort_merge_order().
 *
 * In this calculation we assume that each tape will cost us about one
 * block's worth of buffer space.  This ignores the overhead of all the
 * other data structures needed for each tape, but it's probably close
 * enough.
 *
 * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
 * input tape, for pre-reading (see discussion at top of file).  This is *in
 * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
 */
#define MINORDER        6       /* minimum merge order */
#define MAXORDER        500     /* maximum merge order */
#define TAPE_BUFFER_OVERHEAD        BLCKSZ
#define MERGE_BUFFER_SIZE           (BLCKSZ * 32)

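A simplified sketch of how these constants combine into a merge order (this approximates what `tuplesort_merge_order()` does; that function is not part of this excerpt and its exact formula may differ): give each input tape MERGE_BUFFER_SIZE of preread buffer plus TAPE_BUFFER_OVERHEAD, divide the memory budget by that per-tape cost, and clamp to [MINORDER, MAXORDER].

```c
#include <assert.h>
#include <stdint.h>

#define BLCKSZ 8192             /* assumed default PostgreSQL block size */
#define MINORDER        6
#define MAXORDER        500
#define TAPE_BUFFER_OVERHEAD        BLCKSZ
#define MERGE_BUFFER_SIZE           (BLCKSZ * 32)

/*
 * Approximate merge order for a given memory budget (in bytes): how many
 * input tapes can each get a preread buffer plus per-tape overhead,
 * clamped to the allowed range.  Illustrative only.
 */
static int
merge_order(int64_t allowedMem)
{
    int64_t     m = allowedMem / (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);

    if (m < MINORDER)
        m = MINORDER;
    if (m > MAXORDER)
        m = MAXORDER;
    return (int) m;
}
```

With an 8 kB block, each tape costs about 264 kB here, so a 4 MB budget yields a 15-way merge, while tiny and huge budgets hit the clamps.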

/*
 * Private state of a Tuplesort operation.
 */
struct Tuplesortstate
{
    TuplesortPublic base;
    TupSortStatus status;       /* enumerated value as shown above */
    bool        bounded;        /* did caller specify a maximum number of
                                 * tuples to return? */
    bool        boundUsed;      /* true if we made use of a bounded heap */
    int         bound;          /* if bounded, the maximum number of tuples */
    int64       tupleMem;       /* memory consumed by individual tuples.
                                 * Storing this separately from what we track
                                 * in availMem allows us to subtract the
                                 * memory consumed by all tuples when dumping
                                 * tuples to tape */
    int64       availMem;       /* remaining memory available, in bytes */
    int64       allowedMem;     /* total memory allowed, in bytes */
    int         maxTapes;       /* max number of input tapes to merge in each
                                 * pass */
    int64       maxSpace;       /* maximum amount of space occupied among sort
                                 * groups, either in-memory or on-disk */
    bool        isMaxSpaceDisk; /* true when maxSpace tracks on-disk space,
                                 * false means in-memory */
    TupSortStatus maxSpaceStatus;   /* sort status when maxSpace was reached */
    LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */

    /*
     * This array holds the tuples now in sort memory.  If we are in state
     * INITIAL, the tuples are in no particular order; if we are in state
     * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
     * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
     * H.  In state SORTEDONTAPE, the array is not used.
     */
    SortTuple  *memtuples;      /* array of SortTuple structs */
    int         memtupcount;    /* number of tuples currently present */
    int         memtupsize;     /* allocated length of memtuples array */
    bool        growmemtuples;  /* memtuples' growth still underway? */

    /*
     * Memory for tuples is sometimes allocated using a simple slab allocator,
     * rather than with palloc().  Currently, we switch to slab allocation
     * when we start merging.  Merging only needs to keep a small, fixed
     * number of tuples in memory at any time, so we can avoid the
     * palloc/pfree overhead by recycling a fixed number of fixed-size slots
     * to hold the tuples.
     *
     * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
     * slots.  The allocation is sized to have one slot per tape, plus one
     * additional slot.  We need that many slots to hold all the tuples kept
     * in the heap during merge, plus the one most recently returned from the
     * sort with tuplesort_gettuple.
     *
     * Initially, all the slots are kept in a linked list of free slots.  When
     * a tuple is read from a tape, it is put into the next available slot, if
     * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
     * instead.
     *
     * When we're done processing a tuple, we return the slot to the free
     * list, or pfree() it if it was palloc'd.  We know that a tuple was
     * allocated from the slab if its pointer value lies between
     * slabMemoryBegin and slabMemoryEnd.
     *
     * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
     * tracking memory usage is not used.
     */
    bool        slabAllocatorUsed;

    char       *slabMemoryBegin;    /* beginning of slab memory arena */
    char       *slabMemoryEnd;  /* end of slab memory arena */
    SlabSlot   *slabFreeHead;   /* head of free list */

    /* Memory used for input and output tape buffers. */
    size_t      tape_buffer_mem;

    /*
     * When we return a tuple that came from a tape (that is, in
     * TSS_SORTEDONTAPE or TSS_FINALMERGE modes) to the caller in
     * tuplesort_gettuple_XXX, we remember the tuple in 'lastReturnedTuple'
     * so that we can recycle the memory on the next gettuple call.
     */
    void       *lastReturnedTuple;

    /*
     * While building initial runs, this is the current output run number.
     * Afterwards, it is the number of initial runs we made.
     */
    int         currentRun;

    /*
     * Logical tapes, for merging.
     *
     * The initial runs are written to the output tapes.  In each merge pass,
     * the output tapes of the previous pass become the input tapes, and new
     * output tapes are created as needed.  When nInputTapes equals
     * nInputRuns, there is only one merge pass left.
     */
    LogicalTape **inputTapes;
    int         nInputTapes;
    int         nInputRuns;

    LogicalTape **outputTapes;
    int         nOutputTapes;
    int         nOutputRuns;

    LogicalTape *destTape;      /* current output tape */

    /*
     * These variables are used after completion of sorting to keep track of
     * the next tuple to return.  (In the tape case, the tape's current read
     * position is also critical state.)
     */
    LogicalTape *result_tape;   /* actual tape of finished output */
    int         current;        /* array index (only used if SORTEDINMEM) */
    bool        eof_reached;    /* reached EOF (needed for cursors) */

    /* markpos_xxx holds marked position for mark and restore */
    int64       markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
    int         markpos_offset; /* saved "current", or offset in tape block */
    bool        markpos_eof;    /* saved "eof_reached" */

    /*
     * These variables are used during parallel sorting.
     *
     * worker is our worker identifier.  Follows the general convention that
     * a value of -1 denotes a leader tuplesort, and values >= 0 denote
     * worker tuplesorts.  (-1 can also be a serial tuplesort.)
     *
     * shared is mutable shared memory state, which is used to coordinate
     * parallel sorts.
     *
     * nParticipants is the number of worker Tuplesortstates known by the
     * leader to have actually been launched, which implies that they must
     * finish a run that the leader needs to merge.  Typically includes a
     * worker state held by the leader process itself.  Set in the leader
     * Tuplesortstate only.
     */
    int         worker;
    Sharedsort *shared;
    int         nParticipants;

    /*
     * Additional state for managing "abbreviated key" sortsupport routines
     * (which currently may be used by all cases except the hash index case).
     * Tracks the intervals at which the optimization's effectiveness is
     * tested.
     */
    int64       abbrevNext;     /* Tuple # at which to next check
                                 * applicability */

    /*
     * Resource snapshot for time of sort start.
     */
    PGRUsage    ru_start;
};

/*
 * Private mutable state for a parallel tuplesort operation.  This is
 * allocated in shared memory.
 */
struct Sharedsort
{
    /* mutex protects all fields prior to tapes */
    slock_t     mutex;

    /*
     * currentWorker generates ordinal identifier numbers for parallel sort
     * workers.  These start from 0, and are always gapless.
     *
     * Workers increment workersFinished to indicate having finished.  When
     * this equals state.nParticipants within the leader, the leader is ready
     * to merge worker runs.
     */
    int         currentWorker;
    int         workersFinished;

    /* Temporary file space */
    SharedFileSet fileset;

    /* Size of tapes flexible array */
    int         nTapes;

    /*
     * Tapes array used by workers to report back information needed by the
     * leader to concatenate all worker tapes into one for merging
     */
    TapeShare   tapes[FLEXIBLE_ARRAY_MEMBER];
};

/*
 * Is the given tuple allocated from the slab memory arena?
 */
#define IS_SLAB_SLOT(state, tuple) \
    ((char *) (tuple) >= (state)->slabMemoryBegin && \
     (char *) (tuple) < (state)->slabMemoryEnd)

/*
 * Return the given tuple to the slab memory free list, or free it
 * if it was palloc'd.
 */
#define RELEASE_SLAB_SLOT(state, tuple) \
    do { \
        SlabSlot *buf = (SlabSlot *) tuple; \
        \
        if (IS_SLAB_SLOT((state), buf)) \
        { \
            buf->nextfree = (state)->slabFreeHead; \
            (state)->slabFreeHead = buf; \
        } else \
            pfree(buf); \
    } while(0)

#define REMOVEABBREV(state,stup,count)  ((*(state)->base.removeabbrev) (state, stup, count))
#define COMPARETUP(state,a,b)   ((*(state)->base.comparetup) (a, b, state))
#define WRITETUP(state,tape,stup)   ((*(state)->base.writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
#define FREESTATE(state)    ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
#define LACKMEM(state)      ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
#define USEMEM(state,amt)   ((state)->availMem -= (amt))
#define FREEMEM(state,amt)  ((state)->availMem += (amt))
#define SERIAL(state)       ((state)->shared == NULL)
#define WORKER(state)       ((state)->shared && (state)->worker != -1)
#define LEADER(state)       ((state)->shared && (state)->worker == -1)

/*
 * NOTES about on-tape representation of tuples:
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero; an all-zero
 * unsigned int is used to delimit runs).  The remainder of the stored tuple
 * may or may not match the in-memory representation of the tuple ---
 * any conversion needed is the job of the writetup and readtup routines.
 *
 * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
 * representation of the tuple must be followed by another "unsigned int" that
 * is a copy of the length --- so the total tape space used is actually
 * sizeof(unsigned int) more than the stored length value.  This allows
 * read-backwards.  If the random access flag is not specified, the
 * write/read routines may omit the extra length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data.  When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
 *
 * The write/read routines can make use of the tuple description data
 * stored in the Tuplesortstate record, if needed.  They are also expected
 * to adjust state->availMem by the amount of memory space (not tape space!)
 * released or consumed.  There is no error return from either writetup
 * or readtup; they should ereport() on failure.
 *
 *
 * NOTES about memory consumption calculations:
 *
 * We count space allocated for tuples against the workMem limit, plus
 * the space used by the variable-size memtuples array.  Fixed-size space
 * is not counted; it's small enough to not be interesting.
 *
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 * rather than the originally-requested size.  This is important since
 * palloc can add substantial overhead.  It's not a complete answer since
 * we won't count any wasted space in palloc allocation blocks, but it's
 * a lot better than what we were doing before 7.3.  As of 9.6, a
 * separate memory context is used for caller-passed tuples.  Resetting
 * it at certain key increments significantly ameliorates fragmentation.
 * readtup routines use the slab allocator (they cannot use
 * the reset context because it gets deleted at the point that merging
 * begins).
 */
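The length-word framing described above can be sketched against a plain memory buffer standing in for a logical tape. This is illustrative only: `write_framed` and `read_framed` are invented names, and the real writetup/readtup callbacks additionally handle tuple format conversion and availMem accounting.

```c
#include <assert.h>
#include <string.h>

/*
 * Write one tuple: a leading length word that counts itself plus the
 * payload, then the payload, then (for random access) a trailing copy of
 * the length word to allow reading backwards.  Returns bytes written.
 */
static size_t
write_framed(char *tape, const void *tuple, unsigned int tuplen,
             int randomAccess)
{
    unsigned int len = sizeof(unsigned int) + tuplen;
    size_t      off = 0;

    memcpy(tape + off, &len, sizeof(len));
    off += sizeof(len);
    memcpy(tape + off, tuple, tuplen);
    off += tuplen;
    if (randomAccess)
    {
        memcpy(tape + off, &len, sizeof(len));
        off += sizeof(len);
    }
    return off;
}

/*
 * Read one tuple.  As in tuplesort, the caller has already consumed the
 * front length word and passes its value in; we copy out the payload and
 * skip the trailing length word if present.  Returns bytes consumed.
 */
static size_t
read_framed(const char *tape, unsigned int len, void *tuple,
            int randomAccess)
{
    unsigned int tuplen = len - sizeof(unsigned int);

    memcpy(tuple, tape, tuplen);
    return tuplen + (randomAccess ? sizeof(unsigned int) : 0);
}
```

Because the leading word includes its own size, it can never be zero, which is what lets an all-zero word serve unambiguously as the run delimiter.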


static void tuplesort_begin_batch(Tuplesortstate *state);
static bool consider_abort_common(Tuplesortstate *state);
static void inittapes(Tuplesortstate *state, bool mergeruns);
static void inittapestate(Tuplesortstate *state, int maxTapes);
static void selectnewtape(Tuplesortstate *state);
static void init_slab_allocator(Tuplesortstate *state, int numSlots);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
static void sort_bounded_heap(Tuplesortstate *state);
static void tuplesort_sort_memtuples(Tuplesortstate *state);
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
static void tuplesort_heap_delete_top(Tuplesortstate *state);
static void reversedirection(Tuplesortstate *state);
static unsigned int getlen(LogicalTape *tape, bool eofOK);
static void markrunend(LogicalTape *tape);
static int  worker_get_identifier(Tuplesortstate *state);
static void worker_freeze_result_tape(Tuplesortstate *state);
static void worker_nomergeruns(Tuplesortstate *state);
static void leader_takeover_tapes(Tuplesortstate *state);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
static void tuplesort_free(Tuplesortstate *state);
static void tuplesort_updatemax(Tuplesortstate *state);


/*
 * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
 * any variant of SortTuples, using the appropriate comparetup function.
 * qsort_ssup() is specialized for the case where the comparetup function
 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
 * and Datum sorts.
 */

#define ST_SORT qsort_tuple
     489              : #define ST_ELEMENT_TYPE SortTuple
     490              : #define ST_COMPARE_RUNTIME_POINTER
     491              : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     492              : #define ST_CHECK_FOR_INTERRUPTS
     493              : #define ST_SCOPE static
     494              : #define ST_DECLARE
     495              : #define ST_DEFINE
     496              : #include "lib/sort_template.h"
     497              : 
     498              : #define ST_SORT qsort_ssup
     499              : #define ST_ELEMENT_TYPE SortTuple
     500              : #define ST_COMPARE(a, b, ssup) \
     501              :     ApplySortComparator((a)->datum1, (a)->isnull1, \
     502              :                         (b)->datum1, (b)->isnull1, (ssup))
     503              : #define ST_COMPARE_ARG_TYPE SortSupportData
     504              : #define ST_CHECK_FOR_INTERRUPTS
     505              : #define ST_SCOPE static
     506              : #define ST_DEFINE
     507              : #include "lib/sort_template.h"
     508              : 
     509              : /* state for radix sort */
     510              : typedef struct RadixSortInfo
     511              : {
     512              :     union
     513              :     {
     514              :         size_t      count;
     515              :         size_t      offset;
     516              :     };
     517              :     size_t      next_offset;
     518              : } RadixSortInfo;
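The anonymous union above lets one array slot serve two phases of a radix pass: it holds a bucket count during the tally, then is rewritten in place as that bucket's starting offset. A minimal standalone sketch of that technique (hypothetical names, not the PostgreSQL implementation) using a stable counting sort of bytes, the building block of an LSD radix sort:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* One slot per bucket: a count during the tally phase, then reused
 * as the bucket's starting offset, as in RadixSortInfo. */
typedef struct BucketInfo
{
    union
    {
        size_t      count;
        size_t      offset;
    };
} BucketInfo;

static void
counting_sort_bytes(const unsigned char *in, unsigned char *out, size_t n)
{
    BucketInfo  buckets[256];
    size_t      running = 0;

    memset(buckets, 0, sizeof(buckets));

    /* Phase 1: tally counts. */
    for (size_t i = 0; i < n; i++)
        buckets[in[i]].count++;

    /* Phase 2: rewrite counts as exclusive prefix sums (offsets), in place. */
    for (int b = 0; b < 256; b++)
    {
        size_t      c = buckets[b].count;

        buckets[b].offset = running;
        running += c;
    }

    /* Phase 3: scatter each element to its bucket's next free slot. */
    for (size_t i = 0; i < n; i++)
        out[buckets[in[i]].offset++] = in[i];
}
```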
     519              : 
     520              : /*
     521              :  * Threshold below which qsort_tuple() is generally faster than a radix sort.
     522              :  */
     523              : #define QSORT_THRESHOLD 40
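The dispatch pattern behind QSORT_THRESHOLD — fall back to a comparison sort below a size cutoff where the heavier algorithm's constant factors dominate — can be illustrated with a standalone sketch. This uses insertion sort vs. the C library's qsort() rather than qsort_tuple() vs. radix sort, and the threshold constant is hypothetical:

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical cutoff, mirroring the idea behind QSORT_THRESHOLD. */
#define SMALL_SORT_THRESHOLD 40

static int
cmp_int(const void *a, const void *b)
{
    int         x = *(const int *) a;
    int         y = *(const int *) b;

    return (x > y) - (x < y);
}

/* Insertion sort: minimal overhead, fast on tiny inputs. */
static void
insertion_sort_int(int *a, size_t n)
{
    for (size_t i = 1; i < n; i++)
    {
        int         key = a[i];
        size_t      j = i;

        while (j > 0 && a[j - 1] > key)
        {
            a[j] = a[j - 1];
            j--;
        }
        a[j] = key;
    }
}

/* Choose the algorithm by input size, as tuplesort does. */
static void
sort_ints(int *a, size_t n)
{
    if (n < SMALL_SORT_THRESHOLD)
        insertion_sort_int(a, n);
    else
        qsort(a, n, sizeof(int), cmp_int);
}
```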
     524              : 
     525              : 
     526              : /*
     527              :  *      tuplesort_begin_xxx
     528              :  *
     529              :  * Initialize for a tuple sort operation.
     530              :  *
     531              :  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
     532              :  * zero or more times, then call tuplesort_performsort when all the tuples
     533              :  * have been supplied.  After performsort, retrieve the tuples in sorted
     534              :  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
     535              :  * access was requested, rescan, markpos, and restorepos can also be called.)
     536              :  * Call tuplesort_end to terminate the operation and release memory/disk space.
     537              :  *
     538              :  * Each variant of tuplesort_begin has a workMem parameter specifying the
     539              :  * maximum number of kilobytes of RAM to use before spilling data to disk.
     540              :  * (The normal value of this parameter is work_mem, but some callers use
     541              :  * other values.)  Each variant also has a sortopt which is a bitmask of
      542              :  * sort options.  See TUPLESORT_* definitions in tuplesort.h.
     543              :  */
     544              : 
     545              : Tuplesortstate *
     546       141303 : tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
     547              : {
     548              :     Tuplesortstate *state;
     549              :     MemoryContext maincontext;
     550              :     MemoryContext sortcontext;
     551              :     MemoryContext oldcontext;
     552              : 
     553              :     /* See leader_takeover_tapes() remarks on random access support */
     554       141303 :     if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
     555            0 :         elog(ERROR, "random access disallowed under parallel sort");
     556              : 
     557              :     /*
     558              :      * Memory context surviving tuplesort_reset.  This memory context holds
     559              :      * data which is useful to keep while sorting multiple similar batches.
     560              :      */
     561       141303 :     maincontext = AllocSetContextCreate(CurrentMemoryContext,
     562              :                                         "TupleSort main",
     563              :                                         ALLOCSET_DEFAULT_SIZES);
     564              : 
     565              :     /*
     566              :      * Create a working memory context for one sort operation.  The content of
     567              :      * this context is deleted by tuplesort_reset.
     568              :      */
     569       141303 :     sortcontext = AllocSetContextCreate(maincontext,
     570              :                                         "TupleSort sort",
     571              :                                         ALLOCSET_DEFAULT_SIZES);
     572              : 
     573              :     /*
      574              :      * Additionally, a working memory context for tuples is set up in
     575              :      * tuplesort_begin_batch.
     576              :      */
     577              : 
     578              :     /*
     579              :      * Make the Tuplesortstate within the per-sortstate context.  This way, we
     580              :      * don't need a separate pfree() operation for it at shutdown.
     581              :      */
     582       141303 :     oldcontext = MemoryContextSwitchTo(maincontext);
     583              : 
     584       141303 :     state = palloc0_object(Tuplesortstate);
     585              : 
     586       141303 :     if (trace_sort)
     587            0 :         pg_rusage_init(&state->ru_start);
     588              : 
     589       141303 :     state->base.sortopt = sortopt;
     590       141303 :     state->base.tuples = true;
     591       141303 :     state->abbrevNext = 10;
     592              : 
     593              :     /*
     594              :      * workMem is forced to be at least 64KB, the current minimum valid value
     595              :      * for the work_mem GUC.  This is a defense against parallel sort callers
     596              :      * that divide out memory among many workers in a way that leaves each
     597              :      * with very little memory.
     598              :      */
     599       141303 :     state->allowedMem = Max(workMem, 64) * (int64) 1024;
     600       141303 :     state->base.sortcontext = sortcontext;
     601       141303 :     state->base.maincontext = maincontext;
     602              : 
     603       141303 :     state->memtupsize = INITIAL_MEMTUPSIZE;
     604       141303 :     state->memtuples = NULL;
     605              : 
     606              :     /*
      607              :      * After setting up all of the other non-parallel-related state, we set
      608              :      * up the state needed for each batch.
     609              :      */
     610       141303 :     tuplesort_begin_batch(state);
     611              : 
     612              :     /*
     613              :      * Initialize parallel-related state based on coordination information
     614              :      * from caller
     615              :      */
     616       141303 :     if (!coordinate)
     617              :     {
     618              :         /* Serial sort */
     619       140889 :         state->shared = NULL;
     620       140889 :         state->worker = -1;
     621       140889 :         state->nParticipants = -1;
     622              :     }
     623          414 :     else if (coordinate->isWorker)
     624              :     {
     625              :         /* Parallel worker produces exactly one final run from all input */
     626          282 :         state->shared = coordinate->sharedsort;
     627          282 :         state->worker = worker_get_identifier(state);
     628          282 :         state->nParticipants = -1;
     629              :     }
     630              :     else
     631              :     {
     632              :         /* Parallel leader state only used for final merge */
     633          132 :         state->shared = coordinate->sharedsort;
     634          132 :         state->worker = -1;
     635          132 :         state->nParticipants = coordinate->nParticipants;
     636              :         Assert(state->nParticipants >= 1);
     637              :     }
     638              : 
     639       141303 :     MemoryContextSwitchTo(oldcontext);
     640              : 
     641       141303 :     return state;
     642              : }
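The workMem clamp above has two subtleties worth noting: the 64KB floor defends against parallel callers splitting memory too thinly, and the cast to int64 before multiplying by 1024 keeps a large kilobyte setting from overflowing a 32-bit int. A standalone sketch of the same computation (hypothetical helper name):

```c
#include <assert.h>
#include <stdint.h>

/* Clamp workMem (in KB) to at least 64KB and convert to bytes, doing the
 * multiply in 64-bit arithmetic so large settings don't overflow an int.
 * Mirrors: state->allowedMem = Max(workMem, 64) * (int64) 1024; */
static int64_t
allowed_mem_bytes(int workMem_kb)
{
    int         kb = (workMem_kb > 64) ? workMem_kb : 64;

    return (int64_t) kb * 1024;
}
```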
     643              : 
     644              : /*
     645              :  *      tuplesort_begin_batch
     646              :  *
      647              :  * Set up, or reset, all state needed for processing a new set of tuples with this
     648              :  * sort state. Called both from tuplesort_begin_common (the first time sorting
     649              :  * with this sort state) and tuplesort_reset (for subsequent usages).
     650              :  */
     651              : static void
     652       143010 : tuplesort_begin_batch(Tuplesortstate *state)
     653              : {
     654              :     MemoryContext oldcontext;
     655              : 
     656       143010 :     oldcontext = MemoryContextSwitchTo(state->base.maincontext);
     657              : 
     658              :     /*
     659              :      * Caller tuple (e.g. IndexTuple) memory context.
     660              :      *
      661              :      * A dedicated child context used exclusively for caller-passed tuples
     662              :      * eases memory management.  Resetting at key points reduces
     663              :      * fragmentation. Note that the memtuples array of SortTuples is allocated
     664              :      * in the parent context, not this context, because there is no need to
     665              :      * free memtuples early.  For bounded sorts, tuples may be pfreed in any
     666              :      * order, so we use a regular aset.c context so that it can make use of
     667              :      * free'd memory.  When the sort is not bounded, we make use of a bump.c
     668              :      * context as this keeps allocations more compact with less wastage.
     669              :      * Allocations are also slightly more CPU efficient.
     670              :      */
     671       143010 :     if (TupleSortUseBumpTupleCxt(state->base.sortopt))
     672       142324 :         state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
     673              :                                                      "Caller tuples",
     674              :                                                      ALLOCSET_DEFAULT_SIZES);
     675              :     else
     676          686 :         state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
     677              :                                                          "Caller tuples",
     678              :                                                          ALLOCSET_DEFAULT_SIZES);
     679              : 
     680              : 
     681       143010 :     state->status = TSS_INITIAL;
     682       143010 :     state->bounded = false;
     683       143010 :     state->boundUsed = false;
     684              : 
     685       143010 :     state->availMem = state->allowedMem;
     686              : 
     687       143010 :     state->tapeset = NULL;
     688              : 
     689       143010 :     state->memtupcount = 0;
     690              : 
     691       143010 :     state->growmemtuples = true;
     692       143010 :     state->slabAllocatorUsed = false;
     693       143010 :     if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
     694              :     {
     695           36 :         pfree(state->memtuples);
     696           36 :         state->memtuples = NULL;
     697           36 :         state->memtupsize = INITIAL_MEMTUPSIZE;
     698              :     }
     699       143010 :     if (state->memtuples == NULL)
     700              :     {
     701       141339 :         state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
     702       141339 :         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
     703              :     }
     704              : 
     705              :     /* workMem must be large enough for the minimal memtuples array */
     706       143010 :     if (LACKMEM(state))
     707            0 :         elog(ERROR, "insufficient memory allowed for sort");
     708              : 
     709       143010 :     state->currentRun = 0;
     710              : 
     711              :     /*
     712              :      * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
     713              :      * inittapes(), if needed.
     714              :      */
     715              : 
     716       143010 :     state->result_tape = NULL;   /* flag that result tape has not been formed */
     717              : 
     718       143010 :     MemoryContextSwitchTo(oldcontext);
     719       143010 : }
     720              : 
     721              : /*
     722              :  * tuplesort_set_bound
     723              :  *
     724              :  *  Advise tuplesort that at most the first N result tuples are required.
     725              :  *
     726              :  * Must be called before inserting any tuples.  (Actually, we could allow it
     727              :  * as long as the sort hasn't spilled to disk, but there seems no need for
     728              :  * delayed calls at the moment.)
     729              :  *
     730              :  * This is a hint only. The tuplesort may still return more tuples than
     731              :  * requested.  Parallel leader tuplesorts will always ignore the hint.
     732              :  */
     733              : void
     734          619 : tuplesort_set_bound(Tuplesortstate *state, int64 bound)
     735              : {
     736              :     /* Assert we're called before loading any tuples */
     737              :     Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
     738              :     /* Assert we allow bounded sorts */
     739              :     Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
     740              :     /* Can't set the bound twice, either */
     741              :     Assert(!state->bounded);
     742              :     /* Also, this shouldn't be called in a parallel worker */
     743              :     Assert(!WORKER(state));
     744              : 
     745              :     /* Parallel leader allows but ignores hint */
     746          619 :     if (LEADER(state))
     747            0 :         return;
     748              : 
     749              : #ifdef DEBUG_BOUNDED_SORT
     750              :     /* Honor GUC setting that disables the feature (for easy testing) */
     751              :     if (!optimize_bounded_sort)
     752              :         return;
     753              : #endif
     754              : 
     755              :     /* We want to be able to compute bound * 2, so limit the setting */
     756          619 :     if (bound > (int64) (INT_MAX / 2))
     757            0 :         return;
     758              : 
     759          619 :     state->bounded = true;
     760          619 :     state->bound = (int) bound;
     761              : 
     762              :     /*
     763              :      * Bounded sorts are not an effective target for abbreviated key
     764              :      * optimization.  Disable by setting state to be consistent with no
     765              :      * abbreviation support.
     766              :      */
     767          619 :     state->base.sortKeys->abbrev_converter = NULL;
     768          619 :     if (state->base.sortKeys->abbrev_full_comparator)
     769            8 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
     770              : 
     771              :     /* Not strictly necessary, but be tidy */
     772          619 :     state->base.sortKeys->abbrev_abort = NULL;
     773          619 :     state->base.sortKeys->abbrev_full_comparator = NULL;
     774              : }
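The "bound > (int64) (INT_MAX / 2)" guard above exists so that later code can safely compute bound * 2 in an int without overflow; an oversized bound simply leaves the sort unbounded. A standalone sketch of the guard (hypothetical helper, not the PostgreSQL function):

```c
#include <assert.h>
#include <limits.h>
#include <stdbool.h>
#include <stdint.h>

/* A bound is only honored if bound * 2 still fits in an int, mirroring
 * the overflow guard in tuplesort_set_bound(). */
static bool
bound_is_usable(int64_t bound)
{
    return bound <= (int64_t) (INT_MAX / 2);
}
```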
     775              : 
     776              : /*
     777              :  * tuplesort_used_bound
     778              :  *
     779              :  * Allow callers to find out if the sort state was able to use a bound.
     780              :  */
     781              : bool
     782          190 : tuplesort_used_bound(Tuplesortstate *state)
     783              : {
     784          190 :     return state->boundUsed;
     785              : }
     786              : 
     787              : /*
     788              :  * tuplesort_free
     789              :  *
     790              :  *  Internal routine for freeing resources of tuplesort.
     791              :  */
     792              : static void
     793       142879 : tuplesort_free(Tuplesortstate *state)
     794              : {
     795              :     /* context swap probably not needed, but let's be safe */
     796       142879 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
     797              :     int64       spaceUsed;
     798              : 
     799       142879 :     if (state->tapeset)
     800          449 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset);
     801              :     else
     802       142430 :         spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
     803              : 
     804              :     /*
     805              :      * Delete temporary "tape" files, if any.
     806              :      *
     807              :      * We don't bother to destroy the individual tapes here. They will go away
     808              :      * with the sortcontext.  (In TSS_FINALMERGE state, we have closed
     809              :      * finished tapes already.)
     810              :      */
     811       142879 :     if (state->tapeset)
     812          449 :         LogicalTapeSetClose(state->tapeset);
     813              : 
     814       142879 :     if (trace_sort)
     815              :     {
     816            0 :         if (state->tapeset)
     817            0 :             elog(LOG, "%s of worker %d ended, %" PRId64 " disk blocks used: %s",
     818              :                  SERIAL(state) ? "external sort" : "parallel external sort",
     819              :                  state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
     820              :         else
     821            0 :             elog(LOG, "%s of worker %d ended, %" PRId64 " KB used: %s",
     822              :                  SERIAL(state) ? "internal sort" : "unperformed parallel sort",
     823              :                  state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
     824              :     }
     825              : 
     826              :     TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
     827              : 
     828       142879 :     FREESTATE(state);
     829       142879 :     MemoryContextSwitchTo(oldcontext);
     830              : 
     831              :     /*
     832              :      * Free the per-sort memory context, thereby releasing all working memory.
     833              :      */
     834       142879 :     MemoryContextReset(state->base.sortcontext);
     835       142879 : }
     836              : 
     837              : /*
     838              :  * tuplesort_end
     839              :  *
     840              :  *  Release resources and clean up.
     841              :  *
     842              :  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
     843              :  * pointing to garbage.  Be careful not to attempt to use or free such
     844              :  * pointers afterwards!
     845              :  */
     846              : void
     847       141172 : tuplesort_end(Tuplesortstate *state)
     848              : {
     849       141172 :     tuplesort_free(state);
     850              : 
     851              :     /*
     852              :      * Free the main memory context, including the Tuplesortstate struct
     853              :      * itself.
     854              :      */
     855       141172 :     MemoryContextDelete(state->base.maincontext);
     856       141172 : }
     857              : 
     858              : /*
     859              :  * tuplesort_updatemax
     860              :  *
     861              :  *  Update maximum resource usage statistics.
     862              :  */
     863              : static void
     864         1905 : tuplesort_updatemax(Tuplesortstate *state)
     865              : {
     866              :     int64       spaceUsed;
     867              :     bool        isSpaceDisk;
     868              : 
     869              :     /*
     870              :      * Note: it might seem we should provide both memory and disk usage for a
     871              :      * disk-based sort.  However, the current code doesn't track memory space
     872              :      * accurately once we have begun to return tuples to the caller (since we
     873              :      * don't account for pfree's the caller is expected to do), so we cannot
     874              :      * rely on availMem in a disk sort.  This does not seem worth the overhead
     875              :      * to fix.  Is it worth creating an API for the memory context code to
     876              :      * tell us how much is actually used in sortcontext?
     877              :      */
     878         1905 :     if (state->tapeset)
     879              :     {
     880            3 :         isSpaceDisk = true;
     881            3 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
     882              :     }
     883              :     else
     884              :     {
     885         1902 :         isSpaceDisk = false;
     886         1902 :         spaceUsed = state->allowedMem - state->availMem;
     887              :     }
     888              : 
      889              :      * A sort evicts data to disk when it cannot fit the data in main
      890              :      * memory.  This is why we treat space used on disk as more important
      891              :      * for tracking resource usage than space used in memory.  Note that
      892              :      * the amount of space occupied by a tuple set on disk might be less
      893              :      * than the amount occupied by the same tuple set in memory, due to its
      894              :      * more compact representation.
     895              :      * to more compact representation.
     896              :      */
     897         1905 :     if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
     898         1902 :         (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
     899              :     {
     900          253 :         state->maxSpace = spaceUsed;
     901          253 :         state->isMaxSpaceDisk = isSpaceDisk;
     902          253 :         state->maxSpaceStatus = state->status;
     903              :     }
     904         1905 : }
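The update rule above can be restated simply: a disk figure always displaces a memory-only maximum, and within the same kind of figure the larger value wins. A standalone sketch of that rule (hypothetical names, extracted from tuplesort_updatemax() for illustration):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Peak resource usage across batches: disk usage outranks memory usage;
 * within the same kind, keep the larger figure. */
typedef struct MaxSpace
{
    int64_t     maxSpace;
    bool        isMaxSpaceDisk;
} MaxSpace;

static void
update_max(MaxSpace *m, bool isSpaceDisk, int64_t spaceUsed)
{
    if ((isSpaceDisk && !m->isMaxSpaceDisk) ||
        (isSpaceDisk == m->isMaxSpaceDisk && spaceUsed > m->maxSpace))
    {
        m->maxSpace = spaceUsed;
        m->isMaxSpaceDisk = isSpaceDisk;
    }
}
```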
     905              : 
     906              : /*
     907              :  * tuplesort_reset
     908              :  *
     909              :  *  Reset the tuplesort.  Reset all the data in the tuplesort, but leave the
     910              :  *  meta-information in.  After tuplesort_reset, tuplesort is ready to start
      911              :  *  a new sort.  This avoids recreating tuple sort states (and saves
      912              :  *  resources) when sorting multiple small batches.
     913              :  */
     914              : void
     915         1707 : tuplesort_reset(Tuplesortstate *state)
     916              : {
     917         1707 :     tuplesort_updatemax(state);
     918         1707 :     tuplesort_free(state);
     919              : 
     920              :     /*
     921              :      * After we've freed up per-batch memory, re-setup all of the state common
     922              :      * to both the first batch and any subsequent batch.
     923              :      */
     924         1707 :     tuplesort_begin_batch(state);
     925              : 
     926         1707 :     state->lastReturnedTuple = NULL;
     927         1707 :     state->slabMemoryBegin = NULL;
     928         1707 :     state->slabMemoryEnd = NULL;
     929         1707 :     state->slabFreeHead = NULL;
     930         1707 : }
     931              : 
     932              : /*
     933              :  * Grow the memtuples[] array, if possible within our memory constraint.  We
     934              :  * must not exceed INT_MAX tuples in memory or the caller-provided memory
     935              :  * limit.  Return true if we were able to enlarge the array, false if not.
     936              :  *
     937              :  * Normally, at each increment we double the size of the array.  When doing
     938              :  * that would exceed a limit, we attempt one last, smaller increase (and then
     939              :  * clear the growmemtuples flag so we don't try any more).  That allows us to
     940              :  * use memory as fully as permitted; sticking to the pure doubling rule could
     941              :  * result in almost half going unused.  Because availMem moves around with
     942              :  * tuple addition/removal, we need some rule to prevent making repeated small
     943              :  * increases in memtupsize, which would just be useless thrashing.  The
     944              :  * growmemtuples flag accomplishes that and also prevents useless
     945              :  * recalculations in this function.
     946              :  */
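The growth policy described above — double while no more than half the budget is used, otherwise make one final increase extrapolated from per-tuple consumption so far, then stop — can be sketched as a standalone helper (hypothetical name and simplified signature; the real function also reallocates the array and rechecks allocation limits):

```c
#include <assert.h>
#include <stdint.h>

/* Compute the next array size under a memory budget.  *growmemtuples is
 * cleared after the final, ratio-based increase so we never thrash with
 * repeated small growth attempts. */
static int
next_array_size(int memtupsize, int64_t allowedMem, int64_t memNowUsed,
                int *growmemtuples)
{
    int64_t     availMem = allowedMem - memNowUsed;
    double      grow_ratio;

    if (!*growmemtuples)
        return memtupsize;

    if (memNowUsed <= availMem)
    {
        /* Used no more than half of allowedMem: double, clamping at max. */
        if (memtupsize < INT32_MAX / 2)
            return memtupsize * 2;
        *growmemtuples = 0;
        return INT32_MAX;
    }

    /* Final increase: extrapolate from the space consumed so far. */
    *growmemtuples = 0;
    grow_ratio = (double) allowedMem / (double) memNowUsed;
    if (memtupsize * grow_ratio < INT32_MAX)
        return (int) (memtupsize * grow_ratio);
    return INT32_MAX;
}
```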
     947              : static bool
     948         4520 : grow_memtuples(Tuplesortstate *state)
     949              : {
     950              :     int         newmemtupsize;
     951         4520 :     int         memtupsize = state->memtupsize;
     952         4520 :     int64       memNowUsed = state->allowedMem - state->availMem;
     953              : 
     954              :     /* Forget it if we've already maxed out memtuples, per comment above */
     955         4520 :     if (!state->growmemtuples)
     956           70 :         return false;
     957              : 
     958              :     /* Select new value of memtupsize */
     959         4450 :     if (memNowUsed <= state->availMem)
     960              :     {
     961              :         /*
     962              :          * We've used no more than half of allowedMem; double our usage,
     963              :          * clamping at INT_MAX tuples.
     964              :          */
     965         4377 :         if (memtupsize < INT_MAX / 2)
     966         4377 :             newmemtupsize = memtupsize * 2;
     967              :         else
     968              :         {
     969            0 :             newmemtupsize = INT_MAX;
     970            0 :             state->growmemtuples = false;
     971              :         }
     972              :     }
     973              :     else
     974              :     {
     975              :         /*
     976              :          * This will be the last increment of memtupsize.  Abandon doubling
     977              :          * strategy and instead increase as much as we safely can.
     978              :          *
     979              :          * To stay within allowedMem, we can't increase memtupsize by more
     980              :          * than availMem / sizeof(SortTuple) elements.  In practice, we want
     981              :          * to increase it by considerably less, because we need to leave some
     982              :          * space for the tuples to which the new array slots will refer.  We
     983              :          * assume the new tuples will be about the same size as the tuples
     984              :          * we've already seen, and thus we can extrapolate from the space
     985              :          * consumption so far to estimate an appropriate new size for the
     986              :          * memtuples array.  The optimal value might be higher or lower than
     987              :          * this estimate, but it's hard to know that in advance.  We again
     988              :          * clamp at INT_MAX tuples.
     989              :          *
     990              :          * This calculation is safe against enlarging the array so much that
     991              :          * LACKMEM becomes true, because the memory currently used includes
     992              :          * the present array; thus, there would be enough allowedMem for the
     993              :          * new array elements even if no other memory were currently used.
     994              :          *
     995              :          * We do the arithmetic in float8, because otherwise the product of
     996              :          * memtupsize and allowedMem could overflow.  Any inaccuracy in the
     997              :          * result should be insignificant; but even if we computed a
     998              :          * completely insane result, the checks below will prevent anything
     999              :          * really bad from happening.
    1000              :          */
    1001              :         double      grow_ratio;
    1002              : 
    1003           73 :         grow_ratio = (double) state->allowedMem / (double) memNowUsed;
    1004           73 :         if (memtupsize * grow_ratio < INT_MAX)
    1005           73 :             newmemtupsize = (int) (memtupsize * grow_ratio);
    1006              :         else
    1007            0 :             newmemtupsize = INT_MAX;
    1008              : 
    1009              :         /* We won't make any further enlargement attempts */
    1010           73 :         state->growmemtuples = false;
    1011              :     }
    1012              : 
    1013              :     /* Must enlarge array by at least one element, else report failure */
    1014         4450 :     if (newmemtupsize <= memtupsize)
    1015            0 :         goto noalloc;
    1016              : 
    1017              :     /*
    1018              :      * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
    1019              :      * to ensure our request won't be rejected.  Note that we can easily
    1020              :      * exhaust address space before facing this outcome.  (This is presently
    1021              :      * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
    1022              :      * don't rely on that at this distance.)
    1023              :      */
    1024         4450 :     if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
    1025              :     {
    1026            0 :         newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
    1027            0 :         state->growmemtuples = false;    /* can't grow any more */
    1028              :     }
    1029              : 
    1030              :     /*
    1031              :      * We need to be sure that we do not cause LACKMEM to become true, else
    1032              :      * the space management algorithm will go nuts.  The code above should
    1033              :      * never generate a dangerous request, but to be safe, check explicitly
    1034              :      * that the array growth fits within availMem.  (We could still cause
    1035              :      * LACKMEM if the memory chunk overhead associated with the memtuples
    1036              :      * array were to increase.  That shouldn't happen because we chose the
    1037              :      * initial array size large enough to ensure that palloc will be treating
    1038              :      * both old and new arrays as separate chunks.  But we'll check LACKMEM
    1039              :      * explicitly below just in case.)
    1040              :      */
    1041         4450 :     if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
    1042            0 :         goto noalloc;
    1043              : 
    1044              :     /* OK, do it */
    1045         4450 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1046         4450 :     state->memtupsize = newmemtupsize;
    1047         4450 :     state->memtuples = (SortTuple *)
    1048         4450 :         repalloc_huge(state->memtuples,
    1049         4450 :                       state->memtupsize * sizeof(SortTuple));
    1050         4450 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1051         4450 :     if (LACKMEM(state))
    1052            0 :         elog(ERROR, "unexpected out-of-memory situation in tuplesort");
    1053         4450 :     return true;
    1054              : 
    1055            0 : noalloc:
    1056              :     /* If for any reason we didn't realloc, shut off future attempts */
    1057            0 :     state->growmemtuples = false;
    1058            0 :     return false;
    1059              : }
    1060              : 
    1061              : /*
    1062              :  * Shared code for tuple and datum cases.
    1063              :  */
    1064              : void
    1065     16180503 : tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
    1066              :                           bool useAbbrev, Size tuplen)
    1067              : {
    1068     16180503 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1069              : 
    1070              :     Assert(!LEADER(state));
    1071              : 
    1072              :     /* account for the memory used for this tuple */
    1073     16180503 :     USEMEM(state, tuplen);
    1074     16180503 :     state->tupleMem += tuplen;
    1075              : 
    1076     16180503 :     if (!useAbbrev)
    1077              :     {
    1078              :         /*
    1079              :          * Leave ordinary Datum representation, or NULL value.  If there is a
     1080              :          * converter, it won't expect NULL values, and the cost model
     1081              :          * is not required to account for NULLs; in that case we avoid
     1082              :          * calling the converter and just set datum1 to a zeroed
     1083              :          * representation (to be consistent, and to support cheap
     1084              :          * inequality tests for NULL abbreviated keys).
    1085              :          */
    1086              :     }
    1087      2222975 :     else if (!consider_abort_common(state))
    1088              :     {
    1089              :         /* Store abbreviated key representation */
    1090      2222927 :         tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
    1091              :                                                                state->base.sortKeys);
    1092              :     }
    1093              :     else
    1094              :     {
    1095              :         /*
    1096              :          * Set state to be consistent with never trying abbreviation.
    1097              :          *
    1098              :          * Alter datum1 representation in already-copied tuples, so as to
    1099              :          * ensure a consistent representation (current tuple was just
    1100              :          * handled).  It does not matter if some dumped tuples are already
    1101              :          * sorted on tape, since serialized tuples lack abbreviated keys
    1102              :          * (TSS_BUILDRUNS state prevents control reaching here in any case).
    1103              :          */
    1104           48 :         REMOVEABBREV(state, state->memtuples, state->memtupcount);
    1105              :     }
    1106              : 
    1107     16180503 :     switch (state->status)
    1108              :     {
    1109     13749288 :         case TSS_INITIAL:
    1110              : 
    1111              :             /*
    1112              :              * Save the tuple into the unsorted array.  First, grow the array
    1113              :              * as needed.  Note that we try to grow the array when there is
    1114              :              * still one free slot remaining --- if we fail, there'll still be
    1115              :              * room to store the incoming tuple, and then we'll switch to
    1116              :              * tape-based operation.
    1117              :              */
    1118     13749288 :             if (state->memtupcount >= state->memtupsize - 1)
    1119              :             {
    1120         4520 :                 (void) grow_memtuples(state);
    1121              :                 Assert(state->memtupcount < state->memtupsize);
    1122              :             }
    1123     13749288 :             state->memtuples[state->memtupcount++] = *tuple;
    1124              : 
    1125              :             /*
    1126              :              * Check if it's time to switch over to a bounded heapsort. We do
    1127              :              * so if the input tuple count exceeds twice the desired tuple
    1128              :              * count (this is a heuristic for where heapsort becomes cheaper
    1129              :              * than a quicksort), or if we've just filled workMem and have
    1130              :              * enough tuples to meet the bound.
    1131              :              *
    1132              :              * Note that once we enter TSS_BOUNDED state we will always try to
    1133              :              * complete the sort that way.  In the worst case, if later input
    1134              :              * tuples are larger than earlier ones, this might cause us to
    1135              :              * exceed workMem significantly.
    1136              :              */
    1137     13749288 :             if (state->bounded &&
    1138        23683 :                 (state->memtupcount > state->bound * 2 ||
    1139        23475 :                  (state->memtupcount > state->bound && LACKMEM(state))))
    1140              :             {
    1141          208 :                 if (trace_sort)
    1142            0 :                     elog(LOG, "switching to bounded heapsort at %d tuples: %s",
    1143              :                          state->memtupcount,
    1144              :                          pg_rusage_show(&state->ru_start));
    1145          208 :                 make_bounded_heap(state);
    1146          208 :                 MemoryContextSwitchTo(oldcontext);
    1147          208 :                 return;
    1148              :             }
    1149              : 
    1150              :             /*
    1151              :              * Done if we still fit in available memory and have array slots.
    1152              :              */
    1153     13749080 :             if (state->memtupcount < state->memtupsize && !LACKMEM(state))
    1154              :             {
    1155     13749010 :                 MemoryContextSwitchTo(oldcontext);
    1156     13749010 :                 return;
    1157              :             }
    1158              : 
    1159              :             /*
    1160              :              * Nope; time to switch to tape-based operation.
    1161              :              */
    1162           70 :             inittapes(state, true);
    1163              : 
    1164              :             /*
    1165              :              * Dump all tuples.
    1166              :              */
    1167           70 :             dumptuples(state, false);
    1168           70 :             break;
    1169              : 
    1170      1875072 :         case TSS_BOUNDED:
    1171              : 
    1172              :             /*
    1173              :              * We don't want to grow the array here, so check whether the new
    1174              :              * tuple can be discarded before putting it in.  This should be a
    1175              :              * good speed optimization, too, since when there are many more
    1176              :              * input tuples than the bound, most input tuples can be discarded
    1177              :              * with just this one comparison.  Note that because we currently
    1178              :              * have the sort direction reversed, we must check for <= not >=.
    1179              :              */
    1180      1875072 :             if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
    1181              :             {
    1182              :                 /* new tuple <= top of the heap, so we can discard it */
    1183      1623542 :                 free_sort_tuple(state, tuple);
    1184      1623542 :                 CHECK_FOR_INTERRUPTS();
    1185              :             }
    1186              :             else
    1187              :             {
    1188              :                 /* discard top of heap, replacing it with the new tuple */
    1189       251530 :                 free_sort_tuple(state, &state->memtuples[0]);
    1190       251530 :                 tuplesort_heap_replace_top(state, tuple);
    1191              :             }
    1192      1875072 :             break;
    1193              : 
    1194       556143 :         case TSS_BUILDRUNS:
    1195              : 
    1196              :             /*
    1197              :              * Save the tuple into the unsorted array (there must be space)
    1198              :              */
    1199       556143 :             state->memtuples[state->memtupcount++] = *tuple;
    1200              : 
    1201              :             /*
    1202              :              * If we are over the memory limit, dump all tuples.
    1203              :              */
    1204       556143 :             dumptuples(state, false);
    1205       556143 :             break;
    1206              : 
    1207            0 :         default:
    1208            0 :             elog(ERROR, "invalid tuplesort state");
    1209              :             break;
    1210              :     }
    1211      2431285 :     MemoryContextSwitchTo(oldcontext);
    1212              : }
    1213              : 
    1214              : static bool
    1215      2222975 : consider_abort_common(Tuplesortstate *state)
    1216              : {
    1217              :     Assert(state->base.sortKeys[0].abbrev_converter != NULL);
    1218              :     Assert(state->base.sortKeys[0].abbrev_abort != NULL);
    1219              :     Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
    1220              : 
    1221              :     /*
    1222              :      * Check effectiveness of abbreviation optimization.  Consider aborting
    1223              :      * when still within memory limit.
    1224              :      */
    1225      2222975 :     if (state->status == TSS_INITIAL &&
    1226      1986760 :         state->memtupcount >= state->abbrevNext)
    1227              :     {
    1228         2572 :         state->abbrevNext *= 2;
    1229              : 
    1230              :         /*
    1231              :          * Check opclass-supplied abbreviation abort routine.  It may indicate
    1232              :          * that abbreviation should not proceed.
    1233              :          */
    1234         2572 :         if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
    1235              :                                                 state->base.sortKeys))
    1236         2524 :             return false;
    1237              : 
    1238              :         /*
    1239              :          * Finally, restore authoritative comparator, and indicate that
    1240              :          * abbreviation is not in play by setting abbrev_converter to NULL
    1241              :          */
    1242           48 :         state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
    1243           48 :         state->base.sortKeys[0].abbrev_converter = NULL;
    1244              :         /* Not strictly necessary, but be tidy */
    1245           48 :         state->base.sortKeys[0].abbrev_abort = NULL;
    1246           48 :         state->base.sortKeys[0].abbrev_full_comparator = NULL;
    1247              : 
    1248              :         /* Give up - expect original pass-by-value representation */
    1249           48 :         return true;
    1250              :     }
    1251              : 
    1252      2220403 :     return false;
    1253              : }
    1254              : 
    1255              : /*
    1256              :  * All tuples have been provided; finish the sort.
    1257              :  */
    1258              : void
    1259       121508 : tuplesort_performsort(Tuplesortstate *state)
    1260              : {
    1261       121508 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1262              : 
    1263       121508 :     if (trace_sort)
    1264            0 :         elog(LOG, "performsort of worker %d starting: %s",
    1265              :              state->worker, pg_rusage_show(&state->ru_start));
    1266              : 
    1267       121508 :     switch (state->status)
    1268              :     {
    1269       121230 :         case TSS_INITIAL:
    1270              : 
    1271              :             /*
     1272              :              * We were able to accumulate all the tuples within the allowed
     1273              :              * amount of memory, or the leader will take over the worker tapes.
    1274              :              */
    1275       121230 :             if (SERIAL(state))
    1276              :             {
    1277              :                 /* Sort in memory and we're done */
    1278       120851 :                 tuplesort_sort_memtuples(state);
    1279       120806 :                 state->status = TSS_SORTEDINMEM;
    1280              :             }
    1281          379 :             else if (WORKER(state))
    1282              :             {
    1283              :                 /*
    1284              :                  * Parallel workers must still dump out tuples to tape.  No
     1285              :                  * merge is required to produce a single output run, though.
    1286              :                  */
    1287          282 :                 inittapes(state, false);
    1288          282 :                 dumptuples(state, true);
    1289          282 :                 worker_nomergeruns(state);
    1290          282 :                 state->status = TSS_SORTEDONTAPE;
    1291              :             }
    1292              :             else
    1293              :             {
    1294              :                 /*
    1295              :                  * Leader will take over worker tapes and merge worker runs.
    1296              :                  * Note that mergeruns sets the correct state->status.
    1297              :                  */
    1298           97 :                 leader_takeover_tapes(state);
    1299           97 :                 mergeruns(state);
    1300              :             }
    1301       121185 :             state->current = 0;
    1302       121185 :             state->eof_reached = false;
    1303       121185 :             state->markpos_block = 0L;
    1304       121185 :             state->markpos_offset = 0;
    1305       121185 :             state->markpos_eof = false;
    1306       121185 :             break;
    1307              : 
    1308          208 :         case TSS_BOUNDED:
    1309              : 
    1310              :             /*
    1311              :              * We were able to accumulate all the tuples required for output
    1312              :              * in memory, using a heap to eliminate excess tuples.  Now we
    1313              :              * have to transform the heap to a properly-sorted array. Note
    1314              :              * that sort_bounded_heap sets the correct state->status.
    1315              :              */
    1316          208 :             sort_bounded_heap(state);
    1317          208 :             state->current = 0;
    1318          208 :             state->eof_reached = false;
    1319          208 :             state->markpos_offset = 0;
    1320          208 :             state->markpos_eof = false;
    1321          208 :             break;
    1322              : 
    1323           70 :         case TSS_BUILDRUNS:
    1324              : 
    1325              :             /*
    1326              :              * Finish tape-based sort.  First, flush all tuples remaining in
    1327              :              * memory out to tape; then merge until we have a single remaining
    1328              :              * run (or, if !randomAccess and !WORKER(), one run per tape).
    1329              :              * Note that mergeruns sets the correct state->status.
    1330              :              */
    1331           70 :             dumptuples(state, true);
    1332           70 :             mergeruns(state);
    1333           70 :             state->eof_reached = false;
    1334           70 :             state->markpos_block = 0L;
    1335           70 :             state->markpos_offset = 0;
    1336           70 :             state->markpos_eof = false;
    1337           70 :             break;
    1338              : 
    1339            0 :         default:
    1340            0 :             elog(ERROR, "invalid tuplesort state");
    1341              :             break;
    1342              :     }
    1343              : 
    1344       121463 :     if (trace_sort)
    1345              :     {
    1346            0 :         if (state->status == TSS_FINALMERGE)
    1347            0 :             elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
    1348              :                  state->worker, state->nInputTapes,
    1349              :                  pg_rusage_show(&state->ru_start));
    1350              :         else
    1351            0 :             elog(LOG, "performsort of worker %d done: %s",
    1352              :                  state->worker, pg_rusage_show(&state->ru_start));
    1353              :     }
    1354              : 
    1355       121463 :     MemoryContextSwitchTo(oldcontext);
    1356       121463 : }
    1357              : 
    1358              : /*
    1359              :  * Internal routine to fetch the next tuple in either forward or back
    1360              :  * direction into *stup.  Returns false if no more tuples.
    1361              :  * Returned tuple belongs to tuplesort memory context, and must not be freed
    1362              :  * by caller.  Note that fetched tuple is stored in memory that may be
    1363              :  * recycled by any future fetch.
    1364              :  */
    1365              : bool
    1366     14932698 : tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
    1367              :                           SortTuple *stup)
    1368              : {
    1369              :     unsigned int tuplen;
    1370              :     size_t      nmoved;
    1371              : 
    1372              :     Assert(!WORKER(state));
    1373              : 
    1374     14932698 :     switch (state->status)
    1375              :     {
    1376     12439312 :         case TSS_SORTEDINMEM:
    1377              :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1378              :             Assert(!state->slabAllocatorUsed);
    1379     12439312 :             if (forward)
    1380              :             {
    1381     12439279 :                 if (state->current < state->memtupcount)
    1382              :                 {
    1383     12319048 :                     *stup = state->memtuples[state->current++];
    1384     12319048 :                     return true;
    1385              :                 }
    1386       120231 :                 state->eof_reached = true;
    1387              : 
    1388              :                 /*
    1389              :                  * Complain if caller tries to retrieve more tuples than
    1390              :                  * originally asked for in a bounded sort.  This is because
    1391              :                  * returning EOF here might be the wrong thing.
    1392              :                  */
    1393       120231 :                 if (state->bounded && state->current >= state->bound)
    1394            0 :                     elog(ERROR, "retrieved too many tuples in a bounded sort");
    1395              : 
    1396       120231 :                 return false;
    1397              :             }
    1398              :             else
    1399              :             {
    1400           33 :                 if (state->current <= 0)
    1401            0 :                     return false;
    1402              : 
    1403              :                 /*
     1404              :                  * If all tuples have already been fetched, return the last
     1405              :                  * tuple; otherwise return the tuple before the last returned.
    1406              :                  */
    1407           33 :                 if (state->eof_reached)
    1408            6 :                     state->eof_reached = false;
    1409              :                 else
    1410              :                 {
    1411           27 :                     state->current--;    /* last returned tuple */
    1412           27 :                     if (state->current <= 0)
    1413            3 :                         return false;
    1414              :                 }
    1415           30 :                 *stup = state->memtuples[state->current - 1];
    1416           30 :                 return true;
    1417              :             }
    1418              :             break;
    1419              : 
    1420       151500 :         case TSS_SORTEDONTAPE:
    1421              :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1422              :             Assert(state->slabAllocatorUsed);
    1423              : 
    1424              :             /*
    1425              :              * The slot that held the tuple that we returned in previous
    1426              :              * gettuple call can now be reused.
    1427              :              */
    1428       151500 :             if (state->lastReturnedTuple)
    1429              :             {
    1430        76425 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1431        76425 :                 state->lastReturnedTuple = NULL;
    1432              :             }
    1433              : 
    1434       151500 :             if (forward)
    1435              :             {
    1436       151485 :                 if (state->eof_reached)
    1437            0 :                     return false;
    1438              : 
    1439       151485 :                 if ((tuplen = getlen(state->result_tape, true)) != 0)
    1440              :                 {
    1441       151470 :                     READTUP(state, stup, state->result_tape, tuplen);
    1442              : 
    1443              :                     /*
    1444              :                      * Remember the tuple we return, so that we can recycle
    1445              :                      * its memory on next call.  (This can be NULL, in the
    1446              :                      * !state->tuples case).
    1447              :                      */
    1448       151470 :                     state->lastReturnedTuple = stup->tuple;
    1449              : 
    1450       151470 :                     return true;
    1451              :                 }
    1452              :                 else
    1453              :                 {
    1454           15 :                     state->eof_reached = true;
    1455           15 :                     return false;
    1456              :                 }
    1457              :             }
    1458              : 
    1459              :             /*
    1460              :              * Backward.
    1461              :              *
     1462              :              * If all tuples have already been fetched, return the last tuple;
     1463              :              * otherwise return the tuple before the last returned.
    1464              :              */
    1465           15 :             if (state->eof_reached)
    1466              :             {
    1467              :                 /*
    1468              :                  * Seek position is pointing just past the zero tuplen at the
    1469              :                  * end of file; back up to fetch last tuple's ending length
    1470              :                  * word.  If seek fails we must have a completely empty file.
    1471              :                  */
    1472            6 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1473              :                                               2 * sizeof(unsigned int));
    1474            6 :                 if (nmoved == 0)
    1475            0 :                     return false;
    1476            6 :                 else if (nmoved != 2 * sizeof(unsigned int))
    1477            0 :                     elog(ERROR, "unexpected tape position");
    1478            6 :                 state->eof_reached = false;
    1479              :             }
    1480              :             else
    1481              :             {
    1482              :                 /*
    1483              :                  * Back up and fetch previously-returned tuple's ending length
    1484              :                  * word.  If seek fails, assume we are at start of file.
    1485              :                  */
    1486            9 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1487              :                                               sizeof(unsigned int));
    1488            9 :                 if (nmoved == 0)
    1489            0 :                     return false;
    1490            9 :                 else if (nmoved != sizeof(unsigned int))
    1491            0 :                     elog(ERROR, "unexpected tape position");
    1492            9 :                 tuplen = getlen(state->result_tape, false);
    1493              : 
    1494              :                 /*
    1495              :                  * Back up to get ending length word of tuple before it.
    1496              :                  */
    1497            9 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1498              :                                               tuplen + 2 * sizeof(unsigned int));
    1499            9 :                 if (nmoved == tuplen + sizeof(unsigned int))
    1500              :                 {
    1501              :                     /*
    1502              :                      * We backed up over the previous tuple, but there was no
    1503              :                      * ending length word before it.  That means that the prev
    1504              :                      * tuple is the first tuple in the file.  It is now the
    1505              :                      * next to read in forward direction (not obviously right,
     1506              :                      * but that is what the in-memory case does).
    1507              :                      */
    1508            3 :                     return false;
    1509              :                 }
    1510            6 :                 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
    1511            0 :                     elog(ERROR, "bogus tuple length in backward scan");
    1512              :             }
    1513              : 
    1514           12 :             tuplen = getlen(state->result_tape, false);
    1515              : 
    1516              :             /*
    1517              :              * Now we have the length of the prior tuple, back up and read it.
    1518              :              * Note: READTUP expects we are positioned after the initial
    1519              :              * length word of the tuple, so back up to that point.
    1520              :              */
    1521           12 :             nmoved = LogicalTapeBackspace(state->result_tape,
    1522              :                                           tuplen);
    1523           12 :             if (nmoved != tuplen)
    1524            0 :                 elog(ERROR, "bogus tuple length in backward scan");
    1525           12 :             READTUP(state, stup, state->result_tape, tuplen);
    1526              : 
    1527              :             /*
    1528              :              * Remember the tuple we return, so that we can recycle its memory
    1529              :              * on next call. (This can be NULL, in the Datum case).
    1530              :              */
    1531           12 :             state->lastReturnedTuple = stup->tuple;
    1532              : 
    1533           12 :             return true;
    1534              : 
    1535      2341886 :         case TSS_FINALMERGE:
    1536              :             Assert(forward);
    1537              :             /* We are managing memory ourselves, with the slab allocator. */
    1538              :             Assert(state->slabAllocatorUsed);
    1539              : 
    1540              :             /*
    1541              :              * The slab slot holding the tuple that we returned in the
    1542              :              * previous gettuple call can now be reused.
    1543              :              */
    1544      2341886 :             if (state->lastReturnedTuple)
    1545              :             {
    1546      2281715 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1547      2281715 :                 state->lastReturnedTuple = NULL;
    1548              :             }
    1549              : 
    1550              :             /*
    1551              :              * This code should match the inner loop of mergeonerun().
    1552              :              */
    1553      2341886 :             if (state->memtupcount > 0)
    1554              :             {
    1555      2341739 :                 int         srcTapeIndex = state->memtuples[0].srctape;
    1556      2341739 :                 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
    1557              :                 SortTuple   newtup;
    1558              : 
    1559      2341739 :                 *stup = state->memtuples[0];
    1560              : 
    1561              :                 /*
    1562              :                  * Remember the tuple we return, so that we can recycle its
    1563              :                  * memory on next call. (This can be NULL, in the Datum case).
    1564              :                  */
    1565      2341739 :                 state->lastReturnedTuple = stup->tuple;
    1566              : 
    1567              :                 /*
    1568              :                  * Pull next tuple from tape, and replace the returned tuple
    1569              :                  * at top of the heap with it.
    1570              :                  */
    1571      2341739 :                 if (!mergereadnext(state, srcTape, &newtup))
    1572              :                 {
    1573              :                     /*
    1574              :                      * If no more data, we've reached end of run on this tape.
    1575              :                      * Remove the top node from the heap.
    1576              :                      */
    1577          216 :                     tuplesort_heap_delete_top(state);
    1578          216 :                     state->nInputRuns--;
    1579              : 
    1580              :                     /*
    1581              :                      * Close the tape.  It'd go away at the end of the sort
    1582              :                      * anyway, but better to release the memory early.
    1583              :                      */
    1584          216 :                     LogicalTapeClose(srcTape);
    1585          216 :                     return true;
    1586              :                 }
    1587      2341523 :                 newtup.srctape = srcTapeIndex;
    1588      2341523 :                 tuplesort_heap_replace_top(state, &newtup);
    1589      2341523 :                 return true;
    1590              :             }
    1591          147 :             return false;
    1592              : 
    1593            0 :         default:
    1594            0 :             elog(ERROR, "invalid tuplesort state");
    1595              :             return false;       /* keep compiler quiet */
    1596              :     }
    1597              : }
    1598              : 
    1599              : 
    1600              : /*
    1601              :  * Advance over N tuples in either forward or back direction,
    1602              :  * Advance over N tuples in either the forward or backward direction,
    1603              :  * Returns true if successful, false if ran out of tuples.
    1604              :  */
    1605              : bool
    1606          196 : tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
    1607              : {
    1608              :     MemoryContext oldcontext;
    1609              : 
    1610              :     /*
    1611              :      * We don't actually support backwards skip yet, because no callers need
    1612              :      * it.  The API is designed to allow for that later, though.
    1613              :      */
    1614              :     Assert(forward);
    1615              :     Assert(ntuples >= 0);
    1616              :     Assert(!WORKER(state));
    1617              : 
    1618          196 :     switch (state->status)
    1619              :     {
    1620          184 :         case TSS_SORTEDINMEM:
    1621          184 :             if (state->memtupcount - state->current >= ntuples)
    1622              :             {
    1623          184 :                 state->current += ntuples;
    1624          184 :                 return true;
    1625              :             }
    1626            0 :             state->current = state->memtupcount;
    1627            0 :             state->eof_reached = true;
    1628              : 
    1629              :             /*
    1630              :              * Complain if caller tries to retrieve more tuples than
    1631              :              * originally asked for in a bounded sort.  This is because
    1632              :              * returning EOF here might be the wrong thing.
    1633              :              */
    1634            0 :             if (state->bounded && state->current >= state->bound)
    1635            0 :                 elog(ERROR, "retrieved too many tuples in a bounded sort");
    1636              : 
    1637            0 :             return false;
    1638              : 
    1639           12 :         case TSS_SORTEDONTAPE:
    1640              :         case TSS_FINALMERGE:
    1641              : 
    1642              :             /*
    1643              :              * We could probably optimize these cases better, but for now it's
    1644              :              * not worth the trouble.
    1645              :              */
    1646           12 :             oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1647       120066 :             while (ntuples-- > 0)
    1648              :             {
    1649              :                 SortTuple   stup;
    1650              : 
    1651       120054 :                 if (!tuplesort_gettuple_common(state, forward, &stup))
    1652              :                 {
    1653            0 :                     MemoryContextSwitchTo(oldcontext);
    1654            0 :                     return false;
    1655              :                 }
    1656       120054 :                 CHECK_FOR_INTERRUPTS();
    1657              :             }
    1658           12 :             MemoryContextSwitchTo(oldcontext);
    1659           12 :             return true;
    1660              : 
    1661            0 :         default:
    1662            0 :             elog(ERROR, "invalid tuplesort state");
    1663              :             return false;       /* keep compiler quiet */
    1664              :     }
    1665              : }
    1666              : 
    1667              : /*
    1668              :  * tuplesort_merge_order - report merge order we'll use for given memory
    1669              :  * (note: "merge order" just means the number of input tapes in the merge).
    1670              :  *
    1671              :  * This is exported for use by the planner.  allowedMem is in bytes.
    1672              :  */
    1673              : int
    1674        10295 : tuplesort_merge_order(int64 allowedMem)
    1675              : {
    1676              :     int         mOrder;
    1677              : 
    1678              :     /*----------
    1679              :      * In the merge phase, we need buffer space for each input and output tape.
    1680              :      * Each pass in the balanced merge algorithm reads from M input tapes, and
    1681              :      * writes to N output tapes.  Each tape consumes TAPE_BUFFER_OVERHEAD bytes
    1682              :      * of memory.  In addition to that, we want MERGE_BUFFER_SIZE workspace per
    1683              :      * input tape.
    1684              :      *
    1685              :      * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
    1686              :      *            N * TAPE_BUFFER_OVERHEAD
    1687              :      *
    1688              :      * Except for the last and next-to-last merge passes, where there can be
    1689              :      * fewer tapes left to process, M = N.  We choose M so that we have the
    1690              :      * desired amount of memory available for the input buffers
    1691              :      * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
    1692              :      * available for the tape buffers (allowedMem).
    1693              :      *
    1694              :      * Note: you might be thinking we need to account for the memtuples[]
    1695              :      * array in this calculation, but we effectively treat that as part of the
    1696              :      * MERGE_BUFFER_SIZE workspace.
    1697              :      *----------
    1698              :      */
    1699        10295 :     mOrder = allowedMem /
    1700              :         (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
    1701              : 
    1702              :     /*
    1703              :      * Even in minimum memory, use at least a MINORDER merge.  On the other
    1704              :      * hand, even when we have lots of memory, do not use more than a MAXORDER
    1705              :      * merge.  Tapes are pretty cheap, but they're not entirely free.  Each
    1706              :      * additional tape reduces the amount of memory available to build runs,
    1707              :      * which in turn can cause the same sort to need more runs, which makes
    1708              :      * merging slower even if it can still be done in a single pass.  Also,
    1709              :      * high order merges are quite slow due to CPU cache effects; it can be
    1710              :      * faster to pay the I/O cost of a multi-pass merge than to perform a
    1711              :      * single merge pass across many hundreds of tapes.
    1712              :      */
    1713        10295 :     mOrder = Max(mOrder, MINORDER);
    1714        10295 :     mOrder = Min(mOrder, MAXORDER);
    1715              : 
    1716        10295 :     return mOrder;
    1717              : }
    1718              : 
    1719              : /*
    1720              :  * Helper function to calculate how much memory to allocate for the read buffer
    1721              :  * of each input tape in a merge pass.
    1722              :  *
    1723              :  * 'avail_mem' is the amount of memory available for the buffers of all the
    1724              :  *      tapes, both input and output.
    1725              :  * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
    1726              :  * 'maxOutputTapes' is the maximum number of output tapes we should produce.
    1727              :  */
    1728              : static int64
    1729          182 : merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
    1730              :                        int maxOutputTapes)
    1731              : {
    1732              :     int         nOutputRuns;
    1733              :     int         nOutputTapes;
    1734              : 
    1735              :     /*
    1736              :      * How many output tapes will we produce in this pass?
    1737              :      *
    1738              :      * This is nInputRuns / nInputTapes, rounded up.
    1739              :      */
    1740          182 :     nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
    1741              : 
    1742          182 :     nOutputTapes = Min(nOutputRuns, maxOutputTapes);
    1743              : 
    1744              :     /*
    1745              :      * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory.  All
    1746              :      * remaining memory is divided evenly between the input tapes.
    1747              :      *
    1748              :      * This also follows from the formula in tuplesort_merge_order, but here
    1749              :      * we derive the input buffer size from the amount of memory available,
    1750              :      * and M and N.
    1751              :      */
    1752          182 :     return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
    1753              : }
    1754              : 
    1755              : /*
    1756              :  * inittapes - initialize for tape sorting.
    1757              :  *
    1758              :  * This is called only if we have found we won't sort in memory.
    1759              :  */
    1760              : static void
    1761          352 : inittapes(Tuplesortstate *state, bool mergeruns)
    1762              : {
    1763              :     Assert(!LEADER(state));
    1764              : 
    1765          352 :     if (mergeruns)
    1766              :     {
    1767              :         /* Compute number of input tapes to use when merging */
    1768           70 :         state->maxTapes = tuplesort_merge_order(state->allowedMem);
    1769              :     }
    1770              :     else
    1771              :     {
    1772              :         /* Workers can sometimes produce a single run, output without merge */
    1773              :         Assert(WORKER(state));
    1774          282 :         state->maxTapes = MINORDER;
    1775              :     }
    1776              : 
    1777          352 :     if (trace_sort)
    1778            0 :         elog(LOG, "worker %d switching to external sort with %d tapes: %s",
    1779              :              state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
    1780              : 
    1781              :     /* Create the tape set */
    1782          352 :     inittapestate(state, state->maxTapes);
    1783          352 :     state->tapeset =
    1784          352 :         LogicalTapeSetCreate(false,
    1785          352 :                              state->shared ? &state->shared->fileset : NULL,
    1786              :                              state->worker);
    1787              : 
    1788          352 :     state->currentRun = 0;
    1789              : 
    1790              :     /*
    1791              :      * Initialize logical tape arrays.
    1792              :      */
    1793          352 :     state->inputTapes = NULL;
    1794          352 :     state->nInputTapes = 0;
    1795          352 :     state->nInputRuns = 0;
    1796              : 
    1797          352 :     state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
    1798          352 :     state->nOutputTapes = 0;
    1799          352 :     state->nOutputRuns = 0;
    1800              : 
    1801          352 :     state->status = TSS_BUILDRUNS;
    1802              : 
    1803          352 :     selectnewtape(state);
    1804          352 : }
    1805              : 
    1806              : /*
    1807              :  * inittapestate - initialize generic tape management state
    1808              :  */
    1809              : static void
    1810          449 : inittapestate(Tuplesortstate *state, int maxTapes)
    1811              : {
    1812              :     int64       tapeSpace;
    1813              : 
    1814              :     /*
    1815              :      * Decrease availMem to reflect the space needed for tape buffers; but
    1816              :      * don't decrease it to the point that we have no room for tuples. (That
    1817              :      * case is only likely to occur if sorting pass-by-value Datums; in all
    1818              :      * other scenarios the memtuples[] array is unlikely to occupy more than
    1819              :      * half of allowedMem.  In the pass-by-value case it's not important to
    1820              :      * account for tuple space, so we don't care if LACKMEM becomes
    1821              :      * inaccurate.)
    1822              :      */
    1823          449 :     tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
    1824              : 
    1825          449 :     if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
    1826          388 :         USEMEM(state, tapeSpace);
    1827              : 
    1828              :     /*
    1829              :      * Make sure that the temp file(s) underlying the tape set are created in
    1830              :      * suitable temp tablespaces.  For parallel sorts, this should have been
    1831              :      * called already, but it doesn't matter if it is called a second time.
    1832              :      */
    1833          449 :     PrepareTempTablespaces();
    1834          449 : }
    1835              : 
    1836              : /*
    1837              :  * selectnewtape -- select next tape to output to.
    1838              :  *
    1839              :  * This is called after finishing a run when we know another run
    1840              :  * must be started.  This is used both when building the initial
    1841              :  * runs, and during merge passes.
    1842              :  */
    1843              : static void
    1844          904 : selectnewtape(Tuplesortstate *state)
    1845              : {
    1846              :     /*
    1847              :      * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
    1848              :      * both zero.  On each call, we create a new output tape to hold the next
    1849              :      * run, until maxTapes is reached.  After that, we assign new runs to the
    1850              :      * existing tapes in a round robin fashion.
    1851              :      */
    1852          904 :     if (state->nOutputTapes < state->maxTapes)
    1853              :     {
    1854              :         /* Create a new tape to hold the next run */
    1855              :         Assert(state->outputTapes[state->nOutputRuns] == NULL);
    1856              :         Assert(state->nOutputRuns == state->nOutputTapes);
    1857          613 :         state->destTape = LogicalTapeCreate(state->tapeset);
    1858          613 :         state->outputTapes[state->nOutputTapes] = state->destTape;
    1859          613 :         state->nOutputTapes++;
    1860          613 :         state->nOutputRuns++;
    1861              :     }
    1862              :     else
    1863              :     {
    1864              :         /*
    1865              :          * We have reached the max number of tapes.  Append to an existing
    1866              :          * tape.
    1867              :          */
    1868          291 :         state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
    1869          291 :         state->nOutputRuns++;
    1870              :     }
    1871          904 : }
    1872              : 
    1873              : /*
    1874              :  * Initialize the slab allocation arena, for the given number of slots.
    1875              :  */
    1876              : static void
    1877          167 : init_slab_allocator(Tuplesortstate *state, int numSlots)
    1878              : {
    1879          167 :     if (numSlots > 0)
    1880              :     {
    1881              :         char       *p;
    1882              :         int         i;
    1883              : 
    1884          153 :         state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
    1885          153 :         state->slabMemoryEnd = state->slabMemoryBegin +
    1886          153 :             numSlots * SLAB_SLOT_SIZE;
    1887          153 :         state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
    1888          153 :         USEMEM(state, numSlots * SLAB_SLOT_SIZE);
    1889              : 
    1890          153 :         p = state->slabMemoryBegin;
    1891          585 :         for (i = 0; i < numSlots - 1; i++)
    1892              :         {
    1893          432 :             ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
    1894          432 :             p += SLAB_SLOT_SIZE;
    1895              :         }
    1896          153 :         ((SlabSlot *) p)->nextfree = NULL;
    1897              :     }
    1898              :     else
    1899              :     {
    1900           14 :         state->slabMemoryBegin = state->slabMemoryEnd = NULL;
    1901           14 :         state->slabFreeHead = NULL;
    1902              :     }
    1903          167 :     state->slabAllocatorUsed = true;
    1904          167 : }
    1905              : 
    1906              : /*
    1907              :  * mergeruns -- merge all the completed initial runs.
    1908              :  *
    1909              :  * This implements the Balanced k-Way Merge Algorithm.  All input data has
    1910              :  * already been written to initial runs on tape (see dumptuples).
    1911              :  */
    1912              : static void
    1913          167 : mergeruns(Tuplesortstate *state)
    1914              : {
    1915              :     int         tapenum;
    1916              : 
    1917              :     Assert(state->status == TSS_BUILDRUNS);
    1918              :     Assert(state->memtupcount == 0);
    1919              : 
    1920          167 :     if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
    1921              :     {
    1922              :         /*
    1923              :          * If there are multiple runs to be merged, when we go to read back
    1924              :          * tuples from disk, abbreviated keys will not have been stored, and
    1925              :          * we don't care to regenerate them.  Disable abbreviation from this
    1926              :          * point on.
    1927              :          */
    1928           15 :         state->base.sortKeys->abbrev_converter = NULL;
    1929           15 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
    1930              : 
    1931              :         /* Not strictly necessary, but be tidy */
    1932           15 :         state->base.sortKeys->abbrev_abort = NULL;
    1933           15 :         state->base.sortKeys->abbrev_full_comparator = NULL;
    1934              :     }
    1935              : 
    1936              :     /*
    1937              :      * Reset tuple memory.  We've freed all the tuples that we previously
    1938              :      * allocated.  We will use the slab allocator from now on.
    1939              :      */
    1940          167 :     MemoryContextResetOnly(state->base.tuplecontext);
    1941              : 
    1942              :     /*
    1943              :      * We no longer need a large memtuples array.  (We will allocate a smaller
    1944              :      * one for the heap later.)
    1945              :      */
    1946          167 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1947          167 :     pfree(state->memtuples);
    1948          167 :     state->memtuples = NULL;
    1949              : 
    1950              :     /*
    1951              :      * Initialize the slab allocator.  We need one slab slot per input tape,
    1952              :      * for the tuples in the heap, plus one to hold the tuple last returned
    1953              :      * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
    1954              :      * however, we don't need to allocate anything.)
    1955              :      *
    1956              :      * In a multi-pass merge, we could shrink this allocation for the last
    1957              :      * merge pass, if it has fewer tapes than previous passes, but we don't
    1958              :      * bother.
    1959              :      *
    1960              :      * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
    1961              :      * to track memory usage of individual tuples.
    1962              :      */
    1963          167 :     if (state->base.tuples)
    1964          153 :         init_slab_allocator(state, state->nOutputTapes + 1);
    1965              :     else
    1966           14 :         init_slab_allocator(state, 0);
    1967              : 
    1968              :     /*
    1969              :      * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
    1970              :      * from each input tape.
    1971              :      *
    1972              :      * We could shrink this, too, between passes in a multi-pass merge, but we
    1973              :      * don't bother.  (The initial input tapes are still in outputTapes.  The
    1974              :      * number of input tapes will not increase between passes.)
    1975              :      */
    1976          167 :     state->memtupsize = state->nOutputTapes;
    1977          334 :     state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
    1978          167 :                                                         state->nOutputTapes * sizeof(SortTuple));
    1979          167 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1980              : 
    1981              :     /*
    1982              :      * Use all the remaining memory we have available for tape buffers among
    1983              :      * all the input tapes.  At the beginning of each merge pass, we will
    1984              :      * divide this memory between the input and output tapes in the pass.
    1985              :      */
    1986          167 :     state->tape_buffer_mem = state->availMem;
    1987          167 :     USEMEM(state, state->tape_buffer_mem);
    1988          167 :     if (trace_sort)
    1989            0 :         elog(LOG, "worker %d using %zu KB of memory for tape buffers",
    1990              :              state->worker, state->tape_buffer_mem / 1024);
    1991              : 
    1992              :     for (;;)
    1993              :     {
    1994              :         /*
    1995              :          * On the first iteration, or if we have read all the runs from the
    1996              :          * input tapes in a multi-pass merge, it's time to start a new pass.
    1997              :          * Rewind all the output tapes, and make them inputs for the next
    1998              :          * pass.
    1999              :          */
    2000          236 :         if (state->nInputRuns == 0)
    2001              :         {
    2002              :             int64       input_buffer_size;
    2003              : 
    2004              :             /* Close the old, emptied, input tapes */
    2005          182 :             if (state->nInputTapes > 0)
    2006              :             {
    2007          105 :                 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2008           90 :                     LogicalTapeClose(state->inputTapes[tapenum]);
    2009           15 :                 pfree(state->inputTapes);
    2010              :             }
    2011              : 
    2012              :             /* Previous pass's outputs become next pass's inputs. */
    2013          182 :             state->inputTapes = state->outputTapes;
    2014          182 :             state->nInputTapes = state->nOutputTapes;
    2015          182 :             state->nInputRuns = state->nOutputRuns;
    2016              : 
    2017              :             /*
    2018              :              * Reset output tape variables.  The actual LogicalTapes will be
    2019              :              * created as needed, here we only allocate the array to hold
    2020              :              * them.
    2021              :              */
    2022          182 :             state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
    2023          182 :             state->nOutputTapes = 0;
    2024          182 :             state->nOutputRuns = 0;
    2025              : 
    2026              :             /*
    2027              :              * Redistribute the memory allocated for tape buffers, among the
    2028              :              * new input and output tapes.
    2029              :              */
    2030          182 :             input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
    2031              :                                                        state->nInputTapes,
    2032              :                                                        state->nInputRuns,
    2033              :                                                        state->maxTapes);
    2034              : 
    2035          182 :             if (trace_sort)
    2036            0 :                 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
    2037              :                      state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
    2038              :                      pg_rusage_show(&state->ru_start));
    2039              : 
    2040              :             /* Prepare the new input tapes for merge pass. */
    2041          714 :             for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2042          532 :                 LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
    2043              : 
    2044              :             /*
    2045              :              * If there's just one run left on each input tape, then only one
    2046              :              * merge pass remains.  If we don't have to produce a materialized
    2047              :              * sorted tape, we can stop at this point and do the final merge
    2048              :              * on-the-fly.
    2049              :              */
    2050          182 :             if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
    2051          171 :                 && state->nInputRuns <= state->nInputTapes
    2052          156 :                 && !WORKER(state))
    2053              :             {
    2054              :                 /* Tell logtape.c we won't be writing anymore */
    2055          156 :                 LogicalTapeSetForgetFreeSpace(state->tapeset);
    2056              :                 /* Initialize for the final merge pass */
    2057          156 :                 beginmerge(state);
    2058          156 :                 state->status = TSS_FINALMERGE;
    2059          156 :                 return;
    2060              :             }
    2061              :         }
    2062              : 
    2063              :         /* Select an output tape */
    2064           80 :         selectnewtape(state);
    2065              : 
    2066              :         /* Merge one run from each input tape. */
    2067           80 :         mergeonerun(state);
    2068              : 
    2069              :         /*
    2070              :          * If the input tapes are empty and we produced only one output run,
    2071              :          * we're done.  The current output tape contains the final result.
    2072              :          */
    2073           80 :         if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
    2074           11 :             break;
    2075              :     }
    2076              : 
    2077              :     /*
    2078              :      * Done.  The result is on a single run on a single tape.
    2079              :      */
    2080           11 :     state->result_tape = state->outputTapes[0];
    2081           11 :     if (!WORKER(state))
    2082           11 :         LogicalTapeFreeze(state->result_tape, NULL);
    2083              :     else
    2084            0 :         worker_freeze_result_tape(state);
    2085           11 :     state->status = TSS_SORTEDONTAPE;
    2086              : 
    2087              :     /* Close all the now-empty input tapes, to release their read buffers. */
    2088           57 :     for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2089           46 :         LogicalTapeClose(state->inputTapes[tapenum]);
    2090              : }
    2091              : 
    2092              : /*
    2093              :  * Merge one run from each input tape.
    2094              :  */
    2095              : static void
    2096           80 : mergeonerun(Tuplesortstate *state)
    2097              : {
    2098              :     int         srcTapeIndex;
    2099              :     LogicalTape *srcTape;
    2100              : 
    2101              :     /*
    2102              :      * Start the merge by loading one tuple from each active source tape into
    2103              :      * the heap.
    2104              :      */
    2105           80 :     beginmerge(state);
    2106              : 
    2107              :     Assert(state->slabAllocatorUsed);
    2108              : 
    2109              :     /*
    2110              :      * Execute merge by repeatedly extracting lowest tuple in heap, writing it
    2111              :      * out, and replacing it with next tuple from same tape (if there is
    2112              :      * another one).
    2113              :      */
    2114       437796 :     while (state->memtupcount > 0)
    2115              :     {
    2116              :         SortTuple   stup;
    2117              : 
    2118              :         /* write the tuple to destTape */
    2119       437716 :         srcTapeIndex = state->memtuples[0].srctape;
    2120       437716 :         srcTape = state->inputTapes[srcTapeIndex];
    2121       437716 :         WRITETUP(state, state->destTape, &state->memtuples[0]);
    2122              : 
    2123              :         /* recycle the slot of the tuple we just wrote out, for the next read */
    2124       437716 :         if (state->memtuples[0].tuple)
    2125       367674 :             RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
    2126              : 
    2127              :         /*
    2128              :          * pull next tuple from the tape, and replace the written-out tuple in
    2129              :          * the heap with it.
    2130              :          */
    2131       437716 :         if (mergereadnext(state, srcTape, &stup))
    2132              :         {
    2133       437289 :             stup.srctape = srcTapeIndex;
    2134       437289 :             tuplesort_heap_replace_top(state, &stup);
    2135              :         }
    2136              :         else
    2137              :         {
    2138          427 :             tuplesort_heap_delete_top(state);
    2139          427 :             state->nInputRuns--;
    2140              :         }
    2141              :     }
    2142              : 
    2143              :     /*
    2144              :      * When the heap empties, we're done.  Write an end-of-run marker on the
    2145              :      * output tape.
    2146              :      */
    2147           80 :     markrunend(state->destTape);
    2148           80 : }
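The replace-top loop in mergeonerun() above can be sketched standalone: pop the heap minimum, write it out, and refill the freed slot from the same source run, deleting the top when that run is exhausted. This is a minimal sketch over plain int arrays instead of SortTuples and logical tapes; the names (merge_k, Entry, sift_down) are invented for illustration and the fixed capacity of 16 runs is an assumption of the sketch, not of tuplesort.c.

```c
#include <assert.h>
#include <stddef.h>

/* One heap entry: current value plus which input "tape" it came from. */
typedef struct { int val; int src; } Entry;

static void sift_down(Entry *heap, size_t n, size_t i)
{
    for (;;)
    {
        size_t l = 2 * i + 1, r = l + 1, m = i;
        if (l < n && heap[l].val < heap[m].val) m = l;
        if (r < n && heap[r].val < heap[m].val) m = r;
        if (m == i) return;
        Entry tmp = heap[i]; heap[i] = heap[m]; heap[m] = tmp;
        i = m;
    }
}

/* Merge k sorted runs into out[]; lens[i] is the length of runs[i].
 * Assumes k <= 16 for this sketch.  Returns the number of values written. */
static size_t merge_k(const int *runs[], const size_t *lens, size_t k, int *out)
{
    Entry   heap[16];
    size_t  pos[16] = {0};
    size_t  n = 0, written = 0;

    /* like beginmerge(): load the first element of each non-empty run */
    for (size_t i = 0; i < k; i++)
        if (lens[i] > 0)
            heap[n++] = (Entry) { runs[i][pos[i]++], (int) i };
    for (size_t i = n; i-- > 0; )
        sift_down(heap, n, i);

    /* like mergeonerun(): pop the minimum, then replace the top from the
     * same source run, or delete the top if that run is empty */
    while (n > 0)
    {
        int src = heap[0].src;

        out[written++] = heap[0].val;
        if (pos[src] < lens[src])
            heap[0].val = runs[src][pos[src]++];
        else
            heap[0] = heap[--n];
        sift_down(heap, n, 0);
    }
    return written;
}
```

The replace-top step is why the merge only costs O(log k) per output tuple: the heap never changes size except when a run ends.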
    2149              : 
    2150              : /*
    2151              :  * beginmerge - initialize for a merge pass
    2152              :  *
    2153              :  * Fill the merge heap with the first tuple from each input tape.
    2154              :  */
    2155              : static void
    2156          236 : beginmerge(Tuplesortstate *state)
    2157              : {
    2158              :     int         activeTapes;
    2159              :     int         srcTapeIndex;
    2160              : 
    2161              :     /* Heap should be empty here */
    2162              :     Assert(state->memtupcount == 0);
    2163              : 
    2164          236 :     activeTapes = Min(state->nInputTapes, state->nInputRuns);
    2165              : 
    2166         1059 :     for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
    2167              :     {
    2168              :         SortTuple   tup;
    2169              : 
    2170          823 :         if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
    2171              :         {
    2172          667 :             tup.srctape = srcTapeIndex;
    2173          667 :             tuplesort_heap_insert(state, &tup);
    2174              :         }
    2175              :     }
    2176          236 : }
    2177              : 
    2178              : /*
    2179              :  * mergereadnext - read next tuple from one merge input tape
    2180              :  *
    2181              :  * Returns false on EOF.
    2182              :  */
    2183              : static bool
    2184      2780278 : mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
    2185              : {
    2186              :     unsigned int tuplen;
    2187              : 
    2188              :     /* read next tuple, if any */
    2189      2780278 :     if ((tuplen = getlen(srcTape, true)) == 0)
    2190          799 :         return false;
    2191      2779479 :     READTUP(state, stup, srcTape, tuplen);
    2192              : 
    2193      2779479 :     return true;
    2194              : }
    2195              : 
    2196              : /*
    2197              :  * dumptuples - remove tuples from memtuples and write initial run to tape
    2198              :  *
    2199              :  * When alltuples = true, dump everything currently in memory.  (This case is
    2200              :  * only used at end of input data.)
    2201              :  */
    2202              : static void
    2203       556565 : dumptuples(Tuplesortstate *state, bool alltuples)
    2204              : {
    2205              :     int         memtupwrite;
    2206              :     int         i;
    2207              : 
    2208              :     /*
    2209              :      * Nothing to do if we still fit in available memory and have array slots,
    2210              :      * unless this is the final call during initial run generation.
    2211              :      */
    2212       556565 :     if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
    2213       556093 :         !alltuples)
    2214       555741 :         return;
    2215              : 
    2216              :     /*
    2217              :      * Final call might require no sorting, in rare cases where we just so
    2218              :      * happen to have previously LACKMEM()'d at the point where exactly all
    2219              :      * remaining tuples are loaded into memory, just before input was
    2220              :      * exhausted.  In general, short final runs are quite possible, but avoid
    2221              :      * creating a completely empty run.  In a worker, though, we must produce
    2222              :      * at least one tape, even if it's empty.
    2223              :      */
    2224          824 :     if (state->memtupcount == 0 && state->currentRun > 0)
    2225            0 :         return;
    2226              : 
    2227              :     Assert(state->status == TSS_BUILDRUNS);
    2228              : 
    2229              :     /*
    2230              :      * It seems unlikely that this limit will ever be exceeded, but take no
    2231              :      * chances.
    2232              :      */
    2233          824 :     if (state->currentRun == INT_MAX)
    2234            0 :         ereport(ERROR,
    2235              :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    2236              :                  errmsg("cannot have more than %d runs for an external sort",
    2237              :                         INT_MAX)));
    2238              : 
    2239          824 :     if (state->currentRun > 0)
    2240          472 :         selectnewtape(state);
    2241              : 
    2242          824 :     state->currentRun++;
    2243              : 
    2244          824 :     if (trace_sort)
    2245            0 :         elog(LOG, "worker %d starting quicksort of run %d: %s",
    2246              :              state->worker, state->currentRun,
    2247              :              pg_rusage_show(&state->ru_start));
    2248              : 
    2249              :     /*
    2250              :      * Sort all tuples accumulated within the allowed amount of memory for
    2251              :      * this run.
    2252              :      */
    2253          824 :     tuplesort_sort_memtuples(state);
    2254              : 
    2255          824 :     if (trace_sort)
    2256            0 :         elog(LOG, "worker %d finished quicksort of run %d: %s",
    2257              :              state->worker, state->currentRun,
    2258              :              pg_rusage_show(&state->ru_start));
    2259              : 
    2260          824 :     memtupwrite = state->memtupcount;
    2261      2600189 :     for (i = 0; i < memtupwrite; i++)
    2262              :     {
    2263      2599365 :         SortTuple  *stup = &state->memtuples[i];
    2264              : 
    2265      2599365 :         WRITETUP(state, state->destTape, stup);
    2266              :     }
    2267              : 
    2268          824 :     state->memtupcount = 0;
    2269              : 
    2270              :     /*
    2271              :      * Reset tuple memory.  We've freed all of the tuples that we previously
    2272              :      * allocated.  It's important to avoid fragmentation when there is a stark
    2273              :      * change in the sizes of incoming tuples.  In bounded sorts,
    2274              :      * fragmentation due to AllocSetFree's bucketing by size class might be
    2275              :      * particularly bad if this step wasn't taken.
    2276              :      */
    2277          824 :     MemoryContextReset(state->base.tuplecontext);
    2278              : 
    2279              :     /*
    2280              :      * Now update the memory accounting to subtract the memory used by the
    2281              :      * tuple.
    2282              :      */
    2283          824 :     FREEMEM(state, state->tupleMem);
    2284          824 :     state->tupleMem = 0;
    2285              : 
    2286          824 :     markrunend(state->destTape);
    2287              : 
    2288          824 :     if (trace_sort)
    2289            0 :         elog(LOG, "worker %d finished writing run %d to tape %d: %s",
    2290              :              state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
    2291              :              pg_rusage_show(&state->ru_start));
    2292              : }
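The shape of dumptuples() above — return early while the buffer still fits, otherwise quicksort what has accumulated, emit it as one sorted run, and reset the buffer — can be sketched without tapes or memory accounting. All names here (RunState, dump_run, put_tuple) are invented for this sketch, and the tiny fixed buffer and run table stand in for work_mem and logical tapes.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define BUFSZ 4                 /* stand-in for the work_mem limit */

typedef struct
{
    int     buf[BUFSZ];         /* in-memory "memtuples" buffer */
    int     nbuf;
    int     runs[16][BUFSZ];    /* each row is one sorted run ("tape") */
    int     runlen[16];
    int     nruns;
} RunState;

static int cmp_int(const void *a, const void *b)
{
    int x = *(const int *) a, y = *(const int *) b;

    return (x > y) - (x < y);
}

/* Like dumptuples(): alltuples forces a final flush at end of input. */
static void dump_run(RunState *st, int alltuples)
{
    if (st->nbuf < BUFSZ && !alltuples)
        return;                 /* still fits in memory: nothing to do */
    if (st->nbuf == 0)
        return;                 /* avoid creating a completely empty run */
    qsort(st->buf, st->nbuf, sizeof(int), cmp_int);
    memcpy(st->runs[st->nruns], st->buf, st->nbuf * sizeof(int));
    st->runlen[st->nruns++] = st->nbuf;
    st->nbuf = 0;               /* "reset tuple memory" */
}

static void put_tuple(RunState *st, int v)
{
    st->buf[st->nbuf++] = v;
    dump_run(st, 0);
}
```

Each run produced this way is internally sorted, which is what lets the later merge passes treat the runs as sorted streams.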
    2293              : 
    2294              : /*
    2295              :  * tuplesort_rescan     - rewind and replay the scan
    2296              :  */
    2297              : void
    2298           24 : tuplesort_rescan(Tuplesortstate *state)
    2299              : {
    2300           24 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2301              : 
    2302              :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2303              : 
    2304           24 :     switch (state->status)
    2305              :     {
    2306           20 :         case TSS_SORTEDINMEM:
    2307           20 :             state->current = 0;
    2308           20 :             state->eof_reached = false;
    2309           20 :             state->markpos_offset = 0;
    2310           20 :             state->markpos_eof = false;
    2311           20 :             break;
    2312            4 :         case TSS_SORTEDONTAPE:
    2313            4 :             LogicalTapeRewindForRead(state->result_tape, 0);
    2314            4 :             state->eof_reached = false;
    2315            4 :             state->markpos_block = 0L;
    2316            4 :             state->markpos_offset = 0;
    2317            4 :             state->markpos_eof = false;
    2318            4 :             break;
    2319            0 :         default:
    2320            0 :             elog(ERROR, "invalid tuplesort state");
    2321              :             break;
    2322              :     }
    2323              : 
    2324           24 :     MemoryContextSwitchTo(oldcontext);
    2325           24 : }
    2326              : 
    2327              : /*
    2328              :  * tuplesort_markpos    - saves current position in the merged sort file
    2329              :  */
    2330              : void
    2331       292960 : tuplesort_markpos(Tuplesortstate *state)
    2332              : {
    2333       292960 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2334              : 
    2335              :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2336              : 
    2337       292960 :     switch (state->status)
    2338              :     {
    2339       288556 :         case TSS_SORTEDINMEM:
    2340       288556 :             state->markpos_offset = state->current;
    2341       288556 :             state->markpos_eof = state->eof_reached;
    2342       288556 :             break;
    2343         4404 :         case TSS_SORTEDONTAPE:
    2344         4404 :             LogicalTapeTell(state->result_tape,
    2345              :                             &state->markpos_block,
    2346              :                             &state->markpos_offset);
    2347         4404 :             state->markpos_eof = state->eof_reached;
    2348         4404 :             break;
    2349            0 :         default:
    2350            0 :             elog(ERROR, "invalid tuplesort state");
    2351              :             break;
    2352              :     }
    2353              : 
    2354       292960 :     MemoryContextSwitchTo(oldcontext);
    2355       292960 : }
    2356              : 
    2357              : /*
    2358              :  * tuplesort_restorepos - restores current position in merged sort file to
    2359              :  *                        last saved position
    2360              :  */
    2361              : void
    2362        18841 : tuplesort_restorepos(Tuplesortstate *state)
    2363              : {
    2364        18841 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2365              : 
    2366              :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2367              : 
    2368        18841 :     switch (state->status)
    2369              :     {
    2370        15745 :         case TSS_SORTEDINMEM:
    2371        15745 :             state->current = state->markpos_offset;
    2372        15745 :             state->eof_reached = state->markpos_eof;
    2373        15745 :             break;
    2374         3096 :         case TSS_SORTEDONTAPE:
    2375         3096 :             LogicalTapeSeek(state->result_tape,
    2376              :                             state->markpos_block,
    2377              :                             state->markpos_offset);
    2378         3096 :             state->eof_reached = state->markpos_eof;
    2379         3096 :             break;
    2380            0 :         default:
    2381            0 :             elog(ERROR, "invalid tuplesort state");
    2382              :             break;
    2383              :     }
    2384              : 
    2385        18841 :     MemoryContextSwitchTo(oldcontext);
    2386        18841 : }
    2387              : 
    2388              : /*
    2389              :  * tuplesort_get_stats - extract summary statistics
    2390              :  *
    2391              :  * This can be called after tuplesort_performsort() finishes to obtain
    2392              :  * printable summary information about how the sort was performed.
    2393              :  */
    2394              : void
    2395          198 : tuplesort_get_stats(Tuplesortstate *state,
    2396              :                     TuplesortInstrumentation *stats)
    2397              : {
    2398              :     /*
    2399              :      * Note: it might seem we should provide both memory and disk usage for a
    2400              :      * disk-based sort.  However, the current code doesn't track memory space
    2401              :      * accurately once we have begun to return tuples to the caller (since we
    2402              :      * don't account for pfree's the caller is expected to do), so we cannot
    2403              :      * rely on availMem in a disk sort.  This does not seem worth the overhead
    2404              :      * to fix.  Is it worth creating an API for the memory context code to
    2405              :      * tell us how much is actually used in sortcontext?
    2406              :      */
    2407          198 :     tuplesort_updatemax(state);
    2408              : 
    2409          198 :     if (state->isMaxSpaceDisk)
    2410            3 :         stats->spaceType = SORT_SPACE_TYPE_DISK;
    2411              :     else
    2412          195 :         stats->spaceType = SORT_SPACE_TYPE_MEMORY;
    2413          198 :     stats->spaceUsed = (state->maxSpace + 1023) / 1024;
    2414              : 
    2415          198 :     switch (state->maxSpaceStatus)
    2416              :     {
    2417          195 :         case TSS_SORTEDINMEM:
    2418          195 :             if (state->boundUsed)
    2419           21 :                 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
    2420              :             else
    2421          174 :                 stats->sortMethod = SORT_TYPE_QUICKSORT;
    2422          195 :             break;
    2423            0 :         case TSS_SORTEDONTAPE:
    2424            0 :             stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
    2425            0 :             break;
    2426            3 :         case TSS_FINALMERGE:
    2427            3 :             stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
    2428            3 :             break;
    2429            0 :         default:
    2430            0 :             stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
    2431            0 :             break;
    2432              :     }
    2433          198 : }
    2434              : 
    2435              : /*
    2436              :  * Convert TuplesortMethod to a string.
    2437              :  */
    2438              : const char *
    2439          147 : tuplesort_method_name(TuplesortMethod m)
    2440              : {
    2441          147 :     switch (m)
    2442              :     {
    2443            0 :         case SORT_TYPE_STILL_IN_PROGRESS:
    2444            0 :             return "still in progress";
    2445           21 :         case SORT_TYPE_TOP_N_HEAPSORT:
    2446           21 :             return "top-N heapsort";
    2447          123 :         case SORT_TYPE_QUICKSORT:
    2448          123 :             return "quicksort";
    2449            0 :         case SORT_TYPE_EXTERNAL_SORT:
    2450            0 :             return "external sort";
    2451            3 :         case SORT_TYPE_EXTERNAL_MERGE:
    2452            3 :             return "external merge";
    2453              :     }
    2454              : 
    2455            0 :     return "unknown";
    2456              : }
    2457              : 
    2458              : /*
    2459              :  * Convert TuplesortSpaceType to a string.
    2460              :  */
    2461              : const char *
    2462          129 : tuplesort_space_type_name(TuplesortSpaceType t)
    2463              : {
    2464              :     Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
    2465          129 :     return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
    2466              : }
    2467              : 
    2468              : 
    2469              : /*
    2470              :  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
    2471              :  */
    2472              : 
    2473              : /*
    2474              :  * Convert the existing unordered array of SortTuples to a bounded heap,
    2475              :  * discarding all but the smallest "state->bound" tuples.
    2476              :  *
    2477              :  * When working with a bounded heap, we want to keep the largest entry
    2478              :  * at the root (array entry zero), instead of the smallest as in the normal
    2479              :  * sort case.  This allows us to discard the largest entry cheaply.
    2480              :  * Therefore, we temporarily reverse the sort direction.
    2481              :  */
    2482              : static void
    2483          208 : make_bounded_heap(Tuplesortstate *state)
    2484              : {
    2485          208 :     int         tupcount = state->memtupcount;
    2486              :     int         i;
    2487              : 
    2488              :     Assert(state->status == TSS_INITIAL);
    2489              :     Assert(state->bounded);
    2490              :     Assert(tupcount >= state->bound);
    2491              :     Assert(SERIAL(state));
    2492              : 
    2493              :     /* Reverse sort direction so largest entry will be at root */
    2494          208 :     reversedirection(state);
    2495              : 
    2496          208 :     state->memtupcount = 0;      /* make the heap empty */
    2497        19018 :     for (i = 0; i < tupcount; i++)
    2498              :     {
    2499        18810 :         if (state->memtupcount < state->bound)
    2500              :         {
    2501              :             /* Insert next tuple into heap */
    2502              :             /* Must copy source tuple to avoid possible overwrite */
    2503         9301 :             SortTuple   stup = state->memtuples[i];
    2504              : 
    2505         9301 :             tuplesort_heap_insert(state, &stup);
    2506              :         }
    2507              :         else
    2508              :         {
    2509              :             /*
    2510              :              * The heap is full.  Replace the largest entry with the new
    2511              :              * tuple, or just discard the new tuple if it is no smaller
    2512              :              * than the current largest entry.
    2513              :              */
    2514         9509 :             if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
    2515              :             {
    2516         4902 :                 free_sort_tuple(state, &state->memtuples[i]);
    2517         4902 :                 CHECK_FOR_INTERRUPTS();
    2518              :             }
    2519              :             else
    2520         4607 :                 tuplesort_heap_replace_top(state, &state->memtuples[i]);
    2521              :         }
    2522              :     }
    2523              : 
    2524              :     Assert(state->memtupcount == state->bound);
    2525          208 :     state->status = TSS_BOUNDED;
    2526          208 : }
    2527              : 
    2528              : /*
    2529              :  * Convert the bounded heap to a properly-sorted array
    2530              :  */
    2531              : static void
    2532          208 : sort_bounded_heap(Tuplesortstate *state)
    2533              : {
    2534          208 :     int         tupcount = state->memtupcount;
    2535              : 
    2536              :     Assert(state->status == TSS_BOUNDED);
    2537              :     Assert(state->bounded);
    2538              :     Assert(tupcount == state->bound);
    2539              :     Assert(SERIAL(state));
    2540              : 
    2541              :     /*
    2542              :      * We can unheapify in place because each delete-top call will remove the
    2543              :      * largest entry, which we can promptly store in the newly freed slot at
    2544              :      * the end.  Once we're down to a single-entry heap, we're done.
    2545              :      */
    2546         9301 :     while (state->memtupcount > 1)
    2547              :     {
    2548         9093 :         SortTuple   stup = state->memtuples[0];
    2549              : 
    2550              :         /* this sifts-up the next-largest entry and decreases memtupcount */
    2551         9093 :         tuplesort_heap_delete_top(state);
    2552         9093 :         state->memtuples[state->memtupcount] = stup;
    2553              :     }
    2554          208 :     state->memtupcount = tupcount;
    2555              : 
    2556              :     /*
    2557              :      * Reverse sort direction back to the original state.  This is not
    2558              :      * actually necessary but seems like a good idea for tidiness.
    2559              :      */
    2560          208 :     reversedirection(state);
    2561              : 
    2562          208 :     state->status = TSS_SORTEDINMEM;
    2563          208 :     state->boundUsed = true;
    2564          208 : }
    2565              : 
    2566              : 
    2567              : /* radix sort routines */
    2568              : 
    2569              : /*
    2570              :  * Retrieve byte from datum, indexed by 'level': 0 for MSB, 7 for LSB
    2571              :  */
    2572              : static inline uint8
    2573     27671507 : current_byte(Datum key, int level)
    2574              : {
    2575     27671507 :     int         shift = (sizeof(Datum) - 1 - level) * BITS_PER_BYTE;
    2576              : 
    2577     27671507 :     return (key >> shift) & 0xFF;
    2578              : }
    2579              : 
    2580              : /*
    2581              :  * Normalize datum such that unsigned comparison is order-preserving,
    2582              :  * taking ASC/DESC into account as well.
    2583              :  */
    2584              : static inline Datum
    2585     27671507 : normalize_datum(Datum orig, SortSupport ssup)
    2586              : {
    2587              :     Datum       norm_datum1;
    2588              : 
    2589     27671507 :     if (ssup->comparator == ssup_datum_signed_cmp)
    2590              :     {
    2591      2073340 :         norm_datum1 = orig + ((uint64) PG_INT64_MAX) + 1;
    2592              :     }
    2593     25598167 :     else if (ssup->comparator == ssup_datum_int32_cmp)
    2594              :     {
    2595              :         /*
    2596              :          * First truncate to uint32. Technically, we don't need to do this,
    2597              :          * but it forces the upper half of the datum to be zero regardless of
    2598              :          * sign.
    2599              :          */
    2600     19507753 :         uint32      u32 = DatumGetUInt32(orig) + ((uint32) PG_INT32_MAX) + 1;
    2601              : 
    2602     19507753 :         norm_datum1 = UInt32GetDatum(u32);
    2603              :     }
    2604              :     else
    2605              :     {
    2606              :         Assert(ssup->comparator == ssup_datum_unsigned_cmp);
    2607      6090414 :         norm_datum1 = orig;
    2608              :     }
    2609              : 
    2610     27671507 :     if (ssup->ssup_reverse)
    2611      1617873 :         norm_datum1 = ~norm_datum1;
    2612              : 
    2613     27671507 :     return norm_datum1;
    2614              : }
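The effect of normalize_datum() for the signed case is worth seeing concretely: adding 2^63 to a signed 64-bit value flips its sign bit, after which plain unsigned comparison — and therefore byte-at-a-time partitioning from the most significant byte, as current_byte() does — preserves the signed order, and a bitwise NOT reverses it for DESC. A small sketch under those assumptions (norm_signed and byte_at are names invented here, standing in for the signed branch of normalize_datum and for current_byte):

```c
#include <assert.h>
#include <stdint.h>

/* Map a signed 64-bit key onto uint64_t so that unsigned comparison
 * preserves the signed order: adding 2^63 flips the sign bit, sending
 * INT64_MIN to 0 and INT64_MAX to UINT64_MAX. */
static uint64_t norm_signed(int64_t v)
{
    return (uint64_t) v + ((uint64_t) INT64_MAX) + 1;
}

/* Extract the byte at `level` (0 = most significant, 7 = least),
 * as current_byte() does for radix partitioning. */
static uint8_t byte_at(uint64_t key, int level)
{
    return (uint8_t) (key >> ((7 - level) * 8));
}
```

Because the normalized keys compare correctly byte-by-byte from the MSB, the radix sort below never needs the SortSupport comparator for the leading key, only for tiebreaks.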
    2615              : 
    2616              : /*
    2617              :  * radix_sort_recursive
    2618              :  *
    2619              :  * Radix sort by (pass-by-value) datum1, diverting to qsort_tuple()
    2620              :  * for tiebreaks.
    2621              :  *
    2622              :  * This is a modification of
    2623              :  * ska_byte_sort() from https://github.com/skarupke/ska_sort
    2624              :  * The original copyright notice follows:
    2625              :  *
    2626              :  * Copyright Malte Skarupke 2016.
    2627              :  * Distributed under the Boost Software License, Version 1.0.
    2628              :  *
    2629              :  * Boost Software License - Version 1.0 - August 17th, 2003
    2630              :  *
    2631              :  * Permission is hereby granted, free of charge, to any person or organization
    2632              :  * obtaining a copy of the software and accompanying documentation covered by
    2633              :  * this license (the "Software") to use, reproduce, display, distribute,
    2634              :  * execute, and transmit the Software, and to prepare derivative works of the
    2635              :  * Software, and to permit third-parties to whom the Software is furnished to
    2636              :  * do so, all subject to the following:
    2637              :  *
    2638              :  * The copyright notices in the Software and this entire statement, including
    2639              :  * the above license grant, this restriction and the following disclaimer,
    2640              :  * must be included in all copies of the Software, in whole or in part, and
    2641              :  * all derivative works of the Software, unless such copies or derivative
    2642              :  * works are solely in the form of machine-executable object code generated by
    2643              :  * a source language processor.
    2644              :  *
    2645              :  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    2646              :  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    2647              :  * FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
    2648              :  * SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
    2649              :  * FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
    2650              :  * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
    2651              :  * DEALINGS IN THE SOFTWARE.
    2652              :  */
    2653              : static void
    2654        32281 : radix_sort_recursive(SortTuple *begin, size_t n_elems, int level, Tuplesortstate *state)
    2655              : {
    2656        32281 :     RadixSortInfo partitions[256] = {0};
    2657              :     uint8       remaining_partitions[256];
    2658        32281 :     size_t      total = 0;
    2659        32281 :     int         num_partitions = 0;
    2660              :     int         num_remaining;
    2661        32281 :     SortSupport ssup = &state->base.sortKeys[0];
    2662        32281 :     size_t      start_offset = 0;
    2663        32281 :     SortTuple  *partition_begin = begin;
    2664              : 
    2665              :     /* count number of occurrences of each byte */
    2666     27703788 :     for (SortTuple *st = begin; st < begin + n_elems; st++)
    2667              :     {
    2668              :         uint8       this_partition;
    2669              : 
    2670              :         /* extract the byte for this level from the normalized datum */
    2671     27671507 :         this_partition = current_byte(normalize_datum(st->datum1, ssup),
    2672              :                                       level);
    2673              : 
    2674              :         /* save it for the permutation step */
    2675     27671507 :         st->curbyte = this_partition;
    2676              : 
    2677     27671507 :         partitions[this_partition].count++;
    2678              : 
    2679     27671507 :         CHECK_FOR_INTERRUPTS();
    2680              :     }
    2681              : 
    2682              :     /* compute partition offsets */
    2683      8296217 :     for (int i = 0; i < 256; i++)
    2684              :     {
    2685      8263936 :         size_t      count = partitions[i].count;
    2686              : 
    2687      8263936 :         if (count != 0)
    2688              :         {
    2689      1457233 :             partitions[i].offset = total;
    2690      1457233 :             total += count;
    2691      1457233 :             remaining_partitions[num_partitions] = i;
    2692      1457233 :             num_partitions++;
    2693              :         }
    2694      8263936 :         partitions[i].next_offset = total;
    2695              :     }
    2696              : 
    2697              :     /*
    2698              :      * Swap tuples to correct partition.
    2699              :      *
    2700              :      * In traditional American flag sort, a swap sends the current element to
    2701              :      * the correct partition, but the array pointer only advances if the
    2702              :      * partner of the swap happens to be an element that belongs in the
    2703              :      * current partition. That only requires one pass through the array, but
    2704              :      * the disadvantage is we don't know if the pointer can advance until the
    2705              :      * swap completes. Here lies the most interesting innovation from the
    2706              :      * upstream ska_byte_sort: After initiating the swap, we immediately
    2707              :      * proceed to the next element. This makes better use of CPU pipelining,
    2708              :      * but also means that we will often need multiple iterations of this
    2709              :      * loop. ska_byte_sort() maintains a separate list of which partitions
    2710              :      * haven't finished, which is updated every loop iteration. Here we simply
    2711              :      * check each partition during every iteration.
    2712              :      *
     2713              :      * If we started with a single partition, there is nothing to do. If a
     2714              :      * previous loop iteration left only one partition not yet counted as
     2715              :      * sorted, that partition must already be in place, so we can exit the loop.
    2716              :      */
    2717        32281 :     num_remaining = num_partitions;
    2718        99761 :     while (num_remaining > 1)
    2719              :     {
    2720              :         /* start the count over */
    2721        67480 :         num_remaining = num_partitions;
    2722              : 
    2723      6006244 :         for (int i = 0; i < num_partitions; i++)
    2724              :         {
    2725      5938764 :             uint8       idx = remaining_partitions[i];
    2726              : 
    2727      5938764 :             for (SortTuple *st = begin + partitions[idx].offset;
    2728     13948678 :                  st < begin + partitions[idx].next_offset;
    2729      8009914 :                  st++)
    2730              :             {
    2731      8009914 :                 size_t      offset = partitions[st->curbyte].offset++;
    2732              :                 SortTuple   tmp;
    2733              : 
    2734              :                 /* swap current tuple with destination position */
    2735              :                 Assert(offset < n_elems);
    2736      8009914 :                 tmp = *st;
    2737      8009914 :                 *st = begin[offset];
    2738      8009914 :                 begin[offset] = tmp;
    2739              : 
    2740      8009914 :                 CHECK_FOR_INTERRUPTS();
     2741              :             }
    2742              : 
    2743              :             /* Is this partition sorted? */
    2744      5938764 :             if (partitions[idx].offset == partitions[idx].next_offset)
    2745      4528730 :                 num_remaining--;
    2746              :         }
    2747              :     }
    2748              : 
    2749              :     /* recurse */
    2750        32281 :     for (uint8 *rp = remaining_partitions;
    2751      1489514 :          rp < remaining_partitions + num_partitions;
    2752      1457233 :          rp++)
    2753              :     {
    2754      1457233 :         size_t      end_offset = partitions[*rp].next_offset;
    2755      1457233 :         SortTuple  *partition_end = begin + end_offset;
    2756      1457233 :         size_t      num_elements = end_offset - start_offset;
    2757              : 
    2758      1457233 :         if (num_elements > 1)
    2759              :         {
    2760       575215 :             if (level < sizeof(Datum) - 1)
    2761              :             {
    2762       491867 :                 if (num_elements < QSORT_THRESHOLD)
    2763              :                 {
    2764       461278 :                     qsort_tuple(partition_begin,
    2765              :                                 num_elements,
    2766              :                                 state->base.comparetup,
    2767              :                                 state);
    2768              :                 }
    2769              :                 else
    2770              :                 {
    2771        30589 :                     radix_sort_recursive(partition_begin,
    2772              :                                          num_elements,
    2773              :                                          level + 1,
    2774              :                                          state);
    2775              :                 }
    2776              :             }
    2777        83348 :             else if (state->base.onlyKey == NULL)
    2778              :             {
    2779              :                 /*
    2780              :                  * We've finished radix sort on all bytes of the pass-by-value
    2781              :                  * datum (possibly abbreviated), now sort using the tiebreak
    2782              :                  * comparator.
    2783              :                  */
    2784        18143 :                 qsort_tuple(partition_begin,
    2785              :                             num_elements,
    2786              :                             state->base.comparetup_tiebreak,
    2787              :                             state);
    2788              :             }
    2789              :         }
    2790              : 
    2791      1457233 :         start_offset = end_offset;
    2792      1457233 :         partition_begin = partition_end;
    2793              :     }
    2794        32281 : }
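
A minimal sketch of the counting and permutation phases above, on plain uint64_t keys rather than SortTuples. This uses the classic single-swap American flag variant (advance only when the element at the cursor is already home), whereas the code above deliberately defers that check for better pipelining; `byte_partition` and `current_byte` here are illustrative names, not the tuplesort.c functions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Extract the byte at "level" (0 = most significant). */
static uint8_t
current_byte(uint64_t key, int level)
{
    return (uint8_t) (key >> (56 - 8 * level));
}

/*
 * Partition keys in place by their byte at "level": count byte
 * frequencies, turn the counts into partition offsets, then swap each
 * element into its home partition.  Afterwards the partitions appear in
 * ascending byte order; each would then be sorted recursively.
 */
static void
byte_partition(uint64_t *keys, size_t n, int level)
{
    size_t      counts[256] = {0};
    size_t      offsets[256];   /* next free slot in each partition */
    size_t      next[256];      /* first index past each partition */
    size_t      total = 0;

    for (size_t i = 0; i < n; i++)
        counts[current_byte(keys[i], level)]++;

    for (int b = 0; b < 256; b++)
    {
        offsets[b] = total;
        total += counts[b];
        next[b] = total;
    }

    for (int b = 0; b < 256; b++)
    {
        while (offsets[b] < next[b])
        {
            uint8_t     c = current_byte(keys[offsets[b]], level);

            if (c == (uint8_t) b)
                offsets[b]++;   /* already in its home partition */
            else
            {
                /* swap it into the next free slot of partition c */
                uint64_t    tmp = keys[offsets[b]];

                keys[offsets[b]] = keys[offsets[c]];
                keys[offsets[c]] = tmp;
                offsets[c]++;
            }
        }
    }
}
```

Each swap finalizes exactly one element, so the permutation phase is O(n) swaps in total.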
    2795              : 
    2796              : /*
    2797              :  * Entry point for radix_sort_recursive
    2798              :  *
    2799              :  * Partition tuples by isnull1, then sort both partitions, using
    2800              :  * radix sort on the NOT NULL partition if it's large enough.
    2801              :  */
    2802              : static void
    2803         2858 : radix_sort_tuple(SortTuple *data, size_t n, Tuplesortstate *state)
    2804              : {
    2805         2858 :     bool        nulls_first = state->base.sortKeys[0].ssup_nulls_first;
    2806              :     SortTuple  *null_start;
    2807              :     SortTuple  *not_null_start;
    2808         2858 :     size_t      d1 = 0,
    2809              :                 d2,
    2810              :                 null_count,
    2811              :                 not_null_count;
    2812              : 
    2813              :     /*
    2814              :      * Find the first NOT NULL if NULLS FIRST, or first NULL if NULLS LAST.
    2815              :      * This also serves as a quick check for the common case where all tuples
    2816              :      * are NOT NULL in the first sort key.
    2817              :      */
    2818      7314428 :     while (d1 < n && data[d1].isnull1 == nulls_first)
    2819              :     {
    2820      7311570 :         d1++;
    2821      7311570 :         CHECK_FOR_INTERRUPTS();
    2822              :     }
    2823              : 
    2824              :     /*
    2825              :      * If we have more than one tuple left after the quick check, partition
    2826              :      * the remainder using branchless cyclic permutation, based on
    2827              :      * https://orlp.net/blog/branchless-lomuto-partitioning/
    2828              :      */
    2829              :     Assert(n > 0);
    2830         2858 :     if (d1 < n - 1)
    2831              :     {
    2832          578 :         size_t      i = d1,
    2833          578 :                     j = d1;
    2834          578 :         SortTuple   tmp = data[d1]; /* create gap at front */
    2835              : 
    2836       341703 :         while (j < n - 1)
    2837              :         {
    2838              :             /* gap is at j, move i's element to gap */
    2839       341125 :             data[j] = data[i];
    2840              :             /* advance j to the first unknown element */
    2841       341125 :             j += 1;
    2842              :             /* move the first unknown element back to i */
    2843       341125 :             data[i] = data[j];
    2844              :             /* advance i if this element belongs in the left partition */
    2845       341125 :             i += (data[i].isnull1 == nulls_first);
    2846              : 
    2847       341125 :             CHECK_FOR_INTERRUPTS();
    2848              :         }
    2849              : 
    2850              :         /* place gap between left and right partitions */
    2851          578 :         data[j] = data[i];
    2852              :         /* restore the saved element */
    2853          578 :         data[i] = tmp;
    2854              :         /* assign it to the correct partition */
    2855          578 :         i += (data[i].isnull1 == nulls_first);
    2856              : 
    2857              :         /* d1 is now the number of elements in the left partition */
    2858          578 :         d1 = i;
    2859              :     }
    2860              : 
    2861         2858 :     d2 = n - d1;
    2862              : 
    2863              :     /* set pointers and counts for each partition */
    2864         2858 :     if (nulls_first)
    2865              :     {
    2866          495 :         null_start = data;
    2867          495 :         null_count = d1;
    2868          495 :         not_null_start = data + d1;
    2869          495 :         not_null_count = d2;
    2870              :     }
    2871              :     else
    2872              :     {
    2873         2363 :         not_null_start = data;
    2874         2363 :         not_null_count = d1;
    2875         2363 :         null_start = data + d1;
    2876         2363 :         null_count = d2;
    2877              :     }
    2878              : 
    2879         2858 :     for (SortTuple *st = null_start;
    2880         5068 :          st < null_start + null_count;
    2881         2210 :          st++)
    2882              :         Assert(st->isnull1 == true);
    2883         2858 :     for (SortTuple *st = not_null_start;
    2884      7653924 :          st < not_null_start + not_null_count;
    2885      7651066 :          st++)
    2886              :         Assert(st->isnull1 == false);
    2887              : 
    2888              :     /*
     2889              :      * Sort the NULL partition using the tiebreak comparator, if necessary.
    2890              :      */
    2891         2858 :     if (state->base.onlyKey == NULL && null_count > 1)
    2892              :     {
    2893           87 :         qsort_tuple(null_start,
    2894              :                     null_count,
    2895              :                     state->base.comparetup_tiebreak,
    2896              :                     state);
    2897              :     }
    2898              : 
    2899              :     /*
    2900              :      * Sort the NOT NULL partition, using radix sort if large enough,
    2901              :      * otherwise fall back to quicksort.
    2902              :      */
    2903         2858 :     if (not_null_count < QSORT_THRESHOLD)
    2904              :     {
    2905           14 :         qsort_tuple(not_null_start,
    2906              :                     not_null_count,
    2907              :                     state->base.comparetup,
    2908              :                     state);
    2909              :     }
    2910              :     else
    2911              :     {
    2912         2844 :         bool        presorted = true;
    2913              : 
    2914         2844 :         for (SortTuple *st = not_null_start + 1;
    2915      3805561 :              st < not_null_start + not_null_count;
    2916      3802717 :              st++)
    2917              :         {
    2918      3804409 :             if (COMPARETUP(state, st - 1, st) > 0)
    2919              :             {
    2920         1692 :                 presorted = false;
    2921         1692 :                 break;
    2922              :             }
    2923              : 
    2924      3802717 :             CHECK_FOR_INTERRUPTS();
    2925              :         }
    2926              : 
    2927         2844 :         if (presorted)
    2928         1152 :             return;
    2929              :         else
    2930              :         {
    2931         1692 :             radix_sort_recursive(not_null_start,
    2932              :                                  not_null_count,
    2933              :                                  0,
    2934              :                                  state);
    2935              :         }
    2936              :     }
    2937              : }
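
The branchless cyclic-permutation partition used above can be sketched on plain ints: a gap is opened at the front, and each step moves the gap forward while classifying one element with an unconditional index increment instead of a branch. This follows the same orlp.net technique the comment cites; `branchless_partition` and `is_negative` are illustrative names, not part of tuplesort.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

static bool
is_negative(int x)
{
    return x < 0;
}

/* Returns the number of elements placed in the left partition. */
static size_t
branchless_partition(int *data, size_t n, bool (*pred) (int))
{
    size_t      i = 0,
                j = 0;
    int         tmp;

    if (n == 0)
        return 0;

    tmp = data[0];              /* create a gap at the front */
    while (j < n - 1)
    {
        data[j] = data[i];      /* move i's element into the gap at j */
        j += 1;                 /* gap advances to the next unknown */
        data[i] = data[j];      /* pull the unknown element back to i */
        i += pred(data[i]);     /* branchless advance if it belongs left */
    }
    data[j] = data[i];          /* place the gap between the partitions */
    data[i] = tmp;              /* restore the saved element */
    i += pred(data[i]);         /* classify it too */
    return i;
}
```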
    2938              : 
    2939              : /* Verify in-memory sort using standard comparator. */
    2940              : static void
    2941         2858 : verify_memtuples_sorted(Tuplesortstate *state)
    2942              : {
    2943              : #ifdef USE_ASSERT_CHECKING
    2944              :     for (SortTuple *st = state->memtuples + 1;
    2945              :          st < state->memtuples + state->memtupcount;
    2946              :          st++)
    2947              :         Assert(COMPARETUP(state, st - 1, st) <= 0);
    2948              : #endif
    2949         2858 : }
    2950              : 
    2951              : /*
    2952              :  * Sort all memtuples using specialized routines.
    2953              :  *
     2954              :  * Quicksort or radix sort is used both for fully in-memory sorts and
     2955              :  * for sorting individual runs during an external sort.
    2956              :  */
    2957              : static void
    2958       121675 : tuplesort_sort_memtuples(Tuplesortstate *state)
    2959              : {
    2960              :     Assert(!LEADER(state));
    2961              : 
    2962       121675 :     if (state->memtupcount > 1)
    2963              :     {
    2964              :         /*
    2965              :          * Do we have the leading column's value or abbreviation in datum1?
    2966              :          */
    2967        35174 :         if (state->base.haveDatum1 && state->base.sortKeys)
    2968              :         {
    2969        35117 :             SortSupport ssup = &state->base.sortKeys[0];
    2970              : 
    2971              :             /* Does it compare as an integer? */
    2972        35117 :             if (state->memtupcount >= QSORT_THRESHOLD &&
    2973         7334 :                 (ssup->comparator == ssup_datum_unsigned_cmp ||
    2974         6927 :                  ssup->comparator == ssup_datum_signed_cmp ||
    2975         6746 :                  ssup->comparator == ssup_datum_int32_cmp))
    2976              :             {
    2977         2858 :                 radix_sort_tuple(state->memtuples,
    2978         2858 :                                  state->memtupcount,
    2979              :                                  state);
    2980         2858 :                 verify_memtuples_sorted(state);
    2981         2858 :                 return;
    2982              :             }
    2983              :         }
    2984              : 
    2985              :         /* Can we use the single-key sort function? */
    2986        32316 :         if (state->base.onlyKey != NULL)
    2987              :         {
    2988        21254 :             qsort_ssup(state->memtuples, state->memtupcount,
    2989        21254 :                        state->base.onlyKey);
    2990              :         }
    2991              :         else
    2992              :         {
    2993        11062 :             qsort_tuple(state->memtuples,
    2994        11062 :                         state->memtupcount,
    2995              :                         state->base.comparetup,
    2996              :                         state);
    2997              :         }
    2998              :     }
    2999              : }
    3000              : 
    3001              : /*
    3002              :  * Insert a new tuple into an empty or existing heap, maintaining the
    3003              :  * heap invariant.  Caller is responsible for ensuring there's room.
    3004              :  *
    3005              :  * Note: For some callers, tuple points to a memtuples[] entry above the
    3006              :  * end of the heap.  This is safe as long as it's not immediately adjacent
    3007              :  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
    3008              :  * is, it might get overwritten before being moved into the heap!
    3009              :  */
    3010              : static void
    3011         9968 : tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
    3012              : {
    3013              :     SortTuple  *memtuples;
    3014              :     int         j;
    3015              : 
    3016         9968 :     memtuples = state->memtuples;
    3017              :     Assert(state->memtupcount < state->memtupsize);
    3018              : 
    3019         9968 :     CHECK_FOR_INTERRUPTS();
    3020              : 
    3021              :     /*
    3022              :      * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
    3023              :      * using 1-based array indexes, not 0-based.
    3024              :      */
    3025         9968 :     j = state->memtupcount++;
    3026        28913 :     while (j > 0)
    3027              :     {
    3028        25485 :         int         i = (j - 1) >> 1;
    3029              : 
    3030        25485 :         if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
    3031         6540 :             break;
    3032        18945 :         memtuples[j] = memtuples[i];
    3033        18945 :         j = i;
    3034              :     }
    3035         9968 :     memtuples[j] = *tuple;
    3036         9968 : }
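
The sift-up insertion above can be sketched on a plain int min-heap: start at the new slot, pull parents down while they compare larger, then drop the new value into the hole. `heap_insert` here is an illustrative stand-in, not the tuplesort.c function.

```c
#include <assert.h>
#include <stddef.h>

static void
heap_insert(int *heap, size_t *count, int value)
{
    size_t      j = (*count)++;

    while (j > 0)
    {
        size_t      i = (j - 1) / 2;    /* parent index */

        if (value >= heap[i])
            break;
        heap[j] = heap[i];      /* pull the parent down into the hole */
        j = i;
    }
    heap[j] = value;
}
```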
    3037              : 
    3038              : /*
    3039              :  * Remove the tuple at state->memtuples[0] from the heap.  Decrement
    3040              :  * memtupcount, and sift up to maintain the heap invariant.
    3041              :  *
    3042              :  * The caller has already free'd the tuple the top node points to,
    3043              :  * if necessary.
    3044              :  */
    3045              : static void
    3046         9736 : tuplesort_heap_delete_top(Tuplesortstate *state)
    3047              : {
    3048         9736 :     SortTuple  *memtuples = state->memtuples;
    3049              :     SortTuple  *tuple;
    3050              : 
    3051         9736 :     if (--state->memtupcount <= 0)
    3052          166 :         return;
    3053              : 
    3054              :     /*
    3055              :      * Remove the last tuple in the heap, and re-insert it, by replacing the
    3056              :      * current top node with it.
    3057              :      */
    3058         9570 :     tuple = &memtuples[state->memtupcount];
    3059         9570 :     tuplesort_heap_replace_top(state, tuple);
    3060              : }
    3061              : 
    3062              : /*
    3063              :  * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
    3064              :  * maintain the heap invariant.
    3065              :  *
    3066              :  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
    3067              :  * Heapsort, steps H3-H8).
    3068              :  */
    3069              : static void
    3070      3044519 : tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
    3071              : {
    3072      3044519 :     SortTuple  *memtuples = state->memtuples;
    3073              :     unsigned int i,
    3074              :                 n;
    3075              : 
    3076              :     Assert(state->memtupcount >= 1);
    3077              : 
    3078      3044519 :     CHECK_FOR_INTERRUPTS();
    3079              : 
    3080              :     /*
    3081              :      * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
    3082              :      * This prevents overflow in the "2 * i + 1" calculation, since at the top
    3083              :      * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
    3084              :      */
    3085      3044519 :     n = state->memtupcount;
    3086      3044519 :     i = 0;                      /* i is where the "hole" is */
    3087              :     for (;;)
    3088       873919 :     {
    3089      3918438 :         unsigned int j = 2 * i + 1;
    3090              : 
    3091      3918438 :         if (j >= n)
    3092       575677 :             break;
    3093      4569068 :         if (j + 1 < n &&
    3094      1226307 :             COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
    3095       484509 :             j++;
    3096      3342761 :         if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
    3097      2468842 :             break;
    3098       873919 :         memtuples[i] = memtuples[j];
    3099       873919 :         i = j;
    3100              :     }
    3101      3044519 :     memtuples[i] = *tuple;
    3102      3044519 : }
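
The replace-top operation can likewise be sketched on a plain int min-heap: the hole starts at the root and descends toward the smaller child until the replacement value fits. `heap_replace_top` is an illustrative name, not the tuplesort.c function.

```c
#include <assert.h>
#include <stddef.h>

/* Replace the minimum with "value", restore the heap, return the old minimum. */
static int
heap_replace_top(int *heap, size_t n, int value)
{
    int         old = heap[0];
    size_t      i = 0;          /* i is where the "hole" is */

    for (;;)
    {
        size_t      j = 2 * i + 1;

        if (j >= n)
            break;
        if (j + 1 < n && heap[j] > heap[j + 1])
            j++;                /* pick the smaller child */
        if (value <= heap[j])
            break;
        heap[i] = heap[j];      /* pull the child up into the hole */
        i = j;
    }
    heap[i] = value;
    return old;
}
```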
    3103              : 
    3104              : /*
    3105              :  * Function to reverse the sort direction from its current state
    3106              :  *
    3107              :  * It is not safe to call this when performing hash tuplesorts
    3108              :  */
    3109              : static void
    3110          416 : reversedirection(Tuplesortstate *state)
    3111              : {
    3112          416 :     SortSupport sortKey = state->base.sortKeys;
    3113              :     int         nkey;
    3114              : 
    3115         1012 :     for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
    3116              :     {
    3117          596 :         sortKey->ssup_reverse = !sortKey->ssup_reverse;
    3118          596 :         sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
    3119              :     }
    3120          416 : }
    3121              : 
    3122              : 
    3123              : /*
    3124              :  * Tape interface routines
    3125              :  */
    3126              : 
    3127              : static unsigned int
    3128      2931784 : getlen(LogicalTape *tape, bool eofOK)
    3129              : {
    3130              :     unsigned int len;
    3131              : 
    3132      2931784 :     if (LogicalTapeRead(tape,
    3133              :                         &len, sizeof(len)) != sizeof(len))
    3134            0 :         elog(ERROR, "unexpected end of tape");
    3135      2931784 :     if (len == 0 && !eofOK)
    3136            0 :         elog(ERROR, "unexpected end of data");
    3137      2931784 :     return len;
    3138              : }
    3139              : 
    3140              : static void
    3141          904 : markrunend(LogicalTape *tape)
    3142              : {
    3143          904 :     unsigned int len = 0;
    3144              : 
    3145          904 :     LogicalTapeWrite(tape, &len, sizeof(len));
    3146          904 : }
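
The run framing implied by getlen() and markrunend() can be sketched with a byte buffer standing in for the logical tape: each tuple is written as a length word followed by its payload, and a zero-length word marks the end of a run. `write_record` and `read_len` are illustrative helpers, not the tuplesort.c API.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Append one length-prefixed record; returns bytes written. */
static size_t
write_record(unsigned char *buf, const void *data, unsigned int len)
{
    memcpy(buf, &len, sizeof(len));
    if (len > 0)
        memcpy(buf + sizeof(len), data, len);
    return sizeof(len) + len;
}

/* Read a length word; 0 means end of run, like markrunend's marker. */
static unsigned int
read_len(const unsigned char *buf)
{
    unsigned int len;

    memcpy(&len, buf, sizeof(len));
    return len;
}
```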
    3147              : 
    3148              : /*
    3149              :  * Get memory for tuple from within READTUP() routine.
    3150              :  *
    3151              :  * We use next free slot from the slab allocator, or palloc() if the tuple
    3152              :  * is too large for that.
    3153              :  */
    3154              : void *
    3155      2725847 : tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
    3156              : {
    3157              :     SlabSlot   *buf;
    3158              : 
    3159              :     /*
    3160              :      * We pre-allocate enough slots in the slab arena that we should never run
    3161              :      * out.
    3162              :      */
    3163              :     Assert(state->slabFreeHead);
    3164              : 
    3165      2725847 :     if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
    3166            3 :         return MemoryContextAlloc(state->base.sortcontext, tuplen);
    3167              :     else
    3168              :     {
    3169      2725844 :         buf = state->slabFreeHead;
    3170              :         /* Reuse this slot */
    3171      2725844 :         state->slabFreeHead = buf->nextfree;
    3172              : 
    3173      2725844 :         return buf;
    3174              :     }
    3175              : }
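
The slab free-list reuse above can be sketched as a union of fixed-size slots chained through a nextfree pointer: pop a slot if one is available; the real code falls back to a context allocation for oversized tuples. The names here are illustrative stand-ins for the tuplesort.c types.

```c
#include <assert.h>
#include <stddef.h>

#define SLOT_SIZE 64

typedef union SlabSlot
{
    union SlabSlot *nextfree;   /* link while the slot is free */
    char        buffer[SLOT_SIZE];  /* payload while in use */
} SlabSlot;

/* Pop the head of the free list, or NULL if it is empty. */
static SlabSlot *
slab_pop(SlabSlot **freehead)
{
    SlabSlot   *buf = *freehead;

    if (buf != NULL)
        *freehead = buf->nextfree;  /* reuse this slot */
    return buf;
}

/* Return a slot to the free list. */
static void
slab_push(SlabSlot **freehead, SlabSlot *slot)
{
    slot->nextfree = *freehead;
    *freehead = slot;
}
```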
    3176              : 
    3177              : 
    3178              : /*
    3179              :  * Parallel sort routines
    3180              :  */
    3181              : 
    3182              : /*
    3183              :  * tuplesort_estimate_shared - estimate required shared memory allocation
    3184              :  *
    3185              :  * nWorkers is an estimate of the number of workers (it's the number that
    3186              :  * will be requested).
    3187              :  */
    3188              : Size
    3189           98 : tuplesort_estimate_shared(int nWorkers)
    3190              : {
    3191              :     Size        tapesSize;
    3192              : 
    3193              :     Assert(nWorkers > 0);
    3194              : 
    3195              :     /* Make sure that BufFile shared state is MAXALIGN'd */
    3196           98 :     tapesSize = mul_size(sizeof(TapeShare), nWorkers);
    3197           98 :     tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
    3198              : 
    3199           98 :     return tapesSize;
    3200              : }
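
The sizing arithmetic above follows the usual flexible-array-member pattern: offsetof the array member plus n elements, rounded up to maximal alignment. A minimal sketch with illustrative stub types (the real code additionally uses add_size()/mul_size(), which check for overflow):

```c
#include <assert.h>
#include <stddef.h>

typedef struct TapeShareStub
{
    long        firstblocknumber;
} TapeShareStub;

typedef struct SharedsortStub
{
    int         currentWorker;
    int         workersFinished;
    int         nTapes;
    TapeShareStub tapes[];      /* flexible array member, one per worker */
} SharedsortStub;

#define ALIGNOF_MAX 8
#define MAXALIGN_UP(sz) (((sz) + ALIGNOF_MAX - 1) & ~((size_t) (ALIGNOF_MAX - 1)))

/* Estimate the shared allocation for nworkers tapes (no overflow checks). */
static size_t
estimate_shared(int nworkers)
{
    size_t      sz = (size_t) nworkers * sizeof(TapeShareStub);

    return MAXALIGN_UP(sz + offsetof(SharedsortStub, tapes));
}
```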
    3201              : 
    3202              : /*
    3203              :  * tuplesort_initialize_shared - initialize shared tuplesort state
    3204              :  *
    3205              :  * Must be called from leader process before workers are launched, to
    3206              :  * establish state needed up-front for worker tuplesortstates.  nWorkers
    3207              :  * should match the argument passed to tuplesort_estimate_shared().
    3208              :  */
    3209              : void
    3210          133 : tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
    3211              : {
    3212              :     int         i;
    3213              : 
    3214              :     Assert(nWorkers > 0);
    3215              : 
    3216          133 :     SpinLockInit(&shared->mutex);
    3217          133 :     shared->currentWorker = 0;
    3218          133 :     shared->workersFinished = 0;
    3219          133 :     SharedFileSetInit(&shared->fileset, seg);
    3220          133 :     shared->nTapes = nWorkers;
    3221          419 :     for (i = 0; i < nWorkers; i++)
    3222              :     {
    3223          286 :         shared->tapes[i].firstblocknumber = 0L;
    3224              :     }
    3225          133 : }
    3226              : 
    3227              : /*
    3228              :  * tuplesort_attach_shared - attach to shared tuplesort state
    3229              :  *
    3230              :  * Must be called by all worker processes.
    3231              :  */
    3232              : void
    3233          150 : tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
    3234              : {
    3235              :     /* Attach to SharedFileSet */
    3236          150 :     SharedFileSetAttach(&shared->fileset, seg);
    3237          150 : }
    3238              : 
    3239              : /*
    3240              :  * worker_get_identifier - Assign and return ordinal identifier for worker
    3241              :  *
    3242              :  * The order in which these are assigned is not well defined, and should not
    3243              :  * matter; worker numbers across parallel sort participants need only be
    3244              :  * distinct and gapless.  logtape.c requires this.
    3245              :  *
    3246              :  * Note that the identifiers assigned from here have no relation to
     3247              :  * ParallelWorkerNumber, to avoid making any assumption about the
    3248              :  * caller's requirements.  However, we do follow the ParallelWorkerNumber
    3249              :  * convention of representing a non-worker with worker number -1.  This
    3250              :  * includes the leader, as well as serial Tuplesort processes.
    3251              :  */
    3252              : static int
    3253          282 : worker_get_identifier(Tuplesortstate *state)
    3254              : {
    3255          282 :     Sharedsort *shared = state->shared;
    3256              :     int         worker;
    3257              : 
    3258              :     Assert(WORKER(state));
    3259              : 
    3260          282 :     SpinLockAcquire(&shared->mutex);
    3261          282 :     worker = shared->currentWorker++;
    3262          282 :     SpinLockRelease(&shared->mutex);
    3263              : 
    3264          282 :     return worker;
    3265              : }
    3266              : 
    3267              : /*
    3268              :  * worker_freeze_result_tape - freeze worker's result tape for leader
    3269              :  *
    3270              :  * This is called by workers just after the result tape has been determined,
     3271              :  * instead of calling LogicalTapeFreeze() directly.  They do so because
     3272              :  * workers require a few additional steps beyond the similar serial
     3273              :  * TSS_SORTEDONTAPE external sort case, and those steps happen here.  The
     3274              :  * extra steps involve freeing now-unneeded resources and representing to
     3275              :  * the leader that the worker's input run is available for its merge.
    3276              :  *
     3277              :  * Each worker produces exactly one final output run, consisting of all
     3278              :  * tuples that were originally input into that worker.
    3279              :  */
    3280              : static void
    3281          282 : worker_freeze_result_tape(Tuplesortstate *state)
    3282              : {
    3283          282 :     Sharedsort *shared = state->shared;
    3284              :     TapeShare   output;
    3285              : 
    3286              :     Assert(WORKER(state));
    3287              :     Assert(state->result_tape != NULL);
    3288              :     Assert(state->memtupcount == 0);
    3289              : 
    3290              :     /*
    3291              :      * Free most remaining memory, in case caller is sensitive to our holding
    3292              :      * on to it.  memtuples may not be a tiny merge heap at this point.
    3293              :      */
    3294          282 :     pfree(state->memtuples);
    3295              :     /* Be tidy */
    3296          282 :     state->memtuples = NULL;
    3297          282 :     state->memtupsize = 0;
    3298              : 
    3299              :     /*
     3300              :      * The parallel worker must store its result tape's metadata in shared
     3301              :      * memory, where the leader can find it
    3302              :      */
    3303          282 :     LogicalTapeFreeze(state->result_tape, &output);
    3304              : 
    3305              :     /* Store properties of output tape, and update finished worker count */
    3306          282 :     SpinLockAcquire(&shared->mutex);
    3307          282 :     shared->tapes[state->worker] = output;
    3308          282 :     shared->workersFinished++;
    3309          282 :     SpinLockRelease(&shared->mutex);
    3310          282 : }
    3311              : 
    3312              : /*
    3313              :  * worker_nomergeruns - dump memtuples in worker, without merging
    3314              :  *
     3315              :  * This is called in a worker as an alternative to mergeruns() when no
     3316              :  * merging is required.
    3317              :  */
    3318              : static void
    3319          282 : worker_nomergeruns(Tuplesortstate *state)
    3320              : {
    3321              :     Assert(WORKER(state));
    3322              :     Assert(state->result_tape == NULL);
    3323              :     Assert(state->nOutputRuns == 1);
    3324              : 
    3325          282 :     state->result_tape = state->destTape;
    3326          282 :     worker_freeze_result_tape(state);
    3327          282 : }
    3328              : 
    3329              : /*
    3330              :  * leader_takeover_tapes - create tapeset for leader from worker tapes
    3331              :  *
     3332              :  * So far, the leader's Tuplesortstate has performed no actual sorting.  By
     3333              :  * now, all sorting has occurred in the workers, all of which must have
     3334              :  * already returned from tuplesort_performsort().
     3335              :  *
     3336              :  * When this returns, the leader process is left in a state virtually
     3337              :  * indistinguishable from having generated the runs itself, as a serial
     3338              :  * external sort would have.
    3339              :  */
    3340              : static void
    3341           97 : leader_takeover_tapes(Tuplesortstate *state)
    3342              : {
    3343           97 :     Sharedsort *shared = state->shared;
    3344           97 :     int         nParticipants = state->nParticipants;
    3345              :     int         workersFinished;
    3346              :     int         j;
    3347              : 
    3348              :     Assert(LEADER(state));
    3349              :     Assert(nParticipants >= 1);
    3350              : 
    3351           97 :     SpinLockAcquire(&shared->mutex);
    3352           97 :     workersFinished = shared->workersFinished;
    3353           97 :     SpinLockRelease(&shared->mutex);
    3354              : 
    3355           97 :     if (nParticipants != workersFinished)
    3356            0 :         elog(ERROR, "cannot take over tapes before all workers finish");
    3357              : 
    3358              :     /*
    3359              :      * Create the tapeset from worker tapes, including a leader-owned tape at
    3360              :      * the end.  Parallel workers are far more expensive than logical tapes,
    3361              :      * so the number of tapes allocated here should never be excessive.
    3362              :      */
    3363           97 :     inittapestate(state, nParticipants);
    3364           97 :     state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
    3365              : 
    3366              :     /*
     3367              :      * Set currentRun to reflect the number of runs we will merge (it's not
     3368              :      * used for anything; this is just pro forma)
    3369              :      */
    3370           97 :     state->currentRun = nParticipants;
    3371              : 
    3372              :     /*
    3373              :      * Initialize the state to look the same as after building the initial
    3374              :      * runs.
    3375              :      *
    3376              :      * There will always be exactly 1 run per worker, and exactly one input
    3377              :      * tape per run, because workers always output exactly 1 run, even when
    3378              :      * there were no input tuples for workers to sort.
    3379              :      */
    3380           97 :     state->inputTapes = NULL;
    3381           97 :     state->nInputTapes = 0;
    3382           97 :     state->nInputRuns = 0;
    3383              : 
    3384           97 :     state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
    3385           97 :     state->nOutputTapes = nParticipants;
    3386           97 :     state->nOutputRuns = nParticipants;
    3387              : 
    3388          309 :     for (j = 0; j < nParticipants; j++)
    3389              :     {
    3390          212 :         state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
    3391              :     }
    3392              : 
    3393           97 :     state->status = TSS_BUILDRUNS;
    3394           97 : }
    3395              : 
    3396              : /*
    3397              :  * Convenience routine to free a tuple previously loaded into sort memory
    3398              :  */
    3399              : static void
    3400      1879974 : free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
    3401              : {
    3402      1879974 :     if (stup->tuple)
    3403              :     {
    3404      1799848 :         FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
    3405      1799848 :         pfree(stup->tuple);
    3406      1799848 :         stup->tuple = NULL;
    3407              :     }
    3408      1879974 : }
    3409              : 
    3410              : int
    3411      1995287 : ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
    3412              : {
    3413      1995287 :     if (x < y)
    3414       682720 :         return -1;
    3415      1312567 :     else if (x > y)
    3416       523138 :         return 1;
    3417              :     else
    3418       789429 :         return 0;
    3419              : }
    3420              : 
    3421              : int
    3422      1388845 : ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
    3423              : {
    3424      1388845 :     int64       xx = DatumGetInt64(x);
    3425      1388845 :     int64       yy = DatumGetInt64(y);
    3426              : 
    3427      1388845 :     if (xx < yy)
    3428      1028830 :         return -1;
    3429       360015 :     else if (xx > yy)
    3430       180449 :         return 1;
    3431              :     else
    3432       179566 :         return 0;
    3433              : }
    3434              : 
    3435              : int
    3436     91546889 : ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
    3437              : {
    3438     91546889 :     int32       xx = DatumGetInt32(x);
    3439     91546889 :     int32       yy = DatumGetInt32(y);
    3440              : 
    3441     91546889 :     if (xx < yy)
    3442     23648694 :         return -1;
    3443     67898195 :     else if (xx > yy)
    3444     20331801 :         return 1;
    3445              :     else
    3446     47566394 :         return 0;
    3447              : }
        

Generated by: LCOV version 2.0-1