LCOV - code coverage report
Current view: top level - src/backend/utils/sort - tuplesort.c (source / functions)
Test: PostgreSQL 19devel          Lines: 91.6 % (835 of 912 hit)
Test Date: 2026-04-03 01:15:53    Functions: 100.0 % (57 of 57 hit)

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * tuplesort.c
       4              :  *    Generalized tuple sorting routines.
       5              :  *
       6              :  * This module provides a generalized facility for tuple sorting, which can be
       7              :  * applied to different kinds of sortable objects.  Implementation of
       8              :  * the particular sorting variants is given in tuplesortvariants.c.
       9              :  * This module works efficiently for both small and large amounts
      10              :  * of data.  Small amounts are sorted in-memory.  Large amounts are
      11              :  * sorted using temporary files and a standard external sort
      12              :  * algorithm.
      13              :  *
      14              :  * See Knuth, volume 3, for more than you want to know about external
      15              :  * sorting algorithms.  The algorithm we use is a balanced k-way merge.
      16              :  * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
      17              :  * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
      18              :  * merge is better.  Knuth is assuming that tape drives are expensive
      19              :  * beasts, and in particular that there will always be many more runs than
      20              :  * tape drives.  The polyphase merge algorithm was good at keeping all the
      21              :  * tape drives busy, but in our implementation a "tape drive" doesn't cost
       22              :  * much more than a few kB of memory buffers, so we can afford to have
      23              :  * lots of them.  In particular, if we can have as many tape drives as
       24              :  * sorted runs, we can eliminate repeated I/O entirely.
      25              :  *
      26              :  * Historically, we divided the input into sorted runs using replacement
      27              :  * selection, in the form of a priority tree implemented as a heap
      28              :  * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
      29              :  * or radix sort for run generation.
      30              :  *
      31              :  * The approximate amount of memory allowed for any one sort operation
      32              :  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
      33              :  * we absorb tuples and simply store them in an unsorted array as long as
      34              :  * we haven't exceeded workMem.  If we reach the end of the input without
      35              :  * exceeding workMem, we sort the array in memory and subsequently return
      36              :  * tuples just by scanning the tuple array sequentially.  If we do exceed
      37              :  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
      38              :  * When tuples are dumped in batch after in-memory sorting, we begin a new run
      39              :  * with a new output tape.  If we reach the max number of tapes, we write
      40              :  * subsequent runs on the existing tapes in a round-robin fashion.  We will
      41              :  * need multiple merge passes to finish the merge in that case.  After the
      42              :  * end of the input is reached, we dump out remaining tuples in memory into
      43              :  * a final run, then merge the runs.
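The spill policy above can be sketched in standalone C: absorb tuples into an in-memory array until the memory budget is reached, then sort the array and dump it as one run; any tuples still in memory at end of input form the final run. The names (`make_runs`, `cmp_int`) and a tuple-count budget standing in for workMem are illustrative, not PostgreSQL's actual code.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define WORKMEM_TUPLES 4        /* stand-in for the workMem budget, in tuples */

static int cmp_int(const void *a, const void *b)
{
    int av = *(const int *) a, bv = *(const int *) b;
    return (av > bv) - (av < bv);
}

/*
 * Absorb input tuples; whenever the in-memory array is full, sort it and
 * dump it as a new sorted run.  At end of input, remaining tuples form
 * the final run.  Returns the number of runs produced.
 */
int make_runs(const int *input, int n,
              int runs[][WORKMEM_TUPLES], int *runlens)
{
    int         mem[WORKMEM_TUPLES];
    int         memcount = 0, nruns = 0;

    for (int i = 0; i < n; i++)
    {
        if (memcount == WORKMEM_TUPLES)
        {
            /* budget exceeded: sort what we have and emit it as one run */
            qsort(mem, memcount, sizeof(int), cmp_int);
            memcpy(runs[nruns], mem, memcount * sizeof(int));
            runlens[nruns++] = memcount;
            memcount = 0;
        }
        mem[memcount++] = input[i];
    }
    if (memcount > 0)           /* dump remaining tuples as the final run */
    {
        qsort(mem, memcount, sizeof(int), cmp_int);
        memcpy(runs[nruns], mem, memcount * sizeof(int));
        runlens[nruns++] = memcount;
    }
    return nruns;
}
```

Each run is sorted internally but the runs are not sorted relative to each other; producing globally sorted output is the job of the merge step described next.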
      44              :  *
      45              :  * When merging runs, we use a heap containing just the frontmost tuple from
      46              :  * each source run; we repeatedly output the smallest tuple and replace it
      47              :  * with the next tuple from its source tape (if any).  When the heap empties,
      48              :  * the merge is complete.  The basic merge algorithm thus needs very little
      49              :  * memory --- only M tuples for an M-way merge, and M is constrained to a
      50              :  * small number.  However, we can still make good use of our full workMem
      51              :  * allocation by pre-reading additional blocks from each source tape.  Without
      52              :  * prereading, our access pattern to the temporary file would be very erratic;
      53              :  * on average we'd read one block from each of M source tapes during the same
      54              :  * time that we're writing M blocks to the output tape, so there is no
      55              :  * sequentiality of access at all, defeating the read-ahead methods used by
      56              :  * most Unix kernels.  Worse, the output tape gets written into a very random
      57              :  * sequence of blocks of the temp file, ensuring that things will be even
      58              :  * worse when it comes time to read that tape.  A straightforward merge pass
      59              :  * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
      60              :  * by prereading from each source tape sequentially, loading about workMem/M
      61              :  * bytes from each tape in turn, and making the sequential blocks immediately
      62              :  * available for reuse.  This approach helps to localize both read and write
       63              :  * accesses.  The pre-reading is handled by logtape.c; we just tell it how
      64              :  * much memory to use for the buffers.
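The heap-based merge described above can be sketched with plain arrays standing in for tapes: a binary min-heap holds only the frontmost element of each run, and each popped element is replaced by the next one from the same source run, so memory use stays at M entries for an M-way merge. `run_t`, `kway_merge`, and the fixed heap capacity are illustrative, not PostgreSQL's implementation.

```c
#include <assert.h>
#include <stddef.h>

typedef struct { const int *data; size_t pos, len; } run_t;
typedef struct { int val; size_t src; } heap_ent;

static void sift_down(heap_ent *h, size_t n, size_t i)
{
    for (;;)
    {
        size_t l = 2 * i + 1, r = 2 * i + 2, m = i;
        if (l < n && h[l].val < h[m].val) m = l;
        if (r < n && h[r].val < h[m].val) m = r;
        if (m == i) return;
        heap_ent t = h[i]; h[i] = h[m]; h[m] = t;
        i = m;
    }
}

/* Merge m sorted runs into out[]; returns the number of elements written. */
size_t kway_merge(run_t *runs, size_t m, int *out)
{
    heap_ent    heap[16];       /* M is constrained to a small number */
    size_t      n = 0, written = 0;

    assert(m <= 16);

    /* load the frontmost element of each nonempty run, then heapify */
    for (size_t i = 0; i < m; i++)
        if (runs[i].len > 0)
            heap[n++] = (heap_ent) { runs[i].data[runs[i].pos++], i };
    for (size_t i = n; i-- > 0;)
        sift_down(heap, n, i);

    while (n > 0)
    {
        out[written++] = heap[0].val;       /* emit the smallest element */
        run_t *src = &runs[heap[0].src];
        if (src->pos < src->len)
            heap[0].val = src->data[src->pos++];    /* replace the top */
        else
            heap[0] = heap[--n];            /* source exhausted: heap shrinks */
        sift_down(heap, n, 0);
    }
    return written;
}
```

When the heap empties, every run has been consumed and the merge is complete, mirroring the description above.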
      65              :  *
      66              :  * In the current code we determine the number of input tapes M on the basis
      67              :  * of workMem: we want workMem/M to be large enough that we read a fair
      68              :  * amount of data each time we read from a tape, so as to maintain the
      69              :  * locality of access described above.  Nonetheless, with large workMem we
      70              :  * can have many tapes.  The logical "tapes" are implemented by logtape.c,
      71              :  * which avoids space wastage by recycling disk space as soon as each block
      72              :  * is read from its "tape".
      73              :  *
      74              :  * When the caller requests random access to the sort result, we form
      75              :  * the final sorted run on a logical tape which is then "frozen", so
      76              :  * that we can access it randomly.  When the caller does not need random
      77              :  * access, we return from tuplesort_performsort() as soon as we are down
      78              :  * to one run per logical tape.  The final merge is then performed
      79              :  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
      80              :  * saves one cycle of writing all the data out to disk and reading it in.
      81              :  *
      82              :  * This module supports parallel sorting.  Parallel sorts involve coordination
       83              :  * among one or more worker processes and a leader process, each with its own
      84              :  * tuplesort state.  The leader process (or, more accurately, the
      85              :  * Tuplesortstate associated with a leader process) creates a full tapeset
       86              :  * consisting of worker tapes, each with one run to merge: one run for every
       87              :  * worker process.  This is then merged.  Worker processes are guaranteed to
      88              :  * produce exactly one output run from their partial input.
      89              :  *
      90              :  *
      91              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      92              :  * Portions Copyright (c) 1994, Regents of the University of California
      93              :  *
      94              :  * IDENTIFICATION
      95              :  *    src/backend/utils/sort/tuplesort.c
      96              :  *
      97              :  *-------------------------------------------------------------------------
      98              :  */
      99              : 
     100              : #include "postgres.h"
     101              : 
     102              : #include <limits.h>
     103              : 
     104              : #include "commands/tablespace.h"
     105              : #include "miscadmin.h"
     106              : #include "pg_trace.h"
     107              : #include "port/pg_bitutils.h"
     108              : #include "storage/shmem.h"
     109              : #include "utils/guc.h"
     110              : #include "utils/memutils.h"
     111              : #include "utils/pg_rusage.h"
     112              : #include "utils/tuplesort.h"
     113              : 
     114              : /*
     115              :  * Initial size of memtuples array.  This must be more than
      116              :  * ALLOCSET_SEPARATE_THRESHOLD; see comments in grow_memtuples().  Use at
      117              :  * least 1024 elements to avoid excessive reallocs.
     118              :  */
     119              : #define INITIAL_MEMTUPSIZE Max(1024, \
     120              :     ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
     121              : 
     122              : /* GUC variables */
     123              : bool        trace_sort = false;
     124              : 
     125              : #ifdef DEBUG_BOUNDED_SORT
     126              : bool        optimize_bounded_sort = true;
     127              : #endif
     128              : 
     129              : 
     130              : /*
      131              :  * During merge, we use a pre-allocated set of fixed-size slots to hold
      132              :  * tuples, to avoid palloc/pfree overhead.
     133              :  *
      134              :  * Merge doesn't require a lot of memory, so we can afford to waste some
     135              :  * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
     136              :  * palloc() overhead is not significant anymore.
     137              :  *
     138              :  * 'nextfree' is valid when this chunk is in the free list.  When in use, the
     139              :  * slot holds a tuple.
     140              :  */
     141              : #define SLAB_SLOT_SIZE 1024
     142              : 
     143              : typedef union SlabSlot
     144              : {
     145              :     union SlabSlot *nextfree;
     146              :     char        buffer[SLAB_SLOT_SIZE];
     147              : } SlabSlot;
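The union above is the whole trick behind the slab free list: while a slot is free, its own bytes hold the link to the next free slot, so the free list needs no separate bookkeeping. A minimal sketch of initializing, allocating from, and releasing to such an arena (the helper names are illustrative, not PostgreSQL's):

```c
#include <stddef.h>

#define SLAB_SLOT_SIZE 1024

typedef union SlabSlot
{
    union SlabSlot *nextfree;       /* valid while the slot is free */
    char        buffer[SLAB_SLOT_SIZE];     /* valid while in use */
} SlabSlot;

/* Thread every slot of the arena onto one free list; return its head. */
SlabSlot *slab_init(SlabSlot *arena, size_t nslots)
{
    for (size_t i = 0; i < nslots - 1; i++)
        arena[i].nextfree = &arena[i + 1];
    arena[nslots - 1].nextfree = NULL;
    return arena;
}

/* Pop a slot off the free list; NULL means the slab is exhausted. */
SlabSlot *slab_alloc(SlabSlot **freehead)
{
    SlabSlot   *slot = *freehead;

    if (slot != NULL)
        *freehead = slot->nextfree;
    return slot;
}

/* Push a slot back onto the free list for reuse. */
void slab_release(SlabSlot **freehead, SlabSlot *slot)
{
    slot->nextfree = *freehead;
    *freehead = slot;
}
```

Allocation and release are both O(1) pointer swaps, which is why recycling slots this way beats palloc/pfree in the merge's hot loop.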
     148              : 
     149              : /*
     150              :  * Possible states of a Tuplesort object.  These denote the states that
     151              :  * persist between calls of Tuplesort routines.
     152              :  */
     153              : typedef enum
     154              : {
     155              :     TSS_INITIAL,                /* Loading tuples; still within memory limit */
     156              :     TSS_BOUNDED,                /* Loading tuples into bounded-size heap */
     157              :     TSS_BUILDRUNS,              /* Loading tuples; writing to tape */
     158              :     TSS_SORTEDINMEM,            /* Sort completed entirely in memory */
     159              :     TSS_SORTEDONTAPE,           /* Sort completed, final run is on tape */
     160              :     TSS_FINALMERGE,             /* Performing final merge on-the-fly */
     161              : } TupSortStatus;
     162              : 
     163              : /*
     164              :  * Parameters for calculation of number of tapes to use --- see inittapes()
     165              :  * and tuplesort_merge_order().
     166              :  *
      167              :  * In this calculation we assume that each tape will cost us about one block's
     168              :  * worth of buffer space.  This ignores the overhead of all the other data
     169              :  * structures needed for each tape, but it's probably close enough.
     170              :  *
     171              :  * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
     172              :  * input tape, for pre-reading (see discussion at top of file).  This is *in
     173              :  * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
     174              :  */
     175              : #define MINORDER        6       /* minimum merge order */
     176              : #define MAXORDER        500     /* maximum merge order */
     177              : #define TAPE_BUFFER_OVERHEAD        BLCKSZ
     178              : #define MERGE_BUFFER_SIZE           (BLCKSZ * 32)
     179              : 
     180              : 
     181              : /*
     182              :  * Private state of a Tuplesort operation.
     183              :  */
     184              : struct Tuplesortstate
     185              : {
     186              :     TuplesortPublic base;
     187              :     TupSortStatus status;       /* enumerated value as shown above */
     188              :     bool        bounded;        /* did caller specify a maximum number of
     189              :                                  * tuples to return? */
     190              :     bool        boundUsed;      /* true if we made use of a bounded heap */
     191              :     int         bound;          /* if bounded, the maximum number of tuples */
     192              :     int64       tupleMem;       /* memory consumed by individual tuples.
     193              :                                  * storing this separately from what we track
     194              :                                  * in availMem allows us to subtract the
     195              :                                  * memory consumed by all tuples when dumping
     196              :                                  * tuples to tape */
     197              :     int64       availMem;       /* remaining memory available, in bytes */
     198              :     int64       allowedMem;     /* total memory allowed, in bytes */
     199              :     int         maxTapes;       /* max number of input tapes to merge in each
     200              :                                  * pass */
     201              :     int64       maxSpace;       /* maximum amount of space occupied among sort
     202              :                                  * of groups, either in-memory or on-disk */
     203              :     bool        isMaxSpaceDisk; /* true when maxSpace tracks on-disk space,
     204              :                                  * false means in-memory */
     205              :     TupSortStatus maxSpaceStatus;   /* sort status when maxSpace was reached */
     206              :     LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */
     207              : 
     208              :     /*
     209              :      * This array holds the tuples now in sort memory.  If we are in state
     210              :      * INITIAL, the tuples are in no particular order; if we are in state
     211              :      * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
     212              :      * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
     213              :      * H.  In state SORTEDONTAPE, the array is not used.
     214              :      */
     215              :     SortTuple  *memtuples;      /* array of SortTuple structs */
     216              :     int         memtupcount;    /* number of tuples currently present */
     217              :     int         memtupsize;     /* allocated length of memtuples array */
     218              :     bool        growmemtuples;  /* memtuples' growth still underway? */
     219              : 
     220              :     /*
     221              :      * Memory for tuples is sometimes allocated using a simple slab allocator,
     222              :      * rather than with palloc().  Currently, we switch to slab allocation
     223              :      * when we start merging.  Merging only needs to keep a small, fixed
     224              :      * number of tuples in memory at any time, so we can avoid the
     225              :      * palloc/pfree overhead by recycling a fixed number of fixed-size slots
     226              :      * to hold the tuples.
     227              :      *
     228              :      * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
     229              :      * slots.  The allocation is sized to have one slot per tape, plus one
     230              :      * additional slot.  We need that many slots to hold all the tuples kept
      231              :  * in the heap during merge, plus the one most recently returned from the
      232              :  * sort by tuplesort_gettuple.
     233              :      *
     234              :      * Initially, all the slots are kept in a linked list of free slots.  When
      235              :  * a tuple is read from a tape, it is put into the next available slot, if
     236              :      * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
     237              :      * instead.
     238              :      *
     239              :      * When we're done processing a tuple, we return the slot back to the free
      240              :  * list, or pfree() it if it was palloc'd.  We know that a tuple was
      241              :  * allocated from the slab if its pointer value is between
     242              :      * slabMemoryBegin and -End.
     243              :      *
     244              :      * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
     245              :      * tracking memory usage is not used.
     246              :      */
     247              :     bool        slabAllocatorUsed;
     248              : 
     249              :     char       *slabMemoryBegin;    /* beginning of slab memory arena */
     250              :     char       *slabMemoryEnd;  /* end of slab memory arena */
     251              :     SlabSlot   *slabFreeHead;   /* head of free list */
     252              : 
     253              :     /* Memory used for input and output tape buffers. */
     254              :     size_t      tape_buffer_mem;
     255              : 
     256              :     /*
     257              :      * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
     258              :      * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
     259              :      * modes), we remember the tuple in 'lastReturnedTuple', so that we can
     260              :      * recycle the memory on next gettuple call.
     261              :      */
     262              :     void       *lastReturnedTuple;
     263              : 
     264              :     /*
     265              :      * While building initial runs, this is the current output run number.
     266              :      * Afterwards, it is the number of initial runs we made.
     267              :      */
     268              :     int         currentRun;
     269              : 
     270              :     /*
     271              :      * Logical tapes, for merging.
     272              :      *
     273              :      * The initial runs are written in the output tapes.  In each merge pass,
     274              :      * the output tapes of the previous pass become the input tapes, and new
     275              :      * output tapes are created as needed.  When nInputTapes equals
     276              :      * nInputRuns, there is only one merge pass left.
     277              :      */
     278              :     LogicalTape **inputTapes;
     279              :     int         nInputTapes;
     280              :     int         nInputRuns;
     281              : 
     282              :     LogicalTape **outputTapes;
     283              :     int         nOutputTapes;
     284              :     int         nOutputRuns;
     285              : 
     286              :     LogicalTape *destTape;      /* current output tape */
     287              : 
     288              :     /*
     289              :      * These variables are used after completion of sorting to keep track of
     290              :      * the next tuple to return.  (In the tape case, the tape's current read
     291              :      * position is also critical state.)
     292              :      */
     293              :     LogicalTape *result_tape;   /* actual tape of finished output */
     294              :     int         current;        /* array index (only used if SORTEDINMEM) */
     295              :     bool        eof_reached;    /* reached EOF (needed for cursors) */
     296              : 
     297              :     /* markpos_xxx holds marked position for mark and restore */
     298              :     int64       markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
     299              :     int         markpos_offset; /* saved "current", or offset in tape block */
     300              :     bool        markpos_eof;    /* saved "eof_reached" */
     301              : 
     302              :     /*
     303              :      * These variables are used during parallel sorting.
     304              :      *
     305              :      * worker is our worker identifier.  Follows the general convention that
     306              :      * -1 value relates to a leader tuplesort, and values >= 0 worker
     307              :      * tuplesorts. (-1 can also be a serial tuplesort.)
     308              :      *
     309              :      * shared is mutable shared memory state, which is used to coordinate
     310              :      * parallel sorts.
     311              :      *
     312              :      * nParticipants is the number of worker Tuplesortstates known by the
     313              :      * leader to have actually been launched, which implies that they must
     314              :      * finish a run that the leader needs to merge.  Typically includes a
     315              :      * worker state held by the leader process itself.  Set in the leader
     316              :      * Tuplesortstate only.
     317              :      */
     318              :     int         worker;
     319              :     Sharedsort *shared;
     320              :     int         nParticipants;
     321              : 
     322              :     /*
     323              :      * Additional state for managing "abbreviated key" sortsupport routines
     324              :      * (which currently may be used by all cases except the hash index case).
     325              :      * Tracks the intervals at which the optimization's effectiveness is
     326              :      * tested.
     327              :      */
     328              :     int64       abbrevNext;     /* Tuple # at which to next check
     329              :                                  * applicability */
     330              : 
     331              :     /*
     332              :      * Resource snapshot for time of sort start.
     333              :      */
     334              :     PGRUsage    ru_start;
     335              : };
     336              : 
     337              : /*
      338              :  * Private mutable state of a parallel tuplesort operation.  This is allocated
     339              :  * in shared memory.
     340              :  */
     341              : struct Sharedsort
     342              : {
     343              :     /* mutex protects all fields prior to tapes */
     344              :     slock_t     mutex;
     345              : 
     346              :     /*
     347              :      * currentWorker generates ordinal identifier numbers for parallel sort
     348              :      * workers.  These start from 0, and are always gapless.
     349              :      *
     350              :      * Workers increment workersFinished to indicate having finished.  If this
      351              :  * is equal to state.nParticipants within the leader, the leader is ready to
     352              :      * merge worker runs.
     353              :      */
     354              :     int         currentWorker;
     355              :     int         workersFinished;
     356              : 
     357              :     /* Temporary file space */
     358              :     SharedFileSet fileset;
     359              : 
     360              :     /* Size of tapes flexible array */
     361              :     int         nTapes;
     362              : 
     363              :     /*
     364              :      * Tapes array used by workers to report back information needed by the
     365              :      * leader to concatenate all worker tapes into one for merging
     366              :      */
     367              :     TapeShare   tapes[FLEXIBLE_ARRAY_MEMBER];
     368              : };
     369              : 
     370              : /*
     371              :  * Is the given tuple allocated from the slab memory arena?
     372              :  */
     373              : #define IS_SLAB_SLOT(state, tuple) \
     374              :     ((char *) (tuple) >= (state)->slabMemoryBegin && \
     375              :      (char *) (tuple) < (state)->slabMemoryEnd)
     376              : 
     377              : /*
     378              :  * Return the given tuple to the slab memory free list, or free it
     379              :  * if it was palloc'd.
     380              :  */
     381              : #define RELEASE_SLAB_SLOT(state, tuple) \
     382              :     do { \
     383              :         SlabSlot *buf = (SlabSlot *) tuple; \
     384              :         \
     385              :         if (IS_SLAB_SLOT((state), buf)) \
     386              :         { \
     387              :             buf->nextfree = (state)->slabFreeHead; \
     388              :             (state)->slabFreeHead = buf; \
     389              :         } else \
     390              :             pfree(buf); \
     391              :     } while(0)
     392              : 
     393              : #define REMOVEABBREV(state,stup,count)  ((*(state)->base.removeabbrev) (state, stup, count))
     394              : #define COMPARETUP(state,a,b)   ((*(state)->base.comparetup) (a, b, state))
     395              : #define WRITETUP(state,tape,stup)   ((*(state)->base.writetup) (state, tape, stup))
     396              : #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
     397              : #define FREESTATE(state)    ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
     398              : #define LACKMEM(state)      ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
     399              : #define USEMEM(state,amt)   ((state)->availMem -= (amt))
     400              : #define FREEMEM(state,amt)  ((state)->availMem += (amt))
     401              : #define SERIAL(state)       ((state)->shared == NULL)
     402              : #define WORKER(state)       ((state)->shared && (state)->worker != -1)
     403              : #define LEADER(state)       ((state)->shared && (state)->worker == -1)
     404              : 
     405              : /*
     406              :  * NOTES about on-tape representation of tuples:
     407              :  *
     408              :  * We require the first "unsigned int" of a stored tuple to be the total size
     409              :  * on-tape of the tuple, including itself (so it is never zero; an all-zero
     410              :  * unsigned int is used to delimit runs).  The remainder of the stored tuple
     411              :  * may or may not match the in-memory representation of the tuple ---
     412              :  * any conversion needed is the job of the writetup and readtup routines.
     413              :  *
     414              :  * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
     415              :  * representation of the tuple must be followed by another "unsigned int" that
     416              :  * is a copy of the length --- so the total tape space used is actually
      417              :  * sizeof(unsigned int) more than the stored length value.  This allows
      418              :  * reading backwards.  When the random access flag is not specified, the
     419              :  * write/read routines may omit the extra length word.
     420              :  *
     421              :  * writetup is expected to write both length words as well as the tuple
     422              :  * data.  When readtup is called, the tape is positioned just after the
     423              :  * front length word; readtup must read the tuple data and advance past
     424              :  * the back length word (if present).
     425              :  *
     426              :  * The write/read routines can make use of the tuple description data
     427              :  * stored in the Tuplesortstate record, if needed.  They are also expected
     428              :  * to adjust state->availMem by the amount of memory space (not tape space!)
     429              :  * released or consumed.  There is no error return from either writetup
     430              :  * or readtup; they should ereport() on failure.
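The length-word framing can be illustrated against a plain memory buffer standing in for a tape (a sketch of the on-tape layout described above, not the logtape.c API; the function names are invented):

```c
#include <string.h>

/*
 * Append one tuple at buf + *off: a leading length word counting itself
 * plus the payload (so it is never zero), and, when randomAccess is set,
 * a trailing copy of the length that permits reading backwards.
 */
void writetup_framed(char *buf, size_t *off, const char *payload,
                     unsigned int paylen, int randomAccess)
{
    unsigned int len = (unsigned int) sizeof(unsigned int) + paylen;

    memcpy(buf + *off, &len, sizeof(len));
    *off += sizeof(len);
    memcpy(buf + *off, payload, paylen);
    *off += paylen;
    if (randomAccess)
    {
        memcpy(buf + *off, &len, sizeof(len));  /* back length word */
        *off += sizeof(len);
    }
}

/*
 * Read the next tuple; returns the payload length, or 0 at a run end
 * (an all-zero length word delimits runs).  Advances past the trailing
 * length copy when randomAccess is set.
 */
unsigned int readtup_framed(const char *buf, size_t *off,
                            char *payload, int randomAccess)
{
    unsigned int len;

    memcpy(&len, buf + *off, sizeof(len));
    *off += sizeof(len);
    if (len == 0)
        return 0;
    memcpy(payload, buf + *off, len - sizeof(len));
    *off += len - sizeof(len);
    if (randomAccess)
        *off += sizeof(len);    /* skip the trailing length copy */
    return len - (unsigned int) sizeof(len);
}
```

With random access enabled, each tuple costs two length words of tape space; without it, only the front word is needed, matching the note above.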
     431              :  *
     432              :  *
     433              :  * NOTES about memory consumption calculations:
     434              :  *
     435              :  * We count space allocated for tuples against the workMem limit, plus
     436              :  * the space used by the variable-size memtuples array.  Fixed-size space
     437              :  * is not counted; it's small enough to not be interesting.
     438              :  *
     439              :  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
     440              :  * rather than the originally-requested size.  This is important since
     441              :  * palloc can add substantial overhead.  It's not a complete answer since
     442              :  * we won't count any wasted space in palloc allocation blocks, but it's
     443              :  * a lot better than what we were doing before 7.3.  As of 9.6, a
     444              :  * separate memory context is used for caller passed tuples.  Resetting
     445              :  * it at certain key increments significantly ameliorates fragmentation.
     446              :  * readtup routines use the slab allocator (they cannot use
     447              :  * the reset context because it gets deleted at the point that merging
     448              :  * begins).
     449              :  */
     450              : 
     451              : 
     452              : static void tuplesort_begin_batch(Tuplesortstate *state);
     453              : static bool consider_abort_common(Tuplesortstate *state);
     454              : static void inittapes(Tuplesortstate *state, bool mergeruns);
     455              : static void inittapestate(Tuplesortstate *state, int maxTapes);
     456              : static void selectnewtape(Tuplesortstate *state);
     457              : static void init_slab_allocator(Tuplesortstate *state, int numSlots);
     458              : static void mergeruns(Tuplesortstate *state);
     459              : static void mergeonerun(Tuplesortstate *state);
     460              : static void beginmerge(Tuplesortstate *state);
     461              : static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
     462              : static void dumptuples(Tuplesortstate *state, bool alltuples);
     463              : static void make_bounded_heap(Tuplesortstate *state);
     464              : static void sort_bounded_heap(Tuplesortstate *state);
     465              : static void tuplesort_sort_memtuples(Tuplesortstate *state);
     466              : static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
     467              : static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
     468              : static void tuplesort_heap_delete_top(Tuplesortstate *state);
     469              : static void reversedirection(Tuplesortstate *state);
     470              : static unsigned int getlen(LogicalTape *tape, bool eofOK);
     471              : static void markrunend(LogicalTape *tape);
     472              : static int  worker_get_identifier(Tuplesortstate *state);
     473              : static void worker_freeze_result_tape(Tuplesortstate *state);
     474              : static void worker_nomergeruns(Tuplesortstate *state);
     475              : static void leader_takeover_tapes(Tuplesortstate *state);
     476              : static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
     477              : static void tuplesort_free(Tuplesortstate *state);
     478              : static void tuplesort_updatemax(Tuplesortstate *state);
     479              : 
     480              : 
     481              : /*
     482              :  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
     483              :  * any variant of SortTuples, using the appropriate comparetup function.
     484              :  * qsort_ssup() is specialized for the case where the comparetup function
     485              :  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
     486              :  * and Datum sorts.
     487              :  */
     488              : 
     489              : #define ST_SORT qsort_tuple
     490              : #define ST_ELEMENT_TYPE SortTuple
     491              : #define ST_COMPARE_RUNTIME_POINTER
     492              : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     493              : #define ST_CHECK_FOR_INTERRUPTS
     494              : #define ST_SCOPE static
     495              : #define ST_DECLARE
     496              : #define ST_DEFINE
     497              : #include "lib/sort_template.h"
     498              : 
     499              : #define ST_SORT qsort_ssup
     500              : #define ST_ELEMENT_TYPE SortTuple
     501              : #define ST_COMPARE(a, b, ssup) \
     502              :     ApplySortComparator((a)->datum1, (a)->isnull1, \
     503              :                         (b)->datum1, (b)->isnull1, (ssup))
     504              : #define ST_COMPARE_ARG_TYPE SortSupportData
     505              : #define ST_CHECK_FOR_INTERRUPTS
     506              : #define ST_SCOPE static
     507              : #define ST_DEFINE
     508              : #include "lib/sort_template.h"
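The sort_template.h pattern above generates a full sort routine specialized at compile time, so the comparator is inlined instead of being called through a function pointer as with qsort(3). A minimal sketch of the same idea, faking the template with a single macro (the real header expands into an optimized quicksort; these names are ours, not PostgreSQL's):

```c
#include <assert.h>
#include <stddef.h>

/* Macro "template": expands into an insertion sort specialized for a
 * given element type with the comparator inlined at compile time. */
#define DEFINE_SORT(name, type, cmp)                        \
    static void name(type *a, size_t n)                     \
    {                                                       \
        for (size_t i = 1; i < n; i++)                      \
        {                                                   \
            type    v = a[i];                               \
            size_t  j = i;                                  \
                                                            \
            while (j > 0 && cmp(&a[j - 1], &v) > 0)         \
            {                                               \
                a[j] = a[j - 1];                            \
                j--;                                        \
            }                                               \
            a[j] = v;                                       \
        }                                                   \
    }

/* Branchless three-way comparison for ints. */
#define INT_CMP(a, b) ((*(a) > *(b)) - (*(a) < *(b)))

/* Instantiate, roughly as ST_SORT/ST_ELEMENT_TYPE/ST_COMPARE do. */
DEFINE_SORT(sort_int, int, INT_CMP)
```

Because INT_CMP expands inline at each comparison site, the compiler can optimize the inner loop in a way it cannot when every comparison goes through an opaque function pointer.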
     509              : 
     510              : /* state for radix sort */
     511              : typedef struct RadixSortInfo
     512              : {
     513              :     union
     514              :     {
     515              :         size_t      count;
     516              :         size_t      offset;
     517              :     };
     518              :     size_t      next_offset;
     519              : } RadixSortInfo;
     520              : 
     521              : /*
     522              :  * Threshold below which qsort_tuple() is generally faster than a radix sort.
     523              :  */
     524              : #define QSORT_THRESHOLD 40
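The threshold dispatch can be sketched as follows. This is a toy, not PostgreSQL's radix sort: a single-byte counting sort stands in for the distribution pass, and the helper names are hypothetical; only the 40-element cutoff mirrors QSORT_THRESHOLD above.

```c
#include <assert.h>
#include <stdlib.h>

#define SKETCH_QSORT_THRESHOLD 40  /* mirrors QSORT_THRESHOLD */

static int
byte_cmp(const void *a, const void *b)
{
    return (int) *(const unsigned char *) a -
           (int) *(const unsigned char *) b;
}

/* Distribution sort on single bytes, standing in for the radix pass. */
static void
counting_sort_bytes(unsigned char *a, size_t n)
{
    size_t      counts[256] = {0};
    size_t      k = 0;

    for (size_t i = 0; i < n; i++)
        counts[a[i]]++;
    for (int v = 0; v < 256; v++)
        while (counts[v]-- > 0)
            a[k++] = (unsigned char) v;
}

/* Below the threshold a comparison sort wins; above it, the
 * distribution sort's O(n) pass amortizes its fixed setup cost. */
static void
hybrid_sort_bytes(unsigned char *a, size_t n)
{
    if (n < SKETCH_QSORT_THRESHOLD)
        qsort(a, n, 1, byte_cmp);
    else
        counting_sort_bytes(a, n);
}
```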
     525              : 
     526              : 
     527              : /*
     528              :  *      tuplesort_begin_xxx
     529              :  *
     530              :  * Initialize for a tuple sort operation.
     531              :  *
     532              :  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
     533              :  * zero or more times, then call tuplesort_performsort when all the tuples
     534              :  * have been supplied.  After performsort, retrieve the tuples in sorted
     535              :  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
     536              :  * access was requested, rescan, markpos, and restorepos can also be called.)
     537              :  * Call tuplesort_end to terminate the operation and release memory/disk space.
     538              :  *
     539              :  * Each variant of tuplesort_begin has a workMem parameter specifying the
     540              :  * maximum number of kilobytes of RAM to use before spilling data to disk.
     541              :  * (The normal value of this parameter is work_mem, but some callers use
      542              :  * other values.)  Each variant also has a sortopt, which is a bitmask of
      543              :  * sort options; see the TUPLESORT_* definitions in tuplesort.h.
     544              :  */
     545              : 
     546              : Tuplesortstate *
     547       172875 : tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
     548              : {
     549              :     Tuplesortstate *state;
     550              :     MemoryContext maincontext;
     551              :     MemoryContext sortcontext;
     552              :     MemoryContext oldcontext;
     553              : 
     554              :     /* See leader_takeover_tapes() remarks on random access support */
     555       172875 :     if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
     556            0 :         elog(ERROR, "random access disallowed under parallel sort");
     557              : 
     558              :     /*
     559              :      * Memory context surviving tuplesort_reset.  This memory context holds
     560              :      * data which is useful to keep while sorting multiple similar batches.
     561              :      */
     562       172875 :     maincontext = AllocSetContextCreate(CurrentMemoryContext,
     563              :                                         "TupleSort main",
     564              :                                         ALLOCSET_DEFAULT_SIZES);
     565              : 
     566              :     /*
     567              :      * Create a working memory context for one sort operation.  The content of
     568              :      * this context is deleted by tuplesort_reset.
     569              :      */
     570       172875 :     sortcontext = AllocSetContextCreate(maincontext,
     571              :                                         "TupleSort sort",
     572              :                                         ALLOCSET_DEFAULT_SIZES);
     573              : 
     574              :     /*
      575              :      * Additionally, a working memory context for tuples is set up in
      576              :      * tuplesort_begin_batch.
     577              :      */
     578              : 
     579              :     /*
     580              :      * Make the Tuplesortstate within the per-sortstate context.  This way, we
     581              :      * don't need a separate pfree() operation for it at shutdown.
     582              :      */
     583       172875 :     oldcontext = MemoryContextSwitchTo(maincontext);
     584              : 
     585       172875 :     state = palloc0_object(Tuplesortstate);
     586              : 
     587       172875 :     if (trace_sort)
     588            0 :         pg_rusage_init(&state->ru_start);
     589              : 
     590       172875 :     state->base.sortopt = sortopt;
     591       172875 :     state->base.tuples = true;
     592       172875 :     state->abbrevNext = 10;
     593              : 
     594              :     /*
     595              :      * workMem is forced to be at least 64KB, the current minimum valid value
     596              :      * for the work_mem GUC.  This is a defense against parallel sort callers
     597              :      * that divide out memory among many workers in a way that leaves each
     598              :      * with very little memory.
     599              :      */
     600       172875 :     state->allowedMem = Max(workMem, 64) * (int64) 1024;
     601       172875 :     state->base.sortcontext = sortcontext;
     602       172875 :     state->base.maincontext = maincontext;
     603              : 
     604       172875 :     state->memtupsize = INITIAL_MEMTUPSIZE;
     605       172875 :     state->memtuples = NULL;
     606              : 
     607              :     /*
      608              :      * After all of the other non-parallel-related state, we set up all of
      609              :      * the state needed for each batch.
     610              :      */
     611       172875 :     tuplesort_begin_batch(state);
     612              : 
     613              :     /*
     614              :      * Initialize parallel-related state based on coordination information
     615              :      * from caller
     616              :      */
     617       172875 :     if (!coordinate)
     618              :     {
     619              :         /* Serial sort */
     620       172318 :         state->shared = NULL;
     621       172318 :         state->worker = -1;
     622       172318 :         state->nParticipants = -1;
     623              :     }
     624          557 :     else if (coordinate->isWorker)
     625              :     {
     626              :         /* Parallel worker produces exactly one final run from all input */
     627          379 :         state->shared = coordinate->sharedsort;
     628          379 :         state->worker = worker_get_identifier(state);
     629          379 :         state->nParticipants = -1;
     630              :     }
     631              :     else
     632              :     {
     633              :         /* Parallel leader state only used for final merge */
     634          178 :         state->shared = coordinate->sharedsort;
     635          178 :         state->worker = -1;
     636          178 :         state->nParticipants = coordinate->nParticipants;
     637              :         Assert(state->nParticipants >= 1);
     638              :     }
     639              : 
     640       172875 :     MemoryContextSwitchTo(oldcontext);
     641              : 
     642       172875 :     return state;
     643              : }
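The begin/put/performsort/get/end protocol documented above tuplesort_begin_common can be modeled with a toy in-memory version. All names here are hypothetical and this sketch sorts bare ints with no disk spill, no bounds, and no parallelism; it only shows the caller-visible lifecycle.

```c
#include <assert.h>
#include <stdlib.h>

/* Toy model of the tuplesort lifecycle; names are ours, not the API's. */
typedef struct ToySort
{
    int        *vals;
    size_t      count;
    size_t      cap;
    size_t      readpos;
    int         performed;
} ToySort;

static ToySort *
toysort_begin(void)
{
    ToySort    *ts = calloc(1, sizeof(ToySort));

    ts->cap = 4;
    ts->vals = malloc(ts->cap * sizeof(int));
    return ts;
}

static void
toysort_put(ToySort *ts, int v)
{
    if (ts->count == ts->cap)
        ts->vals = realloc(ts->vals, (ts->cap *= 2) * sizeof(int));
    ts->vals[ts->count++] = v;
}

static int
int_cmp(const void *a, const void *b)
{
    int         x = *(const int *) a,
                y = *(const int *) b;

    return (x > y) - (x < y);
}

/* All input supplied; sort it.  The real code may merge on-disk runs. */
static void
toysort_performsort(ToySort *ts)
{
    qsort(ts->vals, ts->count, sizeof(int), int_cmp);
    ts->performed = 1;
}

/* Retrieve tuples in order; returns false when exhausted. */
static int
toysort_get(ToySort *ts, int *out)
{
    if (!ts->performed || ts->readpos >= ts->count)
        return 0;
    *out = ts->vals[ts->readpos++];
    return 1;
}

static void
toysort_end(ToySort *ts)
{
    free(ts->vals);
    free(ts);
}
```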
     644              : 
     645              : /*
     646              :  *      tuplesort_begin_batch
     647              :  *
      648              :  * Set up, or reset, all state needed for processing a new set of tuples with
      649              :  * this sort state.  Called both from tuplesort_begin_common (the first time
      650              :  * sorting with this sort state) and tuplesort_reset (for subsequent uses).
     651              :  */
     652              : static void
     653       174776 : tuplesort_begin_batch(Tuplesortstate *state)
     654              : {
     655              :     MemoryContext oldcontext;
     656              : 
     657       174776 :     oldcontext = MemoryContextSwitchTo(state->base.maincontext);
     658              : 
     659              :     /*
     660              :      * Caller tuple (e.g. IndexTuple) memory context.
     661              :      *
      662              :      * A dedicated child context used exclusively for caller-passed tuples
     663              :      * eases memory management.  Resetting at key points reduces
     664              :      * fragmentation. Note that the memtuples array of SortTuples is allocated
     665              :      * in the parent context, not this context, because there is no need to
     666              :      * free memtuples early.  For bounded sorts, tuples may be pfreed in any
      667              :      * order, so we use a regular aset.c context, which can reuse freed
      668              :      * memory.  When the sort is not bounded, we use a bump.c context
      669              :      * instead, as it keeps allocations more compact with less wastage and
      670              :      * is also slightly more CPU efficient.
     671              :      */
     672       174776 :     if (TupleSortUseBumpTupleCxt(state->base.sortopt))
     673       173907 :         state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
     674              :                                                      "Caller tuples",
     675              :                                                      ALLOCSET_DEFAULT_SIZES);
     676              :     else
     677          869 :         state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
     678              :                                                          "Caller tuples",
     679              :                                                          ALLOCSET_DEFAULT_SIZES);
     680              : 
     681              : 
     682       174776 :     state->status = TSS_INITIAL;
     683       174776 :     state->bounded = false;
     684       174776 :     state->boundUsed = false;
     685              : 
     686       174776 :     state->availMem = state->allowedMem;
     687              : 
     688       174776 :     state->tapeset = NULL;
     689              : 
     690       174776 :     state->memtupcount = 0;
     691              : 
     692       174776 :     state->growmemtuples = true;
     693       174776 :     state->slabAllocatorUsed = false;
     694       174776 :     if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
     695              :     {
     696           48 :         pfree(state->memtuples);
     697           48 :         state->memtuples = NULL;
     698           48 :         state->memtupsize = INITIAL_MEMTUPSIZE;
     699              :     }
     700       174776 :     if (state->memtuples == NULL)
     701              :     {
     702       172923 :         state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
     703       172923 :         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
     704              :     }
     705              : 
     706              :     /* workMem must be large enough for the minimal memtuples array */
     707       174776 :     if (LACKMEM(state))
     708            0 :         elog(ERROR, "insufficient memory allowed for sort");
     709              : 
     710       174776 :     state->currentRun = 0;
     711              : 
     712              :     /*
     713              :      * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
     714              :      * inittapes(), if needed.
     715              :      */
     716              : 
     717       174776 :     state->result_tape = NULL;   /* flag that result tape has not been formed */
     718              : 
     719       174776 :     MemoryContextSwitchTo(oldcontext);
     720       174776 : }
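The bump-context choice in the function above rests on a simple idea: a bump allocator hands out suballocations by advancing a pointer through a block, with no per-chunk header and no support for freeing individual chunks, so it wastes less space than a general-purpose allocator. A toy version (not PostgreSQL's bump.c; names are hypothetical):

```c
#include <assert.h>
#include <stddef.h>

/* Toy bump arena: allocate by pointer advance, free only all-at-once. */
typedef struct BumpArena
{
    char       *base;
    size_t      size;
    size_t      used;
} BumpArena;

static void *
bump_alloc(BumpArena *a, size_t n)
{
    void       *p;

    n = (n + 7) & ~(size_t) 7;  /* keep 8-byte alignment */
    if (a->used + n > a->size)
        return NULL;            /* block full; caller must get a new one */
    p = a->base + a->used;
    a->used += n;
    return p;
}

/* No per-chunk pfree: the whole context is released in one step, which
 * is exactly the usage pattern of non-bounded sorts. */
static void
bump_reset(BumpArena *a)
{
    a->used = 0;
}
```

Consecutive allocations land back to back with only alignment padding between them, which is why the comment above says bump contexts keep allocations "more compact with less wastage".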
     721              : 
     722              : /*
     723              :  * tuplesort_set_bound
     724              :  *
     725              :  *  Advise tuplesort that at most the first N result tuples are required.
     726              :  *
     727              :  * Must be called before inserting any tuples.  (Actually, we could allow it
     728              :  * as long as the sort hasn't spilled to disk, but there seems no need for
     729              :  * delayed calls at the moment.)
     730              :  *
     731              :  * This is a hint only. The tuplesort may still return more tuples than
     732              :  * requested.  Parallel leader tuplesorts will always ignore the hint.
     733              :  */
     734              : void
     735          780 : tuplesort_set_bound(Tuplesortstate *state, int64 bound)
     736              : {
     737              :     /* Assert we're called before loading any tuples */
     738              :     Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
     739              :     /* Assert we allow bounded sorts */
     740              :     Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
     741              :     /* Can't set the bound twice, either */
     742              :     Assert(!state->bounded);
     743              :     /* Also, this shouldn't be called in a parallel worker */
     744              :     Assert(!WORKER(state));
     745              : 
     746              :     /* Parallel leader allows but ignores hint */
     747          780 :     if (LEADER(state))
     748            0 :         return;
     749              : 
     750              : #ifdef DEBUG_BOUNDED_SORT
     751              :     /* Honor GUC setting that disables the feature (for easy testing) */
     752              :     if (!optimize_bounded_sort)
     753              :         return;
     754              : #endif
     755              : 
     756              :     /* We want to be able to compute bound * 2, so limit the setting */
     757          780 :     if (bound > (int64) (INT_MAX / 2))
     758            0 :         return;
     759              : 
     760          780 :     state->bounded = true;
     761          780 :     state->bound = (int) bound;
     762              : 
     763              :     /*
     764              :      * Bounded sorts are not an effective target for abbreviated key
     765              :      * optimization.  Disable by setting state to be consistent with no
     766              :      * abbreviation support.
     767              :      */
     768          780 :     state->base.sortKeys->abbrev_converter = NULL;
     769          780 :     if (state->base.sortKeys->abbrev_full_comparator)
     770           10 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
     771              : 
     772              :     /* Not strictly necessary, but be tidy */
     773          780 :     state->base.sortKeys->abbrev_abort = NULL;
     774          780 :     state->base.sortKeys->abbrev_full_comparator = NULL;
     775              : }
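The payoff of tuplesort_set_bound is that once N tuples are held, each new tuple either replaces the current worst kept tuple or is discarded, so memory stays O(N) no matter how large the input is. The real code maintains a heap (see make_bounded_heap); this standalone sketch uses a linear scan for the maximum instead, which is simpler to read but O(N) per insert. All names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Toy bounded "keep the N smallest" structure. */
typedef struct BoundedKeep
{
    int        *vals;           /* caller-provided array of size bound */
    size_t      bound;
    size_t      count;
} BoundedKeep;

static void
bounded_put(BoundedKeep *bk, int v)
{
    size_t      worst;

    if (bk->count < bk->bound)
    {
        bk->vals[bk->count++] = v;  /* still filling up to the bound */
        return;
    }

    /* Find the current maximum (the heap root in the real code). */
    worst = 0;
    for (size_t i = 1; i < bk->count; i++)
        if (bk->vals[i] > bk->vals[worst])
            worst = i;

    /* Keep the new value only if it beats the worst one kept. */
    if (v < bk->vals[worst])
        bk->vals[worst] = v;
}
```

Feeding any number of tuples through this leaves exactly the `bound` smallest values in the array, which is why a bounded sort never needs to spill just because the input is large.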
     776              : 
     777              : /*
     778              :  * tuplesort_used_bound
     779              :  *
     780              :  * Allow callers to find out if the sort state was able to use a bound.
     781              :  */
     782              : bool
     783          243 : tuplesort_used_bound(Tuplesortstate *state)
     784              : {
     785          243 :     return state->boundUsed;
     786              : }
     787              : 
     788              : /*
     789              :  * tuplesort_free
     790              :  *
     791              :  *  Internal routine for freeing resources of tuplesort.
     792              :  */
     793              : static void
     794       174602 : tuplesort_free(Tuplesortstate *state)
     795              : {
     796              :     /* context swap probably not needed, but let's be safe */
     797       174602 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
     798              :     int64       spaceUsed;
     799              : 
     800       174602 :     if (state->tapeset)
     801          599 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset);
     802              :     else
     803       174003 :         spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
     804              : 
     805              :     /*
     806              :      * Delete temporary "tape" files, if any.
     807              :      *
     808              :      * We don't bother to destroy the individual tapes here. They will go away
     809              :      * with the sortcontext.  (In TSS_FINALMERGE state, we have closed
     810              :      * finished tapes already.)
     811              :      */
     812       174602 :     if (state->tapeset)
     813          599 :         LogicalTapeSetClose(state->tapeset);
     814              : 
     815       174602 :     if (trace_sort)
     816              :     {
     817            0 :         if (state->tapeset)
     818            0 :             elog(LOG, "%s of worker %d ended, %" PRId64 " disk blocks used: %s",
     819              :                  SERIAL(state) ? "external sort" : "parallel external sort",
     820              :                  state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
     821              :         else
     822            0 :             elog(LOG, "%s of worker %d ended, %" PRId64 " KB used: %s",
     823              :                  SERIAL(state) ? "internal sort" : "unperformed parallel sort",
     824              :                  state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
     825              :     }
     826              : 
     827              :     TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
     828              : 
     829       174602 :     FREESTATE(state);
     830       174602 :     MemoryContextSwitchTo(oldcontext);
     831              : 
     832              :     /*
     833              :      * Free the per-sort memory context, thereby releasing all working memory.
     834              :      */
     835       174602 :     MemoryContextReset(state->base.sortcontext);
     836       174602 : }
     837              : 
     838              : /*
     839              :  * tuplesort_end
     840              :  *
     841              :  *  Release resources and clean up.
     842              :  *
     843              :  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
     844              :  * pointing to garbage.  Be careful not to attempt to use or free such
     845              :  * pointers afterwards!
     846              :  */
     847              : void
     848       172701 : tuplesort_end(Tuplesortstate *state)
     849              : {
     850       172701 :     tuplesort_free(state);
     851              : 
     852              :     /*
     853              :      * Free the main memory context, including the Tuplesortstate struct
     854              :      * itself.
     855              :      */
     856       172701 :     MemoryContextDelete(state->base.maincontext);
     857       172701 : }
     858              : 
     859              : /*
     860              :  * tuplesort_updatemax
     861              :  *
     862              :  *  Update maximum resource usage statistics.
     863              :  */
     864              : static void
     865         2165 : tuplesort_updatemax(Tuplesortstate *state)
     866              : {
     867              :     int64       spaceUsed;
     868              :     bool        isSpaceDisk;
     869              : 
     870              :     /*
     871              :      * Note: it might seem we should provide both memory and disk usage for a
     872              :      * disk-based sort.  However, the current code doesn't track memory space
     873              :      * accurately once we have begun to return tuples to the caller (since we
     874              :      * don't account for pfree's the caller is expected to do), so we cannot
     875              :      * rely on availMem in a disk sort.  This does not seem worth the overhead
     876              :      * to fix.  Is it worth creating an API for the memory context code to
     877              :      * tell us how much is actually used in sortcontext?
     878              :      */
     879         2165 :     if (state->tapeset)
     880              :     {
     881            4 :         isSpaceDisk = true;
     882            4 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
     883              :     }
     884              :     else
     885              :     {
     886         2161 :         isSpaceDisk = false;
     887         2161 :         spaceUsed = state->allowedMem - state->availMem;
     888              :     }
     889              : 
     890              :     /*
      891              :      * A sort evicts data to disk when it cannot fit that data into main
      892              :      * memory.  This is why we consider space used on disk more important
      893              :      * for tracking resource usage than space used in memory.  Note that
      894              :      * the amount of space occupied by a tupleset on disk might be less
      895              :      * than the amount occupied by the same tupleset in memory, due to the
      896              :      * more compact on-disk representation.
     897              :      */
     898         2165 :     if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
     899         2161 :         (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
     900              :     {
     901          325 :         state->maxSpace = spaceUsed;
     902          325 :         state->isMaxSpaceDisk = isSpaceDisk;
     903          325 :         state->maxSpaceStatus = state->status;
     904              :     }
     905         2165 : }
     906              : 
     907              : /*
     908              :  * tuplesort_reset
     909              :  *
     910              :  *  Reset the tuplesort.  Reset all the data in the tuplesort, but leave the
     911              :  *  meta-information in.  After tuplesort_reset, tuplesort is ready to start
      912              :  *  a new sort.  This avoids recreating tuple sort states (and saves
      913              :  *  resources) when sorting multiple small batches.
     914              :  */
     915              : void
     916         1901 : tuplesort_reset(Tuplesortstate *state)
     917              : {
     918         1901 :     tuplesort_updatemax(state);
     919         1901 :     tuplesort_free(state);
     920              : 
     921              :     /*
     922              :      * After we've freed up per-batch memory, re-setup all of the state common
     923              :      * to both the first batch and any subsequent batch.
     924              :      */
     925         1901 :     tuplesort_begin_batch(state);
     926              : 
     927         1901 :     state->lastReturnedTuple = NULL;
     928         1901 :     state->slabMemoryBegin = NULL;
     929         1901 :     state->slabMemoryEnd = NULL;
     930         1901 :     state->slabFreeHead = NULL;
     931         1901 : }
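The reset-vs-recreate pattern above — keep the long-lived allocation, clear only the per-batch state — can be shown in miniature. This is a hypothetical sketch, not the tuplesort code: `BatchBuf` plays the role of the state surviving in maincontext, and `batchbuf_reset` the role of tuplesort_reset.

```c
#include <assert.h>
#include <stdlib.h>

/* Toy reusable batch buffer; names are hypothetical. */
typedef struct BatchBuf
{
    int        *vals;           /* long-lived, survives resets */
    size_t      cap;
    size_t      count;          /* per-batch state */
} BatchBuf;

static BatchBuf *
batchbuf_create(size_t cap)
{
    BatchBuf   *b = malloc(sizeof(BatchBuf));

    b->vals = malloc(cap * sizeof(int));
    b->cap = cap;
    b->count = 0;
    return b;
}

/* Clear per-batch state only; the array stays allocated so the next
 * batch pays no allocation cost. */
static void
batchbuf_reset(BatchBuf *b)
{
    b->count = 0;
}
```

Resetting instead of destroying and recreating is what makes sorting many small batches cheap: the setup cost is paid once.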
     932              : 
     933              : /*
     934              :  * Grow the memtuples[] array, if possible within our memory constraint.  We
     935              :  * must not exceed INT_MAX tuples in memory or the caller-provided memory
     936              :  * limit.  Return true if we were able to enlarge the array, false if not.
     937              :  *
     938              :  * Normally, at each increment we double the size of the array.  When doing
     939              :  * that would exceed a limit, we attempt one last, smaller increase (and then
     940              :  * clear the growmemtuples flag so we don't try any more).  That allows us to
     941              :  * use memory as fully as permitted; sticking to the pure doubling rule could
     942              :  * result in almost half going unused.  Because availMem moves around with
     943              :  * tuple addition/removal, we need some rule to prevent making repeated small
     944              :  * increases in memtupsize, which would just be useless thrashing.  The
     945              :  * growmemtuples flag accomplishes that and also prevents useless
     946              :  * recalculations in this function.
     947              :  */
     948              : static bool
     949         5316 : grow_memtuples(Tuplesortstate *state)
     950              : {
     951              :     int         newmemtupsize;
     952         5316 :     int         memtupsize = state->memtupsize;
     953         5316 :     int64       memNowUsed = state->allowedMem - state->availMem;
     954              : 
     955              :     /* Forget it if we've already maxed out memtuples, per comment above */
     956         5316 :     if (!state->growmemtuples)
     957           90 :         return false;
     958              : 
     959              :     /* Select new value of memtupsize */
     960         5226 :     if (memNowUsed <= state->availMem)
     961              :     {
     962              :         /*
     963              :          * We've used no more than half of allowedMem; double our usage,
     964              :          * clamping at INT_MAX tuples.
     965              :          */
     966         5133 :         if (memtupsize < INT_MAX / 2)
     967         5133 :             newmemtupsize = memtupsize * 2;
     968              :         else
     969              :         {
     970            0 :             newmemtupsize = INT_MAX;
     971            0 :             state->growmemtuples = false;
     972              :         }
     973              :     }
     974              :     else
     975              :     {
     976              :         /*
     977              :          * This will be the last increment of memtupsize.  Abandon doubling
     978              :          * strategy and instead increase as much as we safely can.
     979              :          *
     980              :          * To stay within allowedMem, we can't increase memtupsize by more
     981              :          * than availMem / sizeof(SortTuple) elements.  In practice, we want
     982              :          * to increase it by considerably less, because we need to leave some
     983              :          * space for the tuples to which the new array slots will refer.  We
     984              :          * assume the new tuples will be about the same size as the tuples
     985              :          * we've already seen, and thus we can extrapolate from the space
     986              :          * consumption so far to estimate an appropriate new size for the
     987              :          * memtuples array.  The optimal value might be higher or lower than
     988              :          * this estimate, but it's hard to know that in advance.  We again
     989              :          * clamp at INT_MAX tuples.
     990              :          *
     991              :          * This calculation is safe against enlarging the array so much that
     992              :          * LACKMEM becomes true, because the memory currently used includes
     993              :          * the present array; thus, there would be enough allowedMem for the
     994              :          * new array elements even if no other memory were currently used.
     995              :          *
     996              :          * We do the arithmetic in float8, because otherwise the product of
     997              :          * memtupsize and allowedMem could overflow.  Any inaccuracy in the
     998              :          * result should be insignificant; but even if we computed a
     999              :          * completely insane result, the checks below will prevent anything
    1000              :          * really bad from happening.
    1001              :          */
    1002              :         double      grow_ratio;
    1003              : 
    1004           93 :         grow_ratio = (double) state->allowedMem / (double) memNowUsed;
    1005           93 :         if (memtupsize * grow_ratio < INT_MAX)
    1006           93 :             newmemtupsize = (int) (memtupsize * grow_ratio);
    1007              :         else
    1008            0 :             newmemtupsize = INT_MAX;
    1009              : 
    1010              :         /* We won't make any further enlargement attempts */
    1011           93 :         state->growmemtuples = false;
    1012              :     }
    1013              : 
    1014              :     /* Must enlarge array by at least one element, else report failure */
    1015         5226 :     if (newmemtupsize <= memtupsize)
    1016            0 :         goto noalloc;
    1017              : 
    1018              :     /*
    1019              :      * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
    1020              :      * to ensure our request won't be rejected.  Note that we can easily
    1021              :      * exhaust address space before facing this outcome.  (This is presently
    1022              :      * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
    1023              :      * don't rely on that at this distance.)
    1024              :      */
    1025         5226 :     if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
    1026              :     {
    1027            0 :         newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
    1028            0 :         state->growmemtuples = false;    /* can't grow any more */
    1029              :     }
    1030              : 
    1031              :     /*
    1032              :      * We need to be sure that we do not cause LACKMEM to become true, else
    1033              :      * the space management algorithm will go nuts.  The code above should
    1034              :      * never generate a dangerous request, but to be safe, check explicitly
    1035              :      * that the array growth fits within availMem.  (We could still cause
    1036              :      * LACKMEM if the memory chunk overhead associated with the memtuples
    1037              :      * array were to increase.  That shouldn't happen because we chose the
    1038              :      * initial array size large enough to ensure that palloc will be treating
    1039              :      * both old and new arrays as separate chunks.  But we'll check LACKMEM
    1040              :      * explicitly below just in case.)
    1041              :      */
    1042         5226 :     if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
    1043            0 :         goto noalloc;
    1044              : 
    1045              :     /* OK, do it */
    1046         5226 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1047         5226 :     state->memtupsize = newmemtupsize;
    1048         5226 :     state->memtuples = (SortTuple *)
    1049         5226 :         repalloc_huge(state->memtuples,
    1050         5226 :                       state->memtupsize * sizeof(SortTuple));
    1051         5226 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1052         5226 :     if (LACKMEM(state))
    1053            0 :         elog(ERROR, "unexpected out-of-memory situation in tuplesort");
    1054         5226 :     return true;
    1055              : 
    1056            0 : noalloc:
    1057              :     /* If for any reason we didn't realloc, shut off future attempts */
    1058            0 :     state->growmemtuples = false;
    1059            0 :     return false;
    1060              : }
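The grow_ratio estimate used by grow_memtuples() can be sketched as a standalone helper. The function and parameter names below are illustrative, not tuplesort.c's API; only the arithmetic mirrors the code above. The key point is doing the size comparison in double, so the product of memtupsize and the ratio cannot overflow before the INT_MAX clamp is applied:

```c
#include <limits.h>

/* Hypothetical standalone version of the grow_ratio computation above.
 * All names are illustrative; only the arithmetic mirrors tuplesort.c. */
static int
estimate_new_memtupsize(double allowed_mem, double mem_now_used, int memtupsize)
{
    /* extrapolate from the space consumed so far */
    double      grow_ratio = allowed_mem / mem_now_used;

    /* compare in double: the int product could overflow before a clamp */
    if (memtupsize * grow_ratio < (double) INT_MAX)
        return (int) (memtupsize * grow_ratio);
    return INT_MAX;             /* clamp at INT_MAX tuples */
}
```

For example, with twice as much memory allowed as currently used, a 100-slot array would be grown to roughly 200 slots.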
    1061              : 
    1062              : /*
    1063              :  * Shared code for tuple and datum cases.
    1064              :  */
    1065              : void
    1066     19362619 : tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
    1067              :                           bool useAbbrev, Size tuplen)
    1068              : {
    1069     19362619 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1070              : 
    1071              :     Assert(!LEADER(state));
    1072              : 
    1073              :     /* account for the memory used for this tuple */
    1074     19362619 :     USEMEM(state, tuplen);
    1075     19362619 :     state->tupleMem += tuplen;
    1076              : 
    1077     19362619 :     if (!useAbbrev)
    1078              :     {
    1079              :         /*
     1080              :          * Leave the ordinary Datum representation, or the NULL value, as
     1081              :          * is.  If there is a converter it won't expect NULL values, and the
     1082              :          * cost model is not required to account for NULLs, so in that case
     1083              :          * we avoid calling the converter and just set datum1 to a zeroed
     1084              :          * representation (to be consistent, and to support cheap inequality
     1085              :          * tests for NULL abbreviated keys).
    1086              :          */
    1087              :     }
    1088      2899252 :     else if (!consider_abort_common(state))
    1089              :     {
    1090              :         /* Store abbreviated key representation */
    1091      2899188 :         tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
    1092              :                                                                state->base.sortKeys);
    1093              :     }
    1094              :     else
    1095              :     {
    1096              :         /*
    1097              :          * Set state to be consistent with never trying abbreviation.
    1098              :          *
    1099              :          * Alter datum1 representation in already-copied tuples, so as to
    1100              :          * ensure a consistent representation (current tuple was just
    1101              :          * handled).  It does not matter if some dumped tuples are already
    1102              :          * sorted on tape, since serialized tuples lack abbreviated keys
    1103              :          * (TSS_BUILDRUNS state prevents control reaching here in any case).
    1104              :          */
    1105           64 :         REMOVEABBREV(state, state->memtuples, state->memtupcount);
    1106              :     }
    1107              : 
    1108     19362619 :     switch (state->status)
    1109              :     {
    1110     16489625 :         case TSS_INITIAL:
    1111              : 
    1112              :             /*
    1113              :              * Save the tuple into the unsorted array.  First, grow the array
    1114              :              * as needed.  Note that we try to grow the array when there is
    1115              :              * still one free slot remaining --- if we fail, there'll still be
    1116              :              * room to store the incoming tuple, and then we'll switch to
    1117              :              * tape-based operation.
    1118              :              */
    1119     16489625 :             if (state->memtupcount >= state->memtupsize - 1)
    1120              :             {
    1121         5316 :                 (void) grow_memtuples(state);
    1122              :                 Assert(state->memtupcount < state->memtupsize);
    1123              :             }
    1124     16489625 :             state->memtuples[state->memtupcount++] = *tuple;
    1125              : 
    1126              :             /*
    1127              :              * Check if it's time to switch over to a bounded heapsort. We do
    1128              :              * so if the input tuple count exceeds twice the desired tuple
    1129              :              * count (this is a heuristic for where heapsort becomes cheaper
    1130              :              * than a quicksort), or if we've just filled workMem and have
    1131              :              * enough tuples to meet the bound.
    1132              :              *
    1133              :              * Note that once we enter TSS_BOUNDED state we will always try to
    1134              :              * complete the sort that way.  In the worst case, if later input
    1135              :              * tuples are larger than earlier ones, this might cause us to
    1136              :              * exceed workMem significantly.
    1137              :              */
    1138     16489625 :             if (state->bounded &&
    1139        35488 :                 (state->memtupcount > state->bound * 2 ||
    1140        35233 :                  (state->memtupcount > state->bound && LACKMEM(state))))
    1141              :             {
    1142          255 :                 if (trace_sort)
    1143            0 :                     elog(LOG, "switching to bounded heapsort at %d tuples: %s",
    1144              :                          state->memtupcount,
    1145              :                          pg_rusage_show(&state->ru_start));
    1146          255 :                 make_bounded_heap(state);
    1147          255 :                 MemoryContextSwitchTo(oldcontext);
    1148          255 :                 return;
    1149              :             }
    1150              : 
    1151              :             /*
    1152              :              * Done if we still fit in available memory and have array slots.
    1153              :              */
    1154     16489370 :             if (state->memtupcount < state->memtupsize && !LACKMEM(state))
    1155              :             {
    1156     16489280 :                 MemoryContextSwitchTo(oldcontext);
    1157     16489280 :                 return;
    1158              :             }
    1159              : 
    1160              :             /*
    1161              :              * Nope; time to switch to tape-based operation.
    1162              :              */
    1163           90 :             inittapes(state, true);
    1164              : 
    1165              :             /*
    1166              :              * Dump all tuples.
    1167              :              */
    1168           90 :             dumptuples(state, false);
    1169           90 :             break;
    1170              : 
    1171      2138921 :         case TSS_BOUNDED:
    1172              : 
    1173              :             /*
    1174              :              * We don't want to grow the array here, so check whether the new
    1175              :              * tuple can be discarded before putting it in.  This should be a
    1176              :              * good speed optimization, too, since when there are many more
    1177              :              * input tuples than the bound, most input tuples can be discarded
    1178              :              * with just this one comparison.  Note that because we currently
    1179              :              * have the sort direction reversed, we must check for <= not >=.
    1180              :              */
    1181      2138921 :             if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
    1182              :             {
    1183              :                 /* new tuple <= top of the heap, so we can discard it */
    1184      1804256 :                 free_sort_tuple(state, tuple);
    1185      1804256 :                 CHECK_FOR_INTERRUPTS();
    1186              :             }
    1187              :             else
    1188              :             {
    1189              :                 /* discard top of heap, replacing it with the new tuple */
    1190       334665 :                 free_sort_tuple(state, &state->memtuples[0]);
    1191       334665 :                 tuplesort_heap_replace_top(state, tuple);
    1192              :             }
    1193      2138921 :             break;
    1194              : 
    1195       734073 :         case TSS_BUILDRUNS:
    1196              : 
    1197              :             /*
    1198              :              * Save the tuple into the unsorted array (there must be space)
    1199              :              */
    1200       734073 :             state->memtuples[state->memtupcount++] = *tuple;
    1201              : 
    1202              :             /*
    1203              :              * If we are over the memory limit, dump all tuples.
    1204              :              */
    1205       734073 :             dumptuples(state, false);
    1206       734073 :             break;
    1207              : 
    1208            0 :         default:
    1209            0 :             elog(ERROR, "invalid tuplesort state");
    1210              :             break;
    1211              :     }
    1212      2873084 :     MemoryContextSwitchTo(oldcontext);
    1213              : }
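The TSS_BOUNDED branch above depends on the heap being built with the sort direction reversed (a max-heap when keeping the N smallest), so most incoming tuples are rejected with a single comparison against the root. A minimal sketch of that discard-or-replace decision, using plain ints rather than SortTuples (all names hypothetical, not tuplesort's API):

```c
/* Keep the BOUND smallest values in a reversed (max-) heap, so a new
 * value can be rejected with one comparison against the root. */
#define BOUND 3

static int  heap[BOUND];
static int  heap_count = 0;

static void
swap_entries(int a, int b)
{
    int         tmp = heap[a];

    heap[a] = heap[b];
    heap[b] = tmp;
}

static void
bounded_put(int v)
{
    if (heap_count < BOUND)
    {
        /* heap not yet full: insert v, sifting it up */
        int         i = heap_count++;

        heap[i] = v;
        while (i > 0 && heap[(i - 1) / 2] < heap[i])
        {
            swap_entries(i, (i - 1) / 2);
            i = (i - 1) / 2;
        }
    }
    else if (v >= heap[0])
    {
        /* new value >= top of the reversed heap: discard it */
    }
    else
    {
        /* replace the root with the new value, then sift it down */
        int         i = 0;

        heap[0] = v;
        for (;;)
        {
            int         largest = i,
                        l = 2 * i + 1,
                        r = 2 * i + 2;

            if (l < heap_count && heap[l] > heap[largest])
                largest = l;
            if (r < heap_count && heap[r] > heap[largest])
                largest = r;
            if (largest == i)
                break;
            swap_entries(i, largest);
            i = largest;
        }
    }
}
```

Feeding the values 7, 1, 5, 3, 9, 2 through bounded_put() leaves {1, 2, 3} in the heap, with 3 at the root.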
    1214              : 
    1215              : static bool
    1216      2899252 : consider_abort_common(Tuplesortstate *state)
    1217              : {
    1218              :     Assert(state->base.sortKeys[0].abbrev_converter != NULL);
    1219              :     Assert(state->base.sortKeys[0].abbrev_abort != NULL);
    1220              :     Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
    1221              : 
    1222              :     /*
    1223              :      * Check effectiveness of abbreviation optimization.  Consider aborting
    1224              :      * when still within memory limit.
    1225              :      */
    1226      2899252 :     if (state->status == TSS_INITIAL &&
    1227      2597219 :         state->memtupcount >= state->abbrevNext)
    1228              :     {
    1229         3280 :         state->abbrevNext *= 2;
    1230              : 
    1231              :         /*
    1232              :          * Check opclass-supplied abbreviation abort routine.  It may indicate
    1233              :          * that abbreviation should not proceed.
    1234              :          */
    1235         3280 :         if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
    1236              :                                                 state->base.sortKeys))
    1237         3216 :             return false;
    1238              : 
    1239              :         /*
    1240              :          * Finally, restore authoritative comparator, and indicate that
    1241              :          * abbreviation is not in play by setting abbrev_converter to NULL
    1242              :          */
    1243           64 :         state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
    1244           64 :         state->base.sortKeys[0].abbrev_converter = NULL;
    1245              :         /* Not strictly necessary, but be tidy */
    1246           64 :         state->base.sortKeys[0].abbrev_abort = NULL;
    1247           64 :         state->base.sortKeys[0].abbrev_full_comparator = NULL;
    1248              : 
    1249              :         /* Give up - expect original pass-by-value representation */
    1250           64 :         return true;
    1251              :     }
    1252              : 
    1253      2895972 :     return false;
    1254              : }
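The doubling of abbrevNext in consider_abort_common() is a geometric back-off: the abort check runs only when memtupcount reaches the threshold, and the threshold doubles each time, so the opclass callback is consulted only O(log N) times over N tuples. A sketch of just that scheduling, with hypothetical names:

```c
#include <stdbool.h>

/* Hypothetical stand-ins for state->abbrevNext and a check counter. */
static long abbrev_next = 10;
static int  abort_checks = 0;

static bool
time_to_check_abbrev(long memtupcount)
{
    if (memtupcount < abbrev_next)
        return false;           /* too early; skip the check entirely */

    abbrev_next *= 2;           /* next check after twice as many tuples */
    abort_checks++;
    return true;
}

/* Simulate inserting n tuples, counting how often the check fires. */
static int
checks_over_n_inserts(long n)
{
    abbrev_next = 10;
    abort_checks = 0;
    for (long i = 1; i <= n; i++)
        (void) time_to_check_abbrev(i);
    return abort_checks;
}
```

With a starting threshold of 10, inserting 1000 tuples triggers the check at counts 10, 20, 40, 80, 160, 320, and 640 — seven times in total.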
    1255              : 
    1256              : /*
    1257              :  * All tuples have been provided; finish the sort.
    1258              :  */
    1259              : void
    1260       149639 : tuplesort_performsort(Tuplesortstate *state)
    1261              : {
    1262       149639 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1263              : 
    1264       149639 :     if (trace_sort)
    1265            0 :         elog(LOG, "performsort of worker %d starting: %s",
    1266              :              state->worker, pg_rusage_show(&state->ru_start));
    1267              : 
    1268       149639 :     switch (state->status)
    1269              :     {
    1270       149294 :         case TSS_INITIAL:
    1271              : 
    1272              :             /*
     1273              :              * We were able to accumulate all the tuples within the allowed
     1274              :              * amount of memory, or we are the leader taking over worker tapes.
    1275              :              */
    1276       149294 :             if (SERIAL(state))
    1277              :             {
    1278              :                 /* Sort in memory and we're done */
    1279       148785 :                 tuplesort_sort_memtuples(state);
    1280       148725 :                 state->status = TSS_SORTEDINMEM;
    1281              :             }
    1282          509 :             else if (WORKER(state))
    1283              :             {
    1284              :                 /*
    1285              :                  * Parallel workers must still dump out tuples to tape.  No
    1286              :                  * merge is required to produce single output run, though.
    1287              :                  */
    1288          379 :                 inittapes(state, false);
    1289          379 :                 dumptuples(state, true);
    1290          379 :                 worker_nomergeruns(state);
    1291          379 :                 state->status = TSS_SORTEDONTAPE;
    1292              :             }
    1293              :             else
    1294              :             {
    1295              :                 /*
    1296              :                  * Leader will take over worker tapes and merge worker runs.
    1297              :                  * Note that mergeruns sets the correct state->status.
    1298              :                  */
    1299          130 :                 leader_takeover_tapes(state);
    1300          130 :                 mergeruns(state);
    1301              :             }
    1302       149234 :             state->current = 0;
    1303       149234 :             state->eof_reached = false;
    1304       149234 :             state->markpos_block = 0L;
    1305       149234 :             state->markpos_offset = 0;
    1306       149234 :             state->markpos_eof = false;
    1307       149234 :             break;
    1308              : 
    1309          255 :         case TSS_BOUNDED:
    1310              : 
    1311              :             /*
    1312              :              * We were able to accumulate all the tuples required for output
    1313              :              * in memory, using a heap to eliminate excess tuples.  Now we
    1314              :              * have to transform the heap to a properly-sorted array. Note
    1315              :              * that sort_bounded_heap sets the correct state->status.
    1316              :              */
    1317          255 :             sort_bounded_heap(state);
    1318          255 :             state->current = 0;
    1319          255 :             state->eof_reached = false;
    1320          255 :             state->markpos_offset = 0;
    1321          255 :             state->markpos_eof = false;
    1322          255 :             break;
    1323              : 
    1324           90 :         case TSS_BUILDRUNS:
    1325              : 
    1326              :             /*
    1327              :              * Finish tape-based sort.  First, flush all tuples remaining in
    1328              :              * memory out to tape; then merge until we have a single remaining
    1329              :              * run (or, if !randomAccess and !WORKER(), one run per tape).
    1330              :              * Note that mergeruns sets the correct state->status.
    1331              :              */
    1332           90 :             dumptuples(state, true);
    1333           90 :             mergeruns(state);
    1334           90 :             state->eof_reached = false;
    1335           90 :             state->markpos_block = 0L;
    1336           90 :             state->markpos_offset = 0;
    1337           90 :             state->markpos_eof = false;
    1338           90 :             break;
    1339              : 
    1340            0 :         default:
    1341            0 :             elog(ERROR, "invalid tuplesort state");
    1342              :             break;
    1343              :     }
    1344              : 
    1345       149579 :     if (trace_sort)
    1346              :     {
    1347            0 :         if (state->status == TSS_FINALMERGE)
    1348            0 :             elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
    1349              :                  state->worker, state->nInputTapes,
    1350              :                  pg_rusage_show(&state->ru_start));
    1351              :         else
    1352            0 :             elog(LOG, "performsort of worker %d done: %s",
    1353              :                  state->worker, pg_rusage_show(&state->ru_start));
    1354              :     }
    1355              : 
    1356       149579 :     MemoryContextSwitchTo(oldcontext);
    1357       149579 : }
    1358              : 
    1359              : /*
    1360              :  * Internal routine to fetch the next tuple in either forward or back
    1361              :  * direction into *stup.  Returns false if no more tuples.
    1362              :  * Returned tuple belongs to tuplesort memory context, and must not be freed
    1363              :  * by caller.  Note that fetched tuple is stored in memory that may be
    1364              :  * recycled by any future fetch.
    1365              :  */
    1366              : bool
    1367     17725561 : tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
    1368              :                           SortTuple *stup)
    1369              : {
    1370              :     unsigned int tuplen;
    1371              :     size_t      nmoved;
    1372              : 
    1373              :     Assert(!WORKER(state));
    1374              : 
    1375     17725561 :     switch (state->status)
    1376              :     {
    1377     14784536 :         case TSS_SORTEDINMEM:
    1378              :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1379              :             Assert(!state->slabAllocatorUsed);
    1380     14784536 :             if (forward)
    1381              :             {
    1382     14784492 :                 if (state->current < state->memtupcount)
    1383              :                 {
    1384     14636762 :                     *stup = state->memtuples[state->current++];
    1385     14636762 :                     return true;
    1386              :                 }
    1387       147730 :                 state->eof_reached = true;
    1388              : 
    1389              :                 /*
    1390              :                  * Complain if caller tries to retrieve more tuples than
    1391              :                  * originally asked for in a bounded sort.  This is because
    1392              :                  * returning EOF here might be the wrong thing.
    1393              :                  */
    1394       147730 :                 if (state->bounded && state->current >= state->bound)
    1395            0 :                     elog(ERROR, "retrieved too many tuples in a bounded sort");
    1396              : 
    1397       147730 :                 return false;
    1398              :             }
    1399              :             else
    1400              :             {
    1401           44 :                 if (state->current <= 0)
    1402            0 :                     return false;
    1403              : 
    1404              :                 /*
     1405              :                  * If all tuples have already been fetched, return the last
     1406              :                  * tuple; otherwise, the tuple before the last one returned.
    1407              :                  */
    1408           44 :                 if (state->eof_reached)
    1409            8 :                     state->eof_reached = false;
    1410              :                 else
    1411              :                 {
    1412           36 :                     state->current--;    /* last returned tuple */
    1413           36 :                     if (state->current <= 0)
    1414            4 :                         return false;
    1415              :                 }
    1416           40 :                 *stup = state->memtuples[state->current - 1];
    1417           40 :                 return true;
    1418              :             }
    1419              :             break;
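The backward branch of the in-memory case above can be sketched as a standalone helper (hypothetical names, not tuplesort's API): the cursor sits just past the last returned element, so after EOF we re-return the last element, and otherwise we step back to the element *before* the last one returned:

```c
#include <stdbool.h>

/* Fetch the previous element of arr, mimicking the TSS_SORTEDINMEM
 * backward logic above.  Returns false when no more elements. */
static bool
fetch_backward(const int *arr, int *current, bool *eof_reached, int *out)
{
    if (*current <= 0)
        return false;

    if (*eof_reached)
        *eof_reached = false;   /* all fetched: re-return the last element */
    else
    {
        (*current)--;           /* step over the last returned element */
        if (*current <= 0)
            return false;
    }
    *out = arr[*current - 1];
    return true;
}
```

Starting at EOF over {10, 20, 30}, successive calls yield 30, 20, 10, and then report no more tuples.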
    1420              : 
    1421       196999 :         case TSS_SORTEDONTAPE:
    1422              :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1423              :             Assert(state->slabAllocatorUsed);
    1424              : 
    1425              :             /*
    1426              :              * The slot that held the tuple that we returned in previous
    1427              :              * gettuple call can now be reused.
    1428              :              */
    1429       196999 :             if (state->lastReturnedTuple)
    1430              :             {
    1431       101900 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1432       101900 :                 state->lastReturnedTuple = NULL;
    1433              :             }
    1434              : 
    1435       196999 :             if (forward)
    1436              :             {
    1437       196979 :                 if (state->eof_reached)
    1438            0 :                     return false;
    1439              : 
    1440       196979 :                 if ((tuplen = getlen(state->result_tape, true)) != 0)
    1441              :                 {
    1442       196960 :                     READTUP(state, stup, state->result_tape, tuplen);
    1443              : 
    1444              :                     /*
    1445              :                      * Remember the tuple we return, so that we can recycle
    1446              :                      * its memory on next call.  (This can be NULL, in the
    1447              :                      * !state->tuples case).
    1448              :                      */
    1449       196960 :                     state->lastReturnedTuple = stup->tuple;
    1450              : 
    1451       196960 :                     return true;
    1452              :                 }
    1453              :                 else
    1454              :                 {
    1455           19 :                     state->eof_reached = true;
    1456           19 :                     return false;
    1457              :                 }
    1458              :             }
    1459              : 
    1460              :             /*
    1461              :              * Backward.
    1462              :              *
     1463              :              * If all tuples have already been fetched, return the last
     1464              :              * tuple; otherwise, the tuple before the last one returned.
    1465              :              */
    1466           20 :             if (state->eof_reached)
    1467              :             {
    1468              :                 /*
    1469              :                  * Seek position is pointing just past the zero tuplen at the
    1470              :                  * end of file; back up to fetch last tuple's ending length
    1471              :                  * word.  If seek fails we must have a completely empty file.
    1472              :                  */
    1473            8 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1474              :                                               2 * sizeof(unsigned int));
    1475            8 :                 if (nmoved == 0)
    1476            0 :                     return false;
    1477            8 :                 else if (nmoved != 2 * sizeof(unsigned int))
    1478            0 :                     elog(ERROR, "unexpected tape position");
    1479            8 :                 state->eof_reached = false;
    1480              :             }
    1481              :             else
    1482              :             {
    1483              :                 /*
    1484              :                  * Back up and fetch previously-returned tuple's ending length
    1485              :                  * word.  If seek fails, assume we are at start of file.
    1486              :                  */
    1487           12 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1488              :                                               sizeof(unsigned int));
    1489           12 :                 if (nmoved == 0)
    1490            0 :                     return false;
    1491           12 :                 else if (nmoved != sizeof(unsigned int))
    1492            0 :                     elog(ERROR, "unexpected tape position");
    1493           12 :                 tuplen = getlen(state->result_tape, false);
    1494              : 
    1495              :                 /*
    1496              :                  * Back up to get ending length word of tuple before it.
    1497              :                  */
    1498           12 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1499              :                                               tuplen + 2 * sizeof(unsigned int));
    1500           12 :                 if (nmoved == tuplen + sizeof(unsigned int))
    1501              :                 {
    1502              :                     /*
    1503              :                      * We backed up over the previous tuple, but there was no
    1504              :                      * ending length word before it.  That means that the prev
    1505              :                      * tuple is the first tuple in the file.  It is now the
    1506              :                      * next to read in forward direction (not obviously right,
     1507              :                      * but that is what the in-memory case does).
    1508              :                      */
    1509            4 :                     return false;
    1510              :                 }
    1511            8 :                 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
    1512            0 :                     elog(ERROR, "bogus tuple length in backward scan");
    1513              :             }
    1514              : 
    1515           16 :             tuplen = getlen(state->result_tape, false);
    1516              : 
    1517              :             /*
    1518              :              * Now we have the length of the prior tuple, back up and read it.
    1519              :              * Note: READTUP expects we are positioned after the initial
    1520              :              * length word of the tuple, so back up to that point.
    1521              :              */
    1522           16 :             nmoved = LogicalTapeBackspace(state->result_tape,
    1523              :                                           tuplen);
    1524           16 :             if (nmoved != tuplen)
    1525            0 :                 elog(ERROR, "bogus tuple length in backward scan");
    1526           16 :             READTUP(state, stup, state->result_tape, tuplen);
    1527              : 
    1528              :             /*
    1529              :              * Remember the tuple we return, so that we can recycle its memory
    1530              :              * on next call. (This can be NULL, in the Datum case).
    1531              :              */
    1532           16 :             state->lastReturnedTuple = stup->tuple;
    1533              : 
    1534           16 :             return true;
    1535              : 
    1536      2744026 :         case TSS_FINALMERGE:
    1537              :             Assert(forward);
    1538              :             /* We are managing memory ourselves, with the slab allocator. */
    1539              :             Assert(state->slabAllocatorUsed);
    1540              : 
    1541              :             /*
    1542              :              * The slab slot holding the tuple that we returned in the
    1543              :              * previous gettuple call can now be reused.
    1544              :              */
    1545      2744026 :             if (state->lastReturnedTuple)
    1546              :             {
    1547      2673800 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1548      2673800 :                 state->lastReturnedTuple = NULL;
    1549              :             }
    1550              : 
    1551              :             /*
    1552              :              * This code should match the inner loop of mergeonerun().
    1553              :              */
    1554      2744026 :             if (state->memtupcount > 0)
    1555              :             {
    1556      2743832 :                 int         srcTapeIndex = state->memtuples[0].srctape;
    1557      2743832 :                 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
    1558              :                 SortTuple   newtup;
    1559              : 
    1560      2743832 :                 *stup = state->memtuples[0];
    1561              : 
    1562              :                 /*
    1563              :                  * Remember the tuple we return, so that we can recycle its
    1564              :                  * memory on the next call. (This can be NULL, in the Datum
    1565              :                  * case).
    1565              :                  */
    1566      2743832 :                 state->lastReturnedTuple = stup->tuple;
    1567              : 
    1568              :                 /*
    1569              :                  * Pull next tuple from tape, and replace the returned tuple
    1570              :                  * at top of the heap with it.
    1571              :                  */
    1572      2743832 :                 if (!mergereadnext(state, srcTape, &newtup))
    1573              :                 {
    1574              :                     /*
    1575              :                      * If no more data, we've reached end of run on this tape.
    1576              :                      * Remove the top node from the heap.
    1577              :                      */
    1578          283 :                     tuplesort_heap_delete_top(state);
    1579          283 :                     state->nInputRuns--;
    1580              : 
    1581              :                     /*
    1582              :                      * Close the tape.  It'd go away at the end of the sort
    1583              :                      * anyway, but better to release the memory early.
    1584              :                      */
    1585          283 :                     LogicalTapeClose(srcTape);
    1586          283 :                     return true;
    1587              :                 }
    1588      2743549 :                 newtup.srctape = srcTapeIndex;
    1589      2743549 :                 tuplesort_heap_replace_top(state, &newtup);
    1590      2743549 :                 return true;
    1591              :             }
    1592          194 :             return false;
    1593              : 
    1594            0 :         default:
    1595            0 :             elog(ERROR, "invalid tuplesort state");
    1596              :             return false;       /* keep compiler quiet */
    1597              :     }
    1598              : }
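The backward-scan logic above depends on each tuple on tape being bracketed by a length word both before and after its body, so the reader can back up without any auxiliary index. A minimal sketch of that layout in a plain byte buffer (hypothetical helper names; not the actual logtape on-disk format):

```c
#include <stdint.h>
#include <string.h>

/* Append a payload bracketed by a leading and a trailing length word,
 * mimicking how tuplesort writes tuples so that they can be re-read
 * backward.  Returns the position just past the trailing word. */
static size_t write_bracketed(uint8_t *buf, size_t pos,
                              const void *payload, uint32_t len)
{
    memcpy(buf + pos, &len, sizeof(len));                      /* leading word */
    memcpy(buf + pos + sizeof(len), payload, len);             /* body */
    memcpy(buf + pos + sizeof(len) + len, &len, sizeof(len));  /* trailing word */
    return pos + 2 * sizeof(len) + len;
}

/* Back up over the tuple whose trailing length word ends at 'pos'.
 * Returns the position of the tuple body and sets *len — the state
 * that the READTUP call expects to be positioned at. */
static size_t backspace_one(const uint8_t *buf, size_t pos, uint32_t *len)
{
    uint32_t tuplen;

    memcpy(&tuplen, buf + pos - sizeof(tuplen), sizeof(tuplen));
    *len = tuplen;
    return pos - sizeof(tuplen) - tuplen;
}
```

The trailing word is what makes `LogicalTapeBackspace` by `tuplen + 2 * sizeof(unsigned int)` land on the previous tuple's leading word; when no trailing word precedes the current tuple, we are at the first tuple in the file, which is the `return false` case above.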
    1599              : 
    1600              : 
    1601              : /*
    1602              :  * Advance over N tuples in either the forward or backward direction,
    1603              :  * without returning any data.  N==0 is a no-op.
    1604              :  * Returns true if successful, false if we ran out of tuples.
    1605              :  */
    1606              : bool
    1607          258 : tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
    1608              : {
    1609              :     MemoryContext oldcontext;
    1610              : 
    1611              :     /*
    1612              :      * We don't actually support backwards skip yet, because no callers need
    1613              :      * it.  The API is designed to allow for that later, though.
    1614              :      */
    1615              :     Assert(forward);
    1616              :     Assert(ntuples >= 0);
    1617              :     Assert(!WORKER(state));
    1618              : 
    1619          258 :     switch (state->status)
    1620              :     {
    1621          242 :         case TSS_SORTEDINMEM:
    1622          242 :             if (state->memtupcount - state->current >= ntuples)
    1623              :             {
    1624          242 :                 state->current += ntuples;
    1625          242 :                 return true;
    1626              :             }
    1627            0 :             state->current = state->memtupcount;
    1628            0 :             state->eof_reached = true;
    1629              : 
    1630              :             /*
    1631              :              * Complain if the caller tries to retrieve more tuples than it
    1632              :              * originally asked for in a bounded sort.  This is because
    1633              :              * returning EOF here might be the wrong thing.
    1634              :              */
    1635            0 :             if (state->bounded && state->current >= state->bound)
    1636            0 :                 elog(ERROR, "retrieved too many tuples in a bounded sort");
    1637              : 
    1638            0 :             return false;
    1639              : 
    1640           16 :         case TSS_SORTEDONTAPE:
    1641              :         case TSS_FINALMERGE:
    1642              : 
    1643              :             /*
    1644              :              * We could probably optimize these cases better, but for now it's
    1645              :              * not worth the trouble.
    1646              :              */
    1647           16 :             oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1648       160088 :             while (ntuples-- > 0)
    1649              :             {
    1650              :                 SortTuple   stup;
    1651              : 
    1652       160072 :                 if (!tuplesort_gettuple_common(state, forward, &stup))
    1653              :                 {
    1654            0 :                     MemoryContextSwitchTo(oldcontext);
    1655            0 :                     return false;
    1656              :                 }
    1657       160072 :                 CHECK_FOR_INTERRUPTS();
    1658              :             }
    1659           16 :             MemoryContextSwitchTo(oldcontext);
    1660           16 :             return true;
    1661              : 
    1662            0 :         default:
    1663            0 :             elog(ERROR, "invalid tuplesort state");
    1664              :             return false;       /* keep compiler quiet */
    1665              :     }
    1666              : }
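The TSS_SORTEDINMEM branch of tuplesort_skiptuples reduces to bounds arithmetic on the in-memory array. A simplified stand-alone version (hypothetical names, omitting the bounded-sort check):

```c
#include <stdbool.h>
#include <stdint.h>

/* Advance '*current' by ntuples if that many tuples remain in an
 * in-memory array of memtupcount sorted tuples; otherwise clamp to the
 * end and report failure, mirroring the TSS_SORTEDINMEM case. */
static bool skip_in_memory(int *current, int memtupcount, int64_t ntuples)
{
    if (memtupcount - *current >= ntuples)
    {
        *current += (int) ntuples;
        return true;
    }
    *current = memtupcount;
    return false;
}
```

The on-tape cases, by contrast, have no positional index, which is why they fall back to fetching and discarding tuples one at a time.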
    1667              : 
    1668              : /*
    1669              :  * tuplesort_merge_order - report merge order we'll use for given memory
    1670              :  * (note: "merge order" just means the number of input tapes in the merge).
    1671              :  *
    1672              :  * This is exported for use by the planner.  allowedMem is in bytes.
    1673              :  */
    1674              : int
    1675        12478 : tuplesort_merge_order(int64 allowedMem)
    1676              : {
    1677              :     int         mOrder;
    1678              : 
    1679              :     /*----------
    1680              :      * In the merge phase, we need buffer space for each input and output tape.
    1681              :      * Each pass in the balanced merge algorithm reads from M input tapes, and
    1682              :      * writes to N output tapes.  Each tape consumes TAPE_BUFFER_OVERHEAD bytes
    1683              :      * of memory.  In addition to that, we want MERGE_BUFFER_SIZE workspace per
    1684              :      * input tape.
    1685              :      *
    1686              :      * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
    1687              :      *            N * TAPE_BUFFER_OVERHEAD
    1688              :      *
    1689              :      * Except for the last and next-to-last merge passes, where there can be
    1690              :      * fewer tapes left to process, M = N.  We choose M so that we have the
    1691              :      * desired amount of memory available for the input buffers
    1692              :      * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
    1693              :      * available for the tape buffers (allowedMem).
    1694              :      *
    1695              :      * Note: you might be thinking we need to account for the memtuples[]
    1696              :      * array in this calculation, but we effectively treat that as part of the
    1697              :      * MERGE_BUFFER_SIZE workspace.
    1698              :      *----------
    1699              :      */
    1700        12478 :     mOrder = allowedMem /
    1701              :         (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
    1702              : 
    1703              :     /*
    1704              :      * Even in minimum memory, use at least a MINORDER merge.  On the other
    1705              :      * hand, even when we have lots of memory, do not use more than a MAXORDER
    1706              :      * merge.  Tapes are pretty cheap, but they're not entirely free.  Each
    1707              :      * additional tape reduces the amount of memory available to build runs,
    1708              :      * which in turn can cause the same sort to need more runs, which makes
    1709              :      * merging slower even if it can still be done in a single pass.  Also,
    1710              :      * high order merges are quite slow due to CPU cache effects; it can be
    1711              :      * faster to pay the I/O cost of a multi-pass merge than to perform a
    1712              :      * single merge pass across many hundreds of tapes.
    1713              :      */
    1714        12478 :     mOrder = Max(mOrder, MINORDER);
    1715        12478 :     mOrder = Min(mOrder, MAXORDER);
    1716              : 
    1717        12478 :     return mOrder;
    1718              : }
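The formula in the comment can be made concrete. Using the constants defined earlier in tuplesort.c (TAPE_BUFFER_OVERHEAD is BLCKSZ and MERGE_BUFFER_SIZE is BLCKSZ * 32, with MINORDER 6 and MAXORDER 500), and assuming the default 8 kB BLCKSZ, a stand-alone version of the computation:

```c
#include <stdint.h>

/* Values from tuplesort.c, assuming the default BLCKSZ of 8192. */
#define TAPE_BUFFER_OVERHEAD 8192        /* BLCKSZ */
#define MERGE_BUFFER_SIZE    (8192 * 32) /* BLCKSZ * 32 */
#define MINORDER 6
#define MAXORDER 500

/* Merge order: divide allowedMem by the per-tape cost of one input
 * tape (buffer + workspace) plus one output tape (buffer), then clamp. */
static int merge_order(int64_t allowedMem)
{
    int mOrder = (int) (allowedMem /
                        (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE));

    if (mOrder < MINORDER)
        mOrder = MINORDER;
    if (mOrder > MAXORDER)
        mOrder = MAXORDER;
    return mOrder;
}
```

For example, with the default 4 MB work_mem the per-tape cost is 16 kB + 256 kB = 272 kB, giving a 15-way merge; a very small allowance clamps up to MINORDER and a multi-gigabyte one clamps down to MAXORDER.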
    1719              : 
    1720              : /*
    1721              :  * Helper function to calculate how much memory to allocate for the read buffer
    1722              :  * of each input tape in a merge pass.
    1723              :  *
    1724              :  * 'avail_mem' is the amount of memory available for the buffers of all the
    1725              :  *      tapes, both input and output.
    1726              :  * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
    1727              :  * 'maxOutputTapes' is the maximum number of output tapes we should produce.
    1728              :  */
    1729              : static int64
    1730          240 : merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
    1731              :                        int maxOutputTapes)
    1732              : {
    1733              :     int         nOutputRuns;
    1734              :     int         nOutputTapes;
    1735              : 
    1736              :     /*
    1737              :      * How many output tapes will we produce in this pass?
    1738              :      *
    1739              :      * This is nInputRuns / nInputTapes, rounded up.
    1740              :      */
    1741          240 :     nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
    1742              : 
    1743          240 :     nOutputTapes = Min(nOutputRuns, maxOutputTapes);
    1744              : 
    1745              :     /*
    1746              :      * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory.  All
    1747              :      * remaining memory is divided evenly between the input tapes.
    1748              :      *
    1749              :      * This also follows from the formula in tuplesort_merge_order, but here
    1750              :      * we derive the input buffer size from the amount of memory available,
    1751              :      * and M and N.
    1752              :      */
    1753          240 :     return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
    1754              : }
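A stand-alone version of the same derivation, assuming TAPE_BUFFER_OVERHEAD is the default 8 kB block size:

```c
#include <stdint.h>

#define TAPE_BUFFER_OVERHEAD 8192 /* BLCKSZ, assuming the default 8 kB */

/* Per-input-tape read buffer size: reserve one buffer for each output
 * tape this pass will write, then split the remaining memory evenly
 * among the input tapes. */
static int64_t read_buffer_size(int64_t avail_mem, int nInputTapes,
                                int nInputRuns, int maxOutputTapes)
{
    int nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes; /* ceil */
    int nOutputTapes = nOutputRuns < maxOutputTapes ? nOutputRuns
                                                    : maxOutputTapes;
    int64_t per_input =
        (avail_mem - (int64_t) TAPE_BUFFER_OVERHEAD * nOutputTapes)
        / nInputTapes;

    return per_input > 0 ? per_input : 0;
}
```

For instance, with 1 MB of buffer memory, 6 input tapes and 12 input runs, the pass writes ceil(12/6) = 2 output runs, reserves two 8 kB output buffers, and gives each input tape (1048576 - 16384) / 6 = 172032 bytes.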
    1755              : 
    1756              : /*
    1757              :  * inittapes - initialize for tape sorting.
    1758              :  *
    1759              :  * This is called only if we have found we won't sort in memory.
    1760              :  */
    1761              : static void
    1762          469 : inittapes(Tuplesortstate *state, bool mergeruns)
    1763              : {
    1764              :     Assert(!LEADER(state));
    1765              : 
    1766          469 :     if (mergeruns)
    1767              :     {
    1768              :         /* Compute number of input tapes to use when merging */
    1769           90 :         state->maxTapes = tuplesort_merge_order(state->allowedMem);
    1770              :     }
    1771              :     else
    1772              :     {
    1773              :         /* Workers can sometimes produce a single run and output it without merging */
    1774              :         Assert(WORKER(state));
    1775          379 :         state->maxTapes = MINORDER;
    1776              :     }
    1777              : 
    1778          469 :     if (trace_sort)
    1779            0 :         elog(LOG, "worker %d switching to external sort with %d tapes: %s",
    1780              :              state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
    1781              : 
    1782              :     /* Create the tape set */
    1783          469 :     inittapestate(state, state->maxTapes);
    1784          469 :     state->tapeset =
    1785          469 :         LogicalTapeSetCreate(false,
    1786          469 :                              state->shared ? &state->shared->fileset : NULL,
    1787              :                              state->worker);
    1788              : 
    1789          469 :     state->currentRun = 0;
    1790              : 
    1791              :     /*
    1792              :      * Initialize logical tape arrays.
    1793              :      */
    1794          469 :     state->inputTapes = NULL;
    1795          469 :     state->nInputTapes = 0;
    1796          469 :     state->nInputRuns = 0;
    1797              : 
    1798          469 :     state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
    1799          469 :     state->nOutputTapes = 0;
    1800          469 :     state->nOutputRuns = 0;
    1801              : 
    1802          469 :     state->status = TSS_BUILDRUNS;
    1803              : 
    1804          469 :     selectnewtape(state);
    1805          469 : }
    1806              : 
    1807              : /*
    1808              :  * inittapestate - initialize generic tape management state
    1809              :  */
    1810              : static void
    1811          599 : inittapestate(Tuplesortstate *state, int maxTapes)
    1812              : {
    1813              :     int64       tapeSpace;
    1814              : 
    1815              :     /*
    1816              :      * Decrease availMem to reflect the space needed for tape buffers; but
    1817              :      * don't decrease it to the point that we have no room for tuples. (That
    1818              :      * case is only likely to occur if sorting pass-by-value Datums; in all
    1819              :      * other scenarios the memtuples[] array is unlikely to occupy more than
    1820              :      * half of allowedMem.  In the pass-by-value case it's not important to
    1821              :      * account for tuple space, so we don't care if LACKMEM becomes
    1822              :      * inaccurate.)
    1823              :      */
    1824          599 :     tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
    1825              : 
    1826          599 :     if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
    1827          521 :         USEMEM(state, tapeSpace);
    1828              : 
    1829              :     /*
    1830              :      * Make sure that the temp file(s) underlying the tape set are created in
    1831              :      * suitable temp tablespaces.  For parallel sorts, this should have been
    1832              :      * called already, but it doesn't matter if it is called a second time.
    1833              :      */
    1834          599 :     PrepareTempTablespaces();
    1835          599 : }
    1836              : 
    1837              : /*
    1838              :  * selectnewtape -- select next tape to output to.
    1839              :  *
    1840              :  * This is called after finishing a run when we know another run
    1841              :  * must be started.  This is used both when building the initial
    1842              :  * runs, and during merge passes.
    1843              :  */
    1844              : static void
    1845         1200 : selectnewtape(Tuplesortstate *state)
    1846              : {
    1847              :     /*
    1848              :      * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
    1849              :      * both zero.  On each call, we create a new output tape to hold the next
    1850              :      * run, until maxTapes is reached.  After that, we assign new runs to the
    1851              :      * existing tapes in a round robin fashion.
    1852              :      */
    1853         1200 :     if (state->nOutputTapes < state->maxTapes)
    1854              :     {
    1855              :         /* Create a new tape to hold the next run */
    1856              :         Assert(state->outputTapes[state->nOutputRuns] == NULL);
    1857              :         Assert(state->nOutputRuns == state->nOutputTapes);
    1858          812 :         state->destTape = LogicalTapeCreate(state->tapeset);
    1859          812 :         state->outputTapes[state->nOutputTapes] = state->destTape;
    1860          812 :         state->nOutputTapes++;
    1861          812 :         state->nOutputRuns++;
    1862              :     }
    1863              :     else
    1864              :     {
    1865              :         /*
    1866              :          * We have reached the max number of tapes.  Append to an existing
    1867              :          * tape.
    1868              :          */
    1869          388 :         state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
    1870          388 :         state->nOutputRuns++;
    1871              :     }
    1872         1200 : }
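The run-to-tape assignment this produces can be stated compactly: the first maxTapes runs each open a fresh tape, and every later run is appended round robin. A sketch (hypothetical helper; run numbers 0-based):

```c
/* Output tape index for the given 0-based run number: runs
 * 0..maxTapes-1 each get a fresh tape, and later runs wrap around,
 * matching selectnewtape() once maxTapes has been reached. */
static int tape_for_run(int run, int maxTapes)
{
    return run % maxTapes;
}
```

Round-robin assignment keeps the runs spread evenly across the tapes, which is what lets the balanced merge consume one run from every input tape per merge step.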
    1873              : 
    1874              : /*
    1875              :  * Initialize the slab allocation arena, for the given number of slots.
    1876              :  */
    1877              : static void
    1878          220 : init_slab_allocator(Tuplesortstate *state, int numSlots)
    1879              : {
    1880          220 :     if (numSlots > 0)
    1881              :     {
    1882              :         char       *p;
    1883              :         int         i;
    1884              : 
    1885          204 :         state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
    1886          204 :         state->slabMemoryEnd = state->slabMemoryBegin +
    1887          204 :             numSlots * SLAB_SLOT_SIZE;
    1888          204 :         state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
    1889          204 :         USEMEM(state, numSlots * SLAB_SLOT_SIZE);
    1890              : 
    1891          204 :         p = state->slabMemoryBegin;
    1892          778 :         for (i = 0; i < numSlots - 1; i++)
    1893              :         {
    1894          574 :             ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
    1895          574 :             p += SLAB_SLOT_SIZE;
    1896              :         }
    1897          204 :         ((SlabSlot *) p)->nextfree = NULL;
    1898              :     }
    1899              :     else
    1900              :     {
    1901           16 :         state->slabMemoryBegin = state->slabMemoryEnd = NULL;
    1902           16 :         state->slabFreeHead = NULL;
    1903              :     }
    1904          220 :     state->slabAllocatorUsed = true;
    1905          220 : }
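The arena above is a classic fixed-size slab: one big allocation carved into equal slots, with the free slots threaded into an intrusive singly linked list. A self-contained sketch of the same scheme, including the alloc/free halves that RELEASE_SLAB_SLOT and the merge code provide elsewhere (slot size and names are illustrative; assumes numSlots > 0):

```c
#include <stdlib.h>

#define SLOT_SIZE 1024          /* stand-in for SLAB_SLOT_SIZE */

typedef union SlabSlot
{
    union SlabSlot *nextfree;   /* link while the slot is free */
    char        buffer[SLOT_SIZE];
} SlabSlot;

typedef struct SlabArena
{
    char       *begin;
    char       *end;
    SlabSlot   *freeHead;
} SlabArena;

/* Carve numSlots slots (numSlots > 0) out of one allocation and chain
 * them into a free list, as init_slab_allocator() does. */
static void slab_init(SlabArena *a, int numSlots)
{
    char       *p;
    int         i;

    a->begin = malloc((size_t) numSlots * SLOT_SIZE);
    a->end = a->begin + (size_t) numSlots * SLOT_SIZE;
    a->freeHead = (SlabSlot *) a->begin;

    p = a->begin;
    for (i = 0; i < numSlots - 1; i++)
    {
        ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLOT_SIZE);
        p += SLOT_SIZE;
    }
    ((SlabSlot *) p)->nextfree = NULL;
}

/* Pop a slot off the free list, or NULL if the arena is exhausted. */
static void *slab_alloc(SlabArena *a)
{
    SlabSlot   *slot = a->freeHead;

    if (slot)
        a->freeHead = slot->nextfree;
    return slot;
}

/* Push a slot back onto the free list. */
static void slab_free(SlabArena *a, void *ptr)
{
    SlabSlot   *slot = ptr;

    slot->nextfree = a->freeHead;
    a->freeHead = slot;
}
```

Because every slot is the same size and allocation is a pointer pop, the merge can recycle one slot per returned tuple with no per-tuple bookkeeping, which is exactly why USEMEM()/LACKMEM() tracking stops at this point.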
    1906              : 
    1907              : /*
    1908              :  * mergeruns -- merge all the completed initial runs.
    1909              :  *
    1910              :  * This implements the Balanced k-Way Merge Algorithm.  All input data has
    1911              :  * already been written to initial runs on tape (see dumptuples).
    1912              :  */
    1913              : static void
    1914          220 : mergeruns(Tuplesortstate *state)
    1915              : {
    1916              :     int         tapenum;
    1917              : 
    1918              :     Assert(state->status == TSS_BUILDRUNS);
    1919              :     Assert(state->memtupcount == 0);
    1920              : 
    1921          220 :     if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
    1922              :     {
    1923              :         /*
    1924              :          * If there are multiple runs to be merged, when we go to read back
    1925              :          * tuples from disk, abbreviated keys will not have been stored, and
    1926              :          * we don't care to regenerate them.  Disable abbreviation from this
    1927              :          * point on.
    1928              :          */
    1929           19 :         state->base.sortKeys->abbrev_converter = NULL;
    1930           19 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
    1931              : 
    1932              :         /* Not strictly necessary, but be tidy */
    1933           19 :         state->base.sortKeys->abbrev_abort = NULL;
    1934           19 :         state->base.sortKeys->abbrev_full_comparator = NULL;
    1935              :     }
    1936              : 
    1937              :     /*
    1938              :      * Reset tuple memory.  We've freed all the tuples that we previously
    1939              :      * allocated.  We will use the slab allocator from now on.
    1940              :      */
    1941          220 :     MemoryContextResetOnly(state->base.tuplecontext);
    1942              : 
    1943              :     /*
    1944              :      * We no longer need a large memtuples array.  (We will allocate a smaller
    1945              :      * one for the heap later.)
    1946              :      */
    1947          220 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1948          220 :     pfree(state->memtuples);
    1949          220 :     state->memtuples = NULL;
    1950              : 
    1951              :     /*
    1952              :      * Initialize the slab allocator.  We need one slab slot per input tape,
    1953              :      * for the tuples in the heap, plus one to hold the tuple last returned
    1954              :      * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
    1955              :      * however, we don't need to allocate anything.)
    1956              :      *
    1957              :      * In a multi-pass merge, we could shrink this allocation for the last
    1958              :      * merge pass, if it has fewer tapes than previous passes, but we don't
    1959              :      * bother.
    1960              :      *
    1961              :      * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
    1962              :      * to track memory usage of individual tuples.
    1963              :      */
    1964          220 :     if (state->base.tuples)
    1965          204 :         init_slab_allocator(state, state->nOutputTapes + 1);
    1966              :     else
    1967           16 :         init_slab_allocator(state, 0);
    1968              : 
    1969              :     /*
    1970              :      * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
    1971              :      * from each input tape.
    1972              :      *
    1973              :      * We could shrink this, too, between passes in a multi-pass merge, but we
    1974              :      * don't bother.  (The initial input tapes are still in outputTapes.  The
    1975              :      * number of input tapes will not increase between passes.)
    1976              :      */
    1977          220 :     state->memtupsize = state->nOutputTapes;
    1978          440 :     state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
    1979          220 :                                                         state->nOutputTapes * sizeof(SortTuple));
    1980          220 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1981              : 
    1982              :     /*
    1983              :      * Use all the remaining memory we have available for tape buffers among
    1984              :      * all the input tapes.  At the beginning of each merge pass, we will
    1985              :      * divide this memory between the input and output tapes in the pass.
    1986              :      */
    1987          220 :     state->tape_buffer_mem = state->availMem;
    1988          220 :     USEMEM(state, state->tape_buffer_mem);
    1989          220 :     if (trace_sort)
    1990            0 :         elog(LOG, "worker %d using %zu KB of memory for tape buffers",
    1991              :              state->worker, state->tape_buffer_mem / 1024);
    1992              : 
    1993              :     for (;;)
    1994              :     {
    1995              :         /*
    1996              :          * On the first iteration, or if we have read all the runs from the
    1997              :          * input tapes in a multi-pass merge, it's time to start a new pass.
    1998              :          * Rewind all the output tapes, and make them inputs for the next
    1999              :          * pass.
    2000              :          */
    2001          312 :         if (state->nInputRuns == 0)
    2002              :         {
    2003              :             int64       input_buffer_size;
    2004              : 
    2005              :             /* Close the old, emptied, input tapes */
    2006          240 :             if (state->nInputTapes > 0)
    2007              :             {
    2008          140 :                 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2009          120 :                     LogicalTapeClose(state->inputTapes[tapenum]);
    2010           20 :                 pfree(state->inputTapes);
    2011              :             }
    2012              : 
    2013              :             /* Previous pass's outputs become next pass's inputs. */
    2014          240 :             state->inputTapes = state->outputTapes;
    2015          240 :             state->nInputTapes = state->nOutputTapes;
    2016          240 :             state->nInputRuns = state->nOutputRuns;
    2017              : 
    2018              :             /*
    2019              :              * Reset output tape variables.  The actual LogicalTapes will be
    2020              :              * created as needed, here we only allocate the array to hold
    2021              :              * them.
    2022              :              */
    2023          240 :             state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
    2024          240 :             state->nOutputTapes = 0;
    2025          240 :             state->nOutputRuns = 0;
    2026              : 
    2027              :             /*
    2028              :              * Redistribute the memory allocated for tape buffers, among the
    2029              :              * new input and output tapes.
    2030              :              */
    2031          240 :             input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
    2032              :                                                        state->nInputTapes,
    2033              :                                                        state->nInputRuns,
    2034              :                                                        state->maxTapes);
    2035              : 
    2036          240 :             if (trace_sort)
    2037            0 :                 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
    2038              :                      state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
    2039              :                      pg_rusage_show(&state->ru_start));
    2040              : 
    2041              :             /* Prepare the new input tapes for merge pass. */
    2042          942 :             for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2043          702 :                 LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
    2044              : 
    2045              :             /*
    2046              :              * If there's just one run left on each input tape, then only one
    2047              :              * merge pass remains.  If we don't have to produce a materialized
    2048              :              * sorted tape, we can stop at this point and do the final merge
    2049              :              * on-the-fly.
    2050              :              */
    2051          240 :             if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
    2052          226 :                 && state->nInputRuns <= state->nInputTapes
    2053          206 :                 && !WORKER(state))
    2054              :             {
    2055              :                 /* Tell logtape.c we won't be writing anymore */
    2056          206 :                 LogicalTapeSetForgetFreeSpace(state->tapeset);
    2057              :                 /* Initialize for the final merge pass */
    2058          206 :                 beginmerge(state);
    2059          206 :                 state->status = TSS_FINALMERGE;
    2060          206 :                 return;
    2061              :             }
    2062              :         }
    2063              : 
    2064              :         /* Select an output tape */
    2065          106 :         selectnewtape(state);
    2066              : 
    2067              :         /* Merge one run from each input tape. */
    2068          106 :         mergeonerun(state);
    2069              : 
    2070              :         /*
    2071              :          * If the input tapes are empty, and we output only one output run,
    2072              :          * we're done.  The current output tape contains the final result.
    2073              :          */
    2074          106 :         if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
    2075           14 :             break;
    2076              :     }
    2077              : 
    2078              :     /*
    2079              :      * Done.  The result is on a single run on a single tape.
    2080              :      */
    2081           14 :     state->result_tape = state->outputTapes[0];
    2082           14 :     if (!WORKER(state))
    2083           14 :         LogicalTapeFreeze(state->result_tape, NULL);
    2084              :     else
    2085            0 :         worker_freeze_result_tape(state);
    2086           14 :     state->status = TSS_SORTEDONTAPE;
    2087              : 
    2088              :     /* Close all the now-empty input tapes, to release their read buffers. */
    2089           74 :     for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2090           60 :         LogicalTapeClose(state->inputTapes[tapenum]);
    2091              : }
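The pass structure implemented above (and in the preceding part of this function) reduces the run count by a factor of the tape count on every pass, stopping early when a final on-the-fly merge suffices. A back-of-the-envelope sketch of that pass arithmetic, with hypothetical names not taken from tuplesort.c:

```c
#include <assert.h>

/* Hypothetical sketch of the pass arithmetic in a balanced k-way
 * merge: each pass merges groups of up to ntapes runs, so the run
 * count shrinks by a factor of ntapes per pass and roughly
 * ceil(log_ntapes(nruns)) passes are needed.  In tuplesort.c the last
 * pass may instead be performed on the fly (TSS_FINALMERGE) once
 * nInputRuns <= nInputTapes. */
static int
merge_passes(int nruns, int ntapes)
{
    int         passes = 0;

    while (nruns > 1)
    {
        /* one pass: every group of ntapes runs becomes one run */
        nruns = (nruns + ntapes - 1) / ntapes;
        passes++;
    }
    return passes;
}
```

For example, 100 initial runs on 6 tapes shrink as 100 → 17 → 3 → 1, i.e. three passes.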
    2092              : 
    2093              : /*
    2094              :  * Merge one run from each input tape.
    2095              :  */
    2096              : static void
    2097          106 : mergeonerun(Tuplesortstate *state)
    2098              : {
    2099              :     int         srcTapeIndex;
    2100              :     LogicalTape *srcTape;
    2101              : 
    2102              :     /*
    2103              :      * Start the merge by loading one tuple from each active source tape into
    2104              :      * the heap.
    2105              :      */
    2106          106 :     beginmerge(state);
    2107              : 
    2108              :     Assert(state->slabAllocatorUsed);
    2109              : 
    2110              :     /*
    2111              :      * Execute merge by repeatedly extracting lowest tuple in heap, writing it
    2112              :      * out, and replacing it with next tuple from same tape (if there is
    2113              :      * another one).
    2114              :      */
    2115       580394 :     while (state->memtupcount > 0)
    2116              :     {
    2117              :         SortTuple   stup;
    2118              : 
    2119              :         /* write the tuple to destTape */
    2120       580288 :         srcTapeIndex = state->memtuples[0].srctape;
    2121       580288 :         srcTape = state->inputTapes[srcTapeIndex];
    2122       580288 :         WRITETUP(state, state->destTape, &state->memtuples[0]);
    2123              : 
    2124              :         /* recycle the slot of the tuple we just wrote out, for the next read */
    2125       580288 :         if (state->memtuples[0].tuple)
    2126       490232 :             RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
    2127              : 
    2128              :         /*
    2129              :          * pull next tuple from the tape, and replace the written-out tuple in
    2130              :          * the heap with it.
    2131              :          */
    2132       580288 :         if (mergereadnext(state, srcTape, &stup))
    2133              :         {
    2134       579720 :             stup.srctape = srcTapeIndex;
    2135       579720 :             tuplesort_heap_replace_top(state, &stup);
    2136              :         }
    2137              :         else
    2138              :         {
    2139          568 :             tuplesort_heap_delete_top(state);
    2140          568 :             state->nInputRuns--;
    2141              :         }
    2142              :     }
    2143              : 
    2144              :     /*
    2145              :      * When the heap empties, we're done.  Write an end-of-run marker on the
    2146              :      * output tape.
    2147              :      */
    2148          106 :     markrunend(state->destTape);
    2149          106 : }
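The loop in mergeonerun() is a classic heap-driven k-way merge: the heap root is always the smallest pending tuple, and after writing it out the slot is refilled from the same source tape (replace-top) or the heap shrinks (delete-top). A minimal self-contained sketch over plain int arrays, with hypothetical names standing in for tapes and tuples:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical miniature of the merge loop above: each "tape" is a
 * sorted int array, and the min-heap tracks one loaded value per
 * tape, tagged with its source (cf. SortTuple.srctape). */
typedef struct
{
    const int  *data;           /* the sorted run on this "tape" */
    size_t      len;
    size_t      pos;            /* next element to read */
} MiniTape;

typedef struct
{
    int         v;              /* value currently loaded in the heap */
    int         srctape;        /* which tape it came from */
} HeapItem;

static void
sift_down(HeapItem *heap, int n, int i)
{
    for (;;)
    {
        int         l = 2 * i + 1,
                    r = l + 1,
                    m = i;

        if (l < n && heap[l].v < heap[m].v)
            m = l;
        if (r < n && heap[r].v < heap[m].v)
            m = r;
        if (m == i)
            return;
        HeapItem    t = heap[i];
        heap[i] = heap[m];
        heap[m] = t;
        i = m;
    }
}

/* Merge ntapes (<= 8 here) sorted arrays into out[]; returns the
 * number of elements written. */
static size_t
kway_merge(MiniTape *tapes, int ntapes, int *out)
{
    HeapItem    heap[8];
    int         n = 0;
    size_t      written = 0;

    /* "beginmerge": load the first element of each nonempty tape */
    for (int i = 0; i < ntapes; i++)
        if (tapes[i].pos < tapes[i].len)
        {
            heap[n].v = tapes[i].data[tapes[i].pos++];
            heap[n].srctape = i;
            n++;
        }
    for (int i = n / 2 - 1; i >= 0; i--)
        sift_down(heap, n, i);

    while (n > 0)
    {
        MiniTape   *src = &tapes[heap[0].srctape];

        out[written++] = heap[0].v; /* the "WRITETUP" step */
        if (src->pos < src->len)
            heap[0].v = src->data[src->pos++];  /* replace top */
        else
            heap[0] = heap[--n];    /* delete top; one run exhausted */
        sift_down(heap, n, 0);
    }
    return written;
}
```

Replacing the root in place, rather than deleting and re-inserting, is the same trick tuplesort_heap_replace_top() exploits: one sift-down instead of two heap operations per tuple.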
    2150              : 
    2151              : /*
    2152              :  * beginmerge - initialize for a merge pass
    2153              :  *
    2154              :  * Fill the merge heap with the first tuple from each input tape.
    2155              :  */
    2156              : static void
    2157          312 : beginmerge(Tuplesortstate *state)
    2158              : {
    2159              :     int         activeTapes;
    2160              :     int         srcTapeIndex;
    2161              : 
    2162              :     /* Heap should be empty here */
    2163              :     Assert(state->memtupcount == 0);
    2164              : 
    2165          312 :     activeTapes = Min(state->nInputTapes, state->nInputRuns);
    2166              : 
    2167         1402 :     for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
    2168              :     {
    2169              :         SortTuple   tup;
    2170              : 
    2171         1090 :         if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
    2172              :         {
    2173          883 :             tup.srctape = srcTapeIndex;
    2174          883 :             tuplesort_heap_insert(state, &tup);
    2175              :         }
    2176              :     }
    2177          312 : }
    2178              : 
    2179              : /*
    2180              :  * mergereadnext - read next tuple from one merge input tape
    2181              :  *
    2182              :  * Returns false on EOF.
    2183              :  */
    2184              : static bool
    2185      3325210 : mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
    2186              : {
    2187              :     unsigned int tuplen;
    2188              : 
    2189              :     /* read next tuple, if any */
    2190      3325210 :     if ((tuplen = getlen(srcTape, true)) == 0)
    2191         1058 :         return false;
    2192      3324152 :     READTUP(state, stup, srcTape, tuplen);
    2193              : 
    2194      3324152 :     return true;
    2195              : }
    2196              : 
    2197              : /*
    2198              :  * dumptuples - remove tuples from memtuples and write initial run to tape
    2199              :  *
    2200              :  * When alltuples = true, dump everything currently in memory.  (This case is
    2201              :  * only used at end of input data.)
    2202              :  */
    2203              : static void
    2204       734632 : dumptuples(Tuplesortstate *state, bool alltuples)
    2205              : {
    2206              :     int         memtupwrite;
    2207              :     int         i;
    2208              : 
    2209              :     /*
    2210              :      * Nothing to do if we still fit in available memory and have array slots,
    2211              :      * unless this is the final call during initial run generation.
    2212              :      */
    2213       734632 :     if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
    2214       734007 :         !alltuples)
    2215       733538 :         return;
    2216              : 
    2217              :     /*
    2218              :      * Final call might require no sorting, in rare cases where we just so
    2219              :      * happen to have previously LACKMEM()'d at the point where exactly all
    2220              :      * remaining tuples are loaded into memory, just before input was
    2221              :      * exhausted.  In general, short final runs are quite possible, but avoid
    2222              :      * creating a completely empty run.  In a worker, though, we must produce
    2223              :      * at least one tape, even if it's empty.
    2224              :      */
    2225         1094 :     if (state->memtupcount == 0 && state->currentRun > 0)
    2226            0 :         return;
    2227              : 
    2228              :     Assert(state->status == TSS_BUILDRUNS);
    2229              : 
    2230              :     /*
    2231              :      * It seems unlikely that this limit will ever be exceeded, but take no
    2232              :      * chances
    2233              :      */
    2234         1094 :     if (state->currentRun == INT_MAX)
    2235            0 :         ereport(ERROR,
    2236              :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    2237              :                  errmsg("cannot have more than %d runs for an external sort",
    2238              :                         INT_MAX)));
    2239              : 
    2240         1094 :     if (state->currentRun > 0)
    2241          625 :         selectnewtape(state);
    2242              : 
    2243         1094 :     state->currentRun++;
    2244              : 
    2245         1094 :     if (trace_sort)
    2246            0 :         elog(LOG, "worker %d starting quicksort of run %d: %s",
    2247              :              state->worker, state->currentRun,
    2248              :              pg_rusage_show(&state->ru_start));
    2249              : 
    2250              :     /*
    2251              :      * Sort all tuples accumulated within the allowed amount of memory for
    2252              :      * this run.
    2253              :      */
    2254         1094 :     tuplesort_sort_memtuples(state);
    2255              : 
    2256         1094 :     if (trace_sort)
    2257            0 :         elog(LOG, "worker %d finished quicksort of run %d: %s",
    2258              :              state->worker, state->currentRun,
    2259              :              pg_rusage_show(&state->ru_start));
    2260              : 
    2261         1094 :     memtupwrite = state->memtupcount;
    2262      3085094 :     for (i = 0; i < memtupwrite; i++)
    2263              :     {
    2264      3084000 :         SortTuple  *stup = &state->memtuples[i];
    2265              : 
    2266      3084000 :         WRITETUP(state, state->destTape, stup);
    2267              :     }
    2268              : 
    2269         1094 :     state->memtupcount = 0;
    2270              : 
    2271              :     /*
    2272              :      * Reset tuple memory.  We've freed all of the tuples that we previously
    2273              :      * allocated.  It's important to avoid fragmentation when there is a stark
    2274              :      * change in the sizes of incoming tuples.  In bounded sorts,
    2275              :      * fragmentation due to AllocSetFree's bucketing by size class might be
    2276              :      * particularly bad if this step wasn't taken.
    2277              :      */
    2278         1094 :     MemoryContextReset(state->base.tuplecontext);
    2279              : 
    2280              :     /*
    2281              :      * Now update the memory accounting to subtract the memory used by the
    2282              :      * tuple.
    2283              :      */
    2284         1094 :     FREEMEM(state, state->tupleMem);
    2285         1094 :     state->tupleMem = 0;
    2286              : 
    2287         1094 :     markrunend(state->destTape);
    2288              : 
    2289         1094 :     if (trace_sort)
    2290            0 :         elog(LOG, "worker %d finished writing run %d to tape %d: %s",
    2291              :              state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
    2292              :              pg_rusage_show(&state->ru_start));
    2293              : }
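Stripped of memory accounting and tape bookkeeping, dumptuples() implements the standard initial-run-generation scheme: fill a bounded buffer, quicksort it, emit it as one sorted run, repeat. A hedged sketch with invented names (the real code sorts SortTuples onto logical tapes, not ints into arrays):

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical miniature of initial run generation: fill a fixed-size
 * "memtuples" buffer, sort it in memory, and emit it as one run;
 * repeat until the input is exhausted. */
static int
cmp_int(const void *a, const void *b)
{
    int         x = *(const int *) a,
                y = *(const int *) b;

    return (x > y) - (x < y);
}

/* Sorted runs are written back to back into runs[] (sized n); their
 * lengths go into run_len[].  Returns the run count. */
static int
build_runs(const int *input, size_t n, size_t memtupsize,
           int *runs, size_t *run_len)
{
    int         nruns = 0;
    size_t      consumed = 0;

    while (consumed < n)
    {
        size_t      chunk = n - consumed;

        if (chunk > memtupsize)
            chunk = memtupsize; /* buffer full: the LACKMEM analogue */

        memcpy(runs + consumed, input + consumed, chunk * sizeof(int));
        /* cf. tuplesort_sort_memtuples() */
        qsort(runs + consumed, chunk, sizeof(int), cmp_int);
        run_len[nruns++] = chunk;   /* cf. markrunend() */
        consumed += chunk;
    }
    return nruns;
}
```

With a buffer of 3 and 8 inputs this yields three runs of lengths 3, 3, 2; the short final run mirrors the "short final runs are quite possible" remark in the comments above.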
    2294              : 
    2295              : /*
    2296              :  * tuplesort_rescan     - rewind and replay the scan
    2297              :  */
    2298              : void
    2299           29 : tuplesort_rescan(Tuplesortstate *state)
    2300              : {
    2301           29 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2302              : 
    2303              :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2304              : 
    2305           29 :     switch (state->status)
    2306              :     {
    2307           24 :         case TSS_SORTEDINMEM:
    2308           24 :             state->current = 0;
    2309           24 :             state->eof_reached = false;
    2310           24 :             state->markpos_offset = 0;
    2311           24 :             state->markpos_eof = false;
    2312           24 :             break;
    2313            5 :         case TSS_SORTEDONTAPE:
    2314            5 :             LogicalTapeRewindForRead(state->result_tape, 0);
    2315            5 :             state->eof_reached = false;
    2316            5 :             state->markpos_block = 0L;
    2317            5 :             state->markpos_offset = 0;
    2318            5 :             state->markpos_eof = false;
    2319            5 :             break;
    2320            0 :         default:
    2321            0 :             elog(ERROR, "invalid tuplesort state");
    2322              :             break;
    2323              :     }
    2324              : 
    2325           29 :     MemoryContextSwitchTo(oldcontext);
    2326           29 : }
    2327              : 
    2328              : /*
    2329              :  * tuplesort_markpos    - saves current position in the merged sort file
    2330              :  */
    2331              : void
    2332       386309 : tuplesort_markpos(Tuplesortstate *state)
    2333              : {
    2334       386309 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2335              : 
    2336              :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2337              : 
    2338       386309 :     switch (state->status)
    2339              :     {
    2340       380437 :         case TSS_SORTEDINMEM:
    2341       380437 :             state->markpos_offset = state->current;
    2342       380437 :             state->markpos_eof = state->eof_reached;
    2343       380437 :             break;
    2344         5872 :         case TSS_SORTEDONTAPE:
    2345         5872 :             LogicalTapeTell(state->result_tape,
    2346              :                             &state->markpos_block,
    2347              :                             &state->markpos_offset);
    2348         5872 :             state->markpos_eof = state->eof_reached;
    2349         5872 :             break;
    2350            0 :         default:
    2351            0 :             elog(ERROR, "invalid tuplesort state");
    2352              :             break;
    2353              :     }
    2354              : 
    2355       386309 :     MemoryContextSwitchTo(oldcontext);
    2356       386309 : }
    2357              : 
    2358              : /*
    2359              :  * tuplesort_restorepos - restores current position in merged sort file to
    2360              :  *                        last saved position
    2361              :  */
    2362              : void
    2363        24037 : tuplesort_restorepos(Tuplesortstate *state)
    2364              : {
    2365        24037 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2366              : 
    2367              :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2368              : 
    2369        24037 :     switch (state->status)
    2370              :     {
    2371        19909 :         case TSS_SORTEDINMEM:
    2372        19909 :             state->current = state->markpos_offset;
    2373        19909 :             state->eof_reached = state->markpos_eof;
    2374        19909 :             break;
    2375         4128 :         case TSS_SORTEDONTAPE:
    2376         4128 :             LogicalTapeSeek(state->result_tape,
    2377              :                             state->markpos_block,
    2378              :                             state->markpos_offset);
    2379         4128 :             state->eof_reached = state->markpos_eof;
    2380         4128 :             break;
    2381            0 :         default:
    2382            0 :             elog(ERROR, "invalid tuplesort state");
    2383              :             break;
    2384              :     }
    2385              : 
    2386        24037 :     MemoryContextSwitchTo(oldcontext);
    2387        24037 : }
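For the in-memory case, the mark/restore pair above amounts to saving and reloading an array index plus the eof flag; only the on-tape case needs logtape.c's tell/seek. A trivial sketch of that TSS_SORTEDINMEM protocol, with hypothetical names:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical miniature of the TSS_SORTEDINMEM branches of
 * tuplesort_markpos()/tuplesort_restorepos(): mark saves the scan
 * position, restore rewinds to it. */
typedef struct
{
    size_t      current;        /* next array slot to return */
    int         eof_reached;
    size_t      markpos_offset; /* saved position */
    int         markpos_eof;
} MiniCursor;

static void
mini_markpos(MiniCursor *c)
{
    c->markpos_offset = c->current;
    c->markpos_eof = c->eof_reached;
}

static void
mini_restorepos(MiniCursor *c)
{
    c->current = c->markpos_offset;
    c->eof_reached = c->markpos_eof;
}
```

This cheap rewind is what lets a merge-join caller re-scan the inner side from a marked tuple without re-sorting.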
    2388              : 
    2389              : /*
    2390              :  * tuplesort_get_stats - extract summary statistics
    2391              :  *
    2392              :  * This can be called after tuplesort_performsort() finishes to obtain
    2393              :  * printable summary information about how the sort was performed.
    2394              :  */
    2395              : void
    2396          264 : tuplesort_get_stats(Tuplesortstate *state,
    2397              :                     TuplesortInstrumentation *stats)
    2398              : {
    2399              :     /*
    2400              :      * Note: it might seem we should provide both memory and disk usage for a
    2401              :      * disk-based sort.  However, the current code doesn't track memory space
    2402              :      * accurately once we have begun to return tuples to the caller (since we
    2403              :      * don't account for pfree's the caller is expected to do), so we cannot
    2404              :      * rely on availMem in a disk sort.  This does not seem worth the overhead
    2405              :      * to fix.  Is it worth creating an API for the memory context code to
    2406              :      * tell us how much is actually used in sortcontext?
    2407              :      */
    2408          264 :     tuplesort_updatemax(state);
    2409              : 
    2410          264 :     if (state->isMaxSpaceDisk)
    2411            4 :         stats->spaceType = SORT_SPACE_TYPE_DISK;
    2412              :     else
    2413          260 :         stats->spaceType = SORT_SPACE_TYPE_MEMORY;
    2414          264 :     stats->spaceUsed = (state->maxSpace + 1023) / 1024;
    2415              : 
    2416          264 :     switch (state->maxSpaceStatus)
    2417              :     {
    2418          260 :         case TSS_SORTEDINMEM:
    2419          260 :             if (state->boundUsed)
    2420           28 :                 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
    2421              :             else
    2422          232 :                 stats->sortMethod = SORT_TYPE_QUICKSORT;
    2423          260 :             break;
    2424            0 :         case TSS_SORTEDONTAPE:
    2425            0 :             stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
    2426            0 :             break;
    2427            4 :         case TSS_FINALMERGE:
    2428            4 :             stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
    2429            4 :             break;
    2430            0 :         default:
    2431            0 :             stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
    2432            0 :             break;
    2433              :     }
    2434          264 : }
    2435              : 
    2436              : /*
    2437              :  * Convert TuplesortMethod to a string.
    2438              :  */
    2439              : const char *
    2440          196 : tuplesort_method_name(TuplesortMethod m)
    2441              : {
    2442          196 :     switch (m)
    2443              :     {
    2444            0 :         case SORT_TYPE_STILL_IN_PROGRESS:
    2445            0 :             return "still in progress";
    2446           28 :         case SORT_TYPE_TOP_N_HEAPSORT:
    2447           28 :             return "top-N heapsort";
    2448          164 :         case SORT_TYPE_QUICKSORT:
    2449          164 :             return "quicksort";
    2450            0 :         case SORT_TYPE_EXTERNAL_SORT:
    2451            0 :             return "external sort";
    2452            4 :         case SORT_TYPE_EXTERNAL_MERGE:
    2453            4 :             return "external merge";
    2454              :     }
    2455              : 
    2456            0 :     return "unknown";
    2457              : }
    2458              : 
    2459              : /*
    2460              :  * Convert TuplesortSpaceType to a string.
    2461              :  */
    2462              : const char *
    2463          172 : tuplesort_space_type_name(TuplesortSpaceType t)
    2464              : {
    2465              :     Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
    2466          172 :     return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
    2467              : }
    2468              : 
    2469              : 
    2470              : /*
    2471              :  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
    2472              :  */
    2473              : 
    2474              : /*
    2475              :  * Convert the existing unordered array of SortTuples to a bounded heap,
    2476              :  * discarding all but the smallest "state->bound" tuples.
    2477              :  *
    2478              :  * When working with a bounded heap, we want to keep the largest entry
    2479              :  * at the root (array entry zero), instead of the smallest as in the normal
    2480              :  * sort case.  This allows us to discard the largest entry cheaply.
    2481              :  * Therefore, we temporarily reverse the sort direction.
    2482              :  */
    2483              : static void
    2484          255 : make_bounded_heap(Tuplesortstate *state)
    2485              : {
    2486          255 :     int         tupcount = state->memtupcount;
    2487              :     int         i;
    2488              : 
    2489              :     Assert(state->status == TSS_INITIAL);
    2490              :     Assert(state->bounded);
    2491              :     Assert(tupcount >= state->bound);
    2492              :     Assert(SERIAL(state));
    2493              : 
    2494              :     /* Reverse sort direction so largest entry will be at root */
    2495          255 :     reversedirection(state);
    2496              : 
    2497          255 :     state->memtupcount = 0;      /* make the heap empty */
    2498        30240 :     for (i = 0; i < tupcount; i++)
    2499              :     {
    2500        29985 :         if (state->memtupcount < state->bound)
    2501              :         {
    2502              :             /* Insert next tuple into heap */
    2503              :             /* Must copy source tuple to avoid possible overwrite */
    2504        14865 :             SortTuple   stup = state->memtuples[i];
    2505              : 
    2506        14865 :             tuplesort_heap_insert(state, &stup);
    2507              :         }
    2508              :         else
    2509              :         {
    2510              :             /*
    2511              :              * The heap is full.  Replace the largest entry with the new
    2512              :              * tuple, or just discard it, if it's larger than anything already
    2513              :              * in the heap.
    2514              :              */
    2515        15120 :             if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
    2516              :             {
    2517         6969 :                 free_sort_tuple(state, &state->memtuples[i]);
    2518         6969 :                 CHECK_FOR_INTERRUPTS();
    2519              :             }
    2520              :             else
    2521         8151 :                 tuplesort_heap_replace_top(state, &state->memtuples[i]);
    2522              :         }
    2523              :     }
    2524              : 
    2525              :     Assert(state->memtupcount == state->bound);
    2526          255 :     state->status = TSS_BOUNDED;
    2527          255 : }
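The bounded-heap trick above, keeping the *largest* retained value at the root so it can be compared against and replaced in O(log N), is the standard top-N selection pattern. A self-contained sketch over ints, with invented names (the real code reverses the comparator rather than using a separate max-heap):

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical miniature of the bounded heap: keep the N smallest
 * values seen so far in a max-heap of size N, so the largest kept
 * value sits at the root. */
static void
max_sift_down(int *heap, int n, int i)
{
    for (;;)
    {
        int         l = 2 * i + 1,
                    r = l + 1,
                    m = i;

        if (l < n && heap[l] > heap[m])
            m = l;
        if (r < n && heap[r] > heap[m])
            m = r;
        if (m == i)
            return;
        int         t = heap[i];
        heap[i] = heap[m];
        heap[m] = t;
        i = m;
    }
}

static void
max_sift_up(int *heap, int i)
{
    while (i > 0)
    {
        int         p = (i - 1) / 2;

        if (heap[p] >= heap[i])
            return;
        int         t = heap[i];
        heap[i] = heap[p];
        heap[p] = t;
        i = p;
    }
}

/* After the scan, heap[0..bound-1] holds the bound smallest inputs,
 * unordered except that heap[0] is the largest of them. */
static void
keep_smallest(const int *input, size_t n, int *heap, int bound)
{
    int         count = 0;

    for (size_t i = 0; i < n; i++)
    {
        if (count < bound)
        {
            heap[count] = input[i];
            max_sift_up(heap, count++);
        }
        else if (input[i] < heap[0])
        {
            /* replace the largest kept value, as in
             * tuplesort_heap_replace_top() */
            heap[0] = input[i];
            max_sift_down(heap, bound, 0);
        }
        /* else: discard; it cannot be among the bound smallest */
    }
}
```

sort_bounded_heap() then finishes the job exactly like heapsort's extraction phase: repeatedly delete the root and park it in the slot the shrinking heap just vacated.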
    2528              : 
    2529              : /*
    2530              :  * Convert the bounded heap to a properly-sorted array
    2531              :  */
    2532              : static void
    2533          255 : sort_bounded_heap(Tuplesortstate *state)
    2534              : {
    2535          255 :     int         tupcount = state->memtupcount;
    2536              : 
    2537              :     Assert(state->status == TSS_BOUNDED);
    2538              :     Assert(state->bounded);
    2539              :     Assert(tupcount == state->bound);
    2540              :     Assert(SERIAL(state));
    2541              : 
    2542              :     /*
    2543              :      * We can unheapify in place because each delete-top call will remove the
    2544              :      * largest entry, which we can promptly store in the newly freed slot at
    2545              :      * the end.  Once we're down to a single-entry heap, we're done.
    2546              :      */
    2547        14865 :     while (state->memtupcount > 1)
    2548              :     {
    2549        14610 :         SortTuple   stup = state->memtuples[0];
    2550              : 
    2551              :         /* this sifts-up the next-largest entry and decreases memtupcount */
    2552        14610 :         tuplesort_heap_delete_top(state);
    2553        14610 :         state->memtuples[state->memtupcount] = stup;
    2554              :     }
    2555          255 :     state->memtupcount = tupcount;
    2556              : 
    2557              :     /*
    2558              :      * Reverse sort direction back to the original state.  This is not
    2559              :      * actually necessary but seems like a good idea for tidiness.
    2560              :      */
    2561          255 :     reversedirection(state);
    2562              : 
    2563          255 :     state->status = TSS_SORTEDINMEM;
    2564          255 :     state->boundUsed = true;
    2565          255 : }
    2566              : 
    2567              : 
    2568              : /* radix sort routines */
    2569              : 
    2570              : /*
    2571              :  * Retrieve byte from datum, indexed by 'level': 0 for MSB, 7 for LSB
    2572              :  */
    2573              : static inline uint8
    2574     15756245 : current_byte(Datum key, int level)
    2575              : {
    2576     15756245 :     int         shift = (sizeof(Datum) - 1 - level) * BITS_PER_BYTE;
    2577              : 
    2578     15756245 :     return (key >> shift) & 0xFF;
    2579              : }
    2580              : 
    2581              : /*
    2582              :  * Normalize datum such that unsigned comparison is order-preserving,
    2583              :  * taking ASC/DESC into account as well.
    2584              :  */
    2585              : static inline Datum
    2586     15784903 : normalize_datum(Datum orig, SortSupport ssup)
    2587              : {
    2588              :     Datum       norm_datum1;
    2589              : 
    2590     15784903 :     if (ssup->comparator == ssup_datum_signed_cmp)
    2591              :     {
    2592      1107346 :         norm_datum1 = orig + ((uint64) PG_INT64_MAX) + 1;
    2593              :     }
    2594     14677557 :     else if (ssup->comparator == ssup_datum_int32_cmp)
    2595              :     {
    2596              :         /*
    2597              :          * First truncate to uint32. Technically, we don't need to do this,
    2598              :          * but it forces the upper half of the datum to be zero regardless of
    2599              :          * sign.
    2600              :          */
    2601      7713043 :         uint32      u32 = DatumGetUInt32(orig) + ((uint32) PG_INT32_MAX) + 1;
    2602              : 
    2603      7713043 :         norm_datum1 = UInt32GetDatum(u32);
    2604              :     }
    2605              :     else
    2606              :     {
    2607              :         Assert(ssup->comparator == ssup_datum_unsigned_cmp);
    2608      6964514 :         norm_datum1 = orig;
    2609              :     }
    2610              : 
    2611     15784903 :     if (ssup->ssup_reverse)
    2612       804395 :         norm_datum1 = ~norm_datum1;
    2613              : 
    2614     15784903 :     return norm_datum1;
    2615              : }
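The signed case of normalize_datum() relies on a well-known identity: adding 2^63 to a signed 64-bit value (equivalently, flipping its sign bit) maps signed order onto unsigned order, so INT64_MIN becomes 0 and INT64_MAX becomes UINT64_MAX. Complementing all bits then reverses the order for DESC sorts. A minimal sketch of just that transform, with a hypothetical helper name:

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical miniature of the signed branch of normalize_datum():
 * shift by 2^63 so that unsigned comparison agrees with signed order;
 * bitwise NOT reverses the order for DESC (ssup_reverse). */
static uint64_t
norm_signed(int64_t v, int reverse)
{
    uint64_t    u = (uint64_t) v + ((uint64_t) INT64_MAX) + 1;

    return reverse ? ~u : u;
}
```

Once every key is normalized this way, the radix sort below can compare and bucket keys one unsigned byte at a time, most-significant byte first, without caring about sign or sort direction.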
    2616              : 
    2617              : /*
    2618              :  * radix_sort_recursive
    2619              :  *
    2620              :  * Radix sort by (pass-by-value) datum1, diverting to qsort_tuple()
    2621              :  * for tiebreaks.
    2622              :  *
    2623              :  * This is a modification of
    2624              :  * ska_byte_sort() from https://github.com/skarupke/ska_sort
    2625              :  * The original copyright notice follows:
    2626              :  *
    2627              :  * Copyright Malte Skarupke 2016.
    2628              :  * Distributed under the Boost Software License, Version 1.0.
    2629              :  *
    2630              :  * Boost Software License - Version 1.0 - August 17th, 2003
    2631              :  *
    2632              :  * Permission is hereby granted, free of charge, to any person or organization
    2633              :  * obtaining a copy of the software and accompanying documentation covered by
    2634              :  * this license (the "Software") to use, reproduce, display, distribute,
    2635              :  * execute, and transmit the Software, and to prepare derivative works of the
    2636              :  * Software, and to permit third-parties to whom the Software is furnished to
    2637              :  * do so, all subject to the following:
    2638              :  *
    2639              :  * The copyright notices in the Software and this entire statement, including
    2640              :  * the above license grant, this restriction and the following disclaimer,
    2641              :  * must be included in all copies of the Software, in whole or in part, and
    2642              :  * all derivative works of the Software, unless such copies or derivative
    2643              :  * works are solely in the form of machine-executable object code generated by
    2644              :  * a source language processor.
    2645              :  *
    2646              :  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    2647              :  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    2648              :  * FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
    2649              :  * SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
    2650              :  * FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
    2651              :  * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
    2652              :  * DEALINGS IN THE SOFTWARE.
    2653              :  */
    2654              : static void
    2655        28658 : radix_sort_recursive(SortTuple *begin, size_t n_elems, int level, Tuplesortstate *state)
    2656              : {
    2657        28658 :     RadixSortInfo partitions[256] = {0};
    2658              :     uint8       remaining_partitions[256];
    2659        28658 :     size_t      total = 0;
    2660        28658 :     int         num_partitions = 0;
    2661              :     int         num_remaining;
    2662        28658 :     SortSupport ssup = &state->base.sortKeys[0];
    2663              :     Datum       ref_datum;
    2664        28658 :     Datum       common_upper_bits = 0;
    2665        28658 :     size_t      start_offset = 0;
    2666        28658 :     SortTuple  *partition_begin = begin;
    2667              :     int         next_level;
    2668              : 
    2669              :     /* count number of occurrences of each byte */
    2670        28658 :     ref_datum = normalize_datum(begin[0].datum1, ssup);
    2671     15784903 :     for (SortTuple *st = begin; st < begin + n_elems; st++)
    2672              :     {
    2673              :         Datum       this_datum;
    2674              :         uint8       this_partition;
    2675              : 
    2676     15756245 :         this_datum = normalize_datum(st->datum1, ssup);
    2677              :         /* accumulate bits different from the reference datum */
    2678     15756245 :         common_upper_bits |= ref_datum ^ this_datum;
    2679              : 
    2680              :         /* extract the byte for this level from the normalized datum */
    2681     15756245 :         this_partition = current_byte(this_datum, level);
    2682              : 
    2683              :         /* save it for the permutation step */
    2684     15756245 :         st->curbyte = this_partition;
    2685              : 
    2686     15756245 :         partitions[this_partition].count++;
    2687              : 
    2688     15756245 :         CHECK_FOR_INTERRUPTS();
    2689              :     }
    2690              : 
    2691              :     /* compute partition offsets */
    2692      7365106 :     for (int i = 0; i < 256; i++)
    2693              :     {
    2694      7336448 :         size_t      count = partitions[i].count;
    2695              : 
    2696      7336448 :         if (count != 0)
    2697              :         {
    2698      1813770 :             partitions[i].offset = total;
    2699      1813770 :             total += count;
    2700      1813770 :             remaining_partitions[num_partitions] = i;
    2701      1813770 :             num_partitions++;
    2702              :         }
    2703      7336448 :         partitions[i].next_offset = total;
    2704              :     }
    2705              : 
    2706              :     /*
    2707              :      * Swap tuples to correct partition.
    2708              :      *
    2709              :      * In traditional American flag sort, a swap sends the current element to
    2710              :      * the correct partition, but the array pointer only advances if the
    2711              :      * partner of the swap happens to be an element that belongs in the
    2712              :      * current partition. That only requires one pass through the array, but
    2713              :      * the disadvantage is we don't know if the pointer can advance until the
    2714              :      * swap completes. Here lies the most interesting innovation from the
    2715              :      * upstream ska_byte_sort: After initiating the swap, we immediately
    2716              :      * proceed to the next element. This makes better use of CPU pipelining,
    2717              :      * but also means that we will often need multiple iterations of this
    2718              :      * loop. ska_byte_sort() maintains a separate list of which partitions
    2719              :      * haven't finished, which is updated every loop iteration. Here we simply
    2720              :      * check each partition during every iteration.
    2721              :      *
    2722              :      * If we started with a single partition, there is nothing to do. If a
    2723              :      * previous loop iteration leaves only one partition not yet counted as
    2724              :      * sorted, that partition must in fact be sorted, so we can exit the loop.
    2725              :      */
    2726        28658 :     num_remaining = num_partitions;
    2727       114549 :     while (num_remaining > 1)
    2728              :     {
    2729              :         /* start the count over */
    2730        85891 :         num_remaining = num_partitions;
    2731              : 
    2732      7577056 :         for (int i = 0; i < num_partitions; i++)
    2733              :         {
    2734      7491165 :             uint8       idx = remaining_partitions[i];
    2735              : 
    2736      7491165 :             for (SortTuple *st = begin + partitions[idx].offset;
    2737     17821761 :                  st < begin + partitions[idx].next_offset;
    2738     10330596 :                  st++)
    2739              :             {
    2740     10330596 :                 size_t      offset = partitions[st->curbyte].offset++;
    2741              :                 SortTuple   tmp;
    2742              : 
    2743              :                 /* swap current tuple with destination position */
    2744              :                 Assert(offset < n_elems);
    2745     10330596 :                 tmp = *st;
    2746     10330596 :                 *st = begin[offset];
    2747     10330596 :                 begin[offset] = tmp;
    2748              : 
    2749     10330596 :                 CHECK_FOR_INTERRUPTS();
    2750              :             }
    2751              : 
    2752              :             /* Is this partition sorted? */
    2753      7491165 :             if (partitions[idx].offset == partitions[idx].next_offset)
    2754      5685941 :                 num_remaining--;
    2755              :         }
    2756              :     }
    2757              : 
    2758              :     /* recurse */
    2759              : 
    2760        28658 :     if (num_partitions == 1)
    2761              :     {
    2762              :         /*
    2763              :          * There is only one distinct byte at the current level. It can happen
    2764              :          * that some subsequent bytes are also the same for all input values,
    2765              :          * such as the upper bytes of small integers. To skip unproductive
    2766              :          * passes for that case, we compute the level where the input has more
    2767              :          * than one distinct byte, so that the next recursion can start there.
    2768              :          */
    2769         5437 :         if (common_upper_bits == 0)
    2770          352 :             next_level = sizeof(Datum);
    2771              :         else
    2772              :         {
    2773              :             int         diffpos;
    2774              : 
    2775              :             /*
    2776              :              * The upper bits of common_upper_bits are zero where all datums
    2777              :              * have the same bits.
    2778              :              */
    2779         5085 :             diffpos = pg_leftmost_one_pos64(DatumGetUInt64(common_upper_bits));
    2780         5085 :             next_level = sizeof(Datum) - 1 - (diffpos / BITS_PER_BYTE);
    2781              :         }
    2782              :     }
    2783              :     else
    2784        23221 :         next_level = level + 1;
    2785              : 
    2786        28658 :     for (uint8 *rp = remaining_partitions;
    2787      1842428 :          rp < remaining_partitions + num_partitions;
    2788      1813770 :          rp++)
    2789              :     {
    2790      1813770 :         size_t      end_offset = partitions[*rp].next_offset;
    2791      1813770 :         SortTuple  *partition_end = begin + end_offset;
    2792      1813770 :         size_t      num_elements = end_offset - start_offset;
    2793              : 
    2794      1813770 :         if (num_elements > 1)
    2795              :         {
    2796       744436 :             if (next_level < sizeof(Datum))
    2797              :             {
    2798       638919 :                 if (num_elements < QSORT_THRESHOLD)
    2799              :                 {
    2800       612441 :                     qsort_tuple(partition_begin,
    2801              :                                 num_elements,
    2802              :                                 state->base.comparetup,
    2803              :                                 state);
    2804              :                 }
    2805              :                 else
    2806              :                 {
    2807        26478 :                     radix_sort_recursive(partition_begin,
    2808              :                                          num_elements,
    2809              :                                          next_level,
    2810              :                                          state);
    2811              :                 }
    2812              :             }
    2813       105517 :             else if (state->base.onlyKey == NULL)
    2814              :             {
    2815              :                 /*
    2816              :                  * We've finished radix sort on all bytes of the pass-by-value
    2817              :                  * datum (possibly abbreviated), now sort using the tiebreak
    2818              :                  * comparator.
    2819              :                  */
    2820        24099 :                 qsort_tuple(partition_begin,
    2821              :                             num_elements,
    2822              :                             state->base.comparetup_tiebreak,
    2823              :                             state);
    2824              :             }
    2825              :         }
    2826              : 
    2827      1813770 :         start_offset = end_offset;
    2828      1813770 :         partition_begin = partition_end;
    2829              :     }
    2830        28658 : }
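The permutation loop above can be hard to visualize in isolation. Below is a minimal, self-contained sketch of a single-level American flag partition on plain uint64_t keys — the classic variant that finishes one partition before advancing, unlike the pipelined ska_byte_sort scheme used in radix_sort_recursive(). The names byte_at and partition_by_byte are illustrative, not functions from tuplesort.c.

```c
#include <stddef.h>
#include <stdint.h>

/* byte for the given level; level 0 is the most significant byte */
static inline uint8_t
byte_at(uint64_t key, int level)
{
    return (uint8_t) (key >> (8 * (7 - level)));
}

/*
 * Classic American flag partition: count byte occurrences, turn the
 * counts into per-partition head/tail offsets, then swap each element
 * toward its partition's next free slot until every head reaches its
 * tail.
 */
static void
partition_by_byte(uint64_t *keys, size_t n, int level)
{
    size_t      counts[256] = {0};
    size_t      head[256];
    size_t      tail[256];
    size_t      total = 0;

    for (size_t i = 0; i < n; i++)
        counts[byte_at(keys[i], level)]++;

    for (int b = 0; b < 256; b++)
    {
        head[b] = total;
        total += counts[b];
        tail[b] = total;
    }

    for (int b = 0; b < 256; b++)
    {
        while (head[b] < tail[b])
        {
            uint8_t     c = byte_at(keys[head[b]], level);
            uint64_t    tmp = keys[head[b]];

            /* send the element at this partition's head to partition c */
            keys[head[b]] = keys[head[c]];
            keys[head[c]] = tmp;
            head[c]++;          /* a self-swap advances head[b] when c == b */
        }
    }
}
```

The version above stalls on each swap before deciding whether to advance; the tuplesort code instead advances past every element immediately and re-scans partitions that are not yet full, trading extra loop iterations for better CPU pipelining.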
    2831              : 
    2832              : /*
    2833              :  * Entry point for radix_sort_recursive
    2834              :  *
    2835              :  * Partition tuples by isnull1, then sort both partitions, using
    2836              :  * radix sort on the NOT NULL partition if it's large enough.
    2837              :  */
    2838              : static void
    2839         3629 : radix_sort_tuple(SortTuple *data, size_t n, Tuplesortstate *state)
    2840              : {
    2841         3629 :     bool        nulls_first = state->base.sortKeys[0].ssup_nulls_first;
    2842              :     SortTuple  *null_start;
    2843              :     SortTuple  *not_null_start;
    2844         3629 :     size_t      d1 = 0,
    2845              :                 d2,
    2846              :                 null_count,
    2847              :                 not_null_count;
    2848              : 
    2849              :     /*
    2850              :      * Find the first NOT NULL if NULLS FIRST, or first NULL if NULLS LAST.
    2851              :      * This also serves as a quick check for the common case where all tuples
    2852              :      * are NOT NULL in the first sort key.
    2853              :      */
    2854      9122056 :     while (d1 < n && data[d1].isnull1 == nulls_first)
    2855              :     {
    2856      9118427 :         d1++;
    2857      9118427 :         CHECK_FOR_INTERRUPTS();
    2858              :     }
    2859              : 
    2860              :     /*
    2861              :      * If we have more than one tuple left after the quick check, partition
    2862              :      * the remainder using branchless cyclic permutation, based on
    2863              :      * https://orlp.net/blog/branchless-lomuto-partitioning/
    2864              :      */
    2865              :     Assert(n > 0);
    2866         3629 :     if (d1 < n - 1)
    2867              :     {
    2868          735 :         size_t      i = d1,
    2869          735 :                     j = d1;
    2870          735 :         SortTuple   tmp = data[d1]; /* create gap at front */
    2871              : 
    2872       443880 :         while (j < n - 1)
    2873              :         {
    2874              :             /* gap is at j, move i's element to gap */
    2875       443145 :             data[j] = data[i];
    2876              :             /* advance j to the first unknown element */
    2877       443145 :             j += 1;
    2878              :             /* move the first unknown element back to i */
    2879       443145 :             data[i] = data[j];
    2880              :             /* advance i if this element belongs in the left partition */
    2881       443145 :             i += (data[i].isnull1 == nulls_first);
    2882              : 
    2883       443145 :             CHECK_FOR_INTERRUPTS();
    2884              :         }
    2885              : 
    2886              :         /* place gap between left and right partitions */
    2887          735 :         data[j] = data[i];
    2888              :         /* restore the saved element */
    2889          735 :         data[i] = tmp;
    2890              :         /* assign it to the correct partition */
    2891          735 :         i += (data[i].isnull1 == nulls_first);
    2892              : 
    2893              :         /* d1 is now the number of elements in the left partition */
    2894          735 :         d1 = i;
    2895              :     }
    2896              : 
    2897         3629 :     d2 = n - d1;
    2898              : 
    2899              :     /* set pointers and counts for each partition */
    2900         3629 :     if (nulls_first)
    2901              :     {
    2902          630 :         null_start = data;
    2903          630 :         null_count = d1;
    2904          630 :         not_null_start = data + d1;
    2905          630 :         not_null_count = d2;
    2906              :     }
    2907              :     else
    2908              :     {
    2909         2999 :         not_null_start = data;
    2910         2999 :         not_null_count = d1;
    2911         2999 :         null_start = data + d1;
    2912         2999 :         null_count = d2;
    2913              :     }
    2914              : 
    2915         3629 :     for (SortTuple *st = null_start;
    2916         6036 :          st < null_start + null_count;
    2917         2407 :          st++)
    2918              :         Assert(st->isnull1 == true);
    2919         3629 :     for (SortTuple *st = not_null_start;
    2920      9563533 :          st < not_null_start + not_null_count;
    2921      9559904 :          st++)
    2922              :         Assert(st->isnull1 == false);
    2923              : 
    2924              :     /*
    2925              :      * Sort the NULL partition using the tiebreak comparator, if necessary.
    2926              :      */
    2927         3629 :     if (state->base.onlyKey == NULL && null_count > 1)
    2928              :     {
    2929          111 :         qsort_tuple(null_start,
    2930              :                     null_count,
    2931              :                     state->base.comparetup_tiebreak,
    2932              :                     state);
    2933              :     }
    2934              : 
    2935              :     /*
    2936              :      * Sort the NOT NULL partition, using radix sort if large enough,
    2937              :      * otherwise fall back to quicksort.
    2938              :      */
    2939         3629 :     if (not_null_count < QSORT_THRESHOLD)
    2940              :     {
    2941           14 :         qsort_tuple(not_null_start,
    2942              :                     not_null_count,
    2943              :                     state->base.comparetup,
    2944              :                     state);
    2945              :     }
    2946              :     else
    2947              :     {
    2948         3615 :         bool        presorted = true;
    2949              : 
    2950         3615 :         for (SortTuple *st = not_null_start + 1;
    2951      4519127 :              st < not_null_start + not_null_count;
    2952      4515512 :              st++)
    2953              :         {
    2954      4517692 :             if (COMPARETUP(state, st - 1, st) > 0)
    2955              :             {
    2956         2180 :                 presorted = false;
    2957         2180 :                 break;
    2958              :             }
    2959              : 
    2960      4515512 :             CHECK_FOR_INTERRUPTS();
    2961              :         }
    2962              : 
    2963         3615 :         if (presorted)
    2964         1435 :             return;
    2965              :         else
    2966              :         {
    2967         2180 :             radix_sort_recursive(not_null_start,
    2968              :                                  not_null_count,
    2969              :                                  0,
    2970              :                                  state);
    2971              :         }
    2972              :     }
    2973              : }
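The cyclic-permutation partition above keeps its loop body branch-free: the gap always moves forward one slot, and i advances by an arithmetic increment instead of a conditional branch. A minimal sketch of the same idea on plain ints, partitioning values below a pivot to the front (illustrative helper; the real code partitions SortTuples on isnull1):

```c
#include <stddef.h>

/*
 * Branchless cyclic-permutation (Lomuto-style) partition, after
 * https://orlp.net/blog/branchless-lomuto-partitioning/.  Moves all
 * elements < pivot to the front of v[] and returns their count.
 */
static size_t
partition_less(int *v, size_t n, int pivot)
{
    size_t      i = 0,
                j = 0;
    int         tmp;

    if (n == 0)
        return 0;

    tmp = v[0];                 /* create gap at front */
    while (j < n - 1)
    {
        v[j] = v[i];            /* gap is at j, move i's element to gap */
        j += 1;                 /* advance j to the first unknown element */
        v[i] = v[j];            /* move the first unknown element back to i */
        i += (v[i] < pivot);    /* advance i if it belongs on the left */
    }
    v[j] = v[i];                /* place gap between the partitions */
    v[i] = tmp;                 /* restore the saved element */
    i += (v[i] < pivot);        /* and classify it too */
    return i;
}
```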
    2974              : 
    2975              : /* Verify in-memory sort using standard comparator. */
    2976              : static void
    2977         3629 : verify_memtuples_sorted(Tuplesortstate *state)
    2978              : {
    2979              : #ifdef USE_ASSERT_CHECKING
    2980              :     for (SortTuple *st = state->memtuples + 1;
    2981              :          st < state->memtuples + state->memtupcount;
    2982              :          st++)
    2983              :         Assert(COMPARETUP(state, st - 1, st) <= 0);
    2984              : #endif
    2985         3629 : }
    2986              : 
    2987              : /*
    2988              :  * Sort all memtuples using specialized routines.
    2989              :  *
    2990              :  * Quicksort or radix sort is used both for small in-memory sorts
    2991              :  * and for sorting the runs of an external sort.
    2992              :  */
    2993              : static void
    2994       149879 : tuplesort_sort_memtuples(Tuplesortstate *state)
    2995              : {
    2996              :     Assert(!LEADER(state));
    2997              : 
    2998       149879 :     if (state->memtupcount > 1)
    2999              :     {
    3000              :         /*
    3001              :          * Do we have the leading column's value or abbreviation in datum1?
    3002              :          */
    3003        44651 :         if (state->base.haveDatum1 && state->base.sortKeys)
    3004              :         {
    3005        44581 :             SortSupport ssup = &state->base.sortKeys[0];
    3006              : 
    3007              :             /* Does it compare as an integer? */
    3008        44581 :             if (state->memtupcount >= QSORT_THRESHOLD &&
    3009         8319 :                 (ssup->comparator == ssup_datum_unsigned_cmp ||
    3010         7802 :                  ssup->comparator == ssup_datum_signed_cmp ||
    3011         7612 :                  ssup->comparator == ssup_datum_int32_cmp))
    3012              :             {
    3013         3629 :                 radix_sort_tuple(state->memtuples,
    3014         3629 :                                  state->memtupcount,
    3015              :                                  state);
    3016         3629 :                 verify_memtuples_sorted(state);
    3017         3629 :                 return;
    3018              :             }
    3019              :         }
    3020              : 
    3021              :         /* Can we use the single-key sort function? */
    3022        41022 :         if (state->base.onlyKey != NULL)
    3023              :         {
    3024        27517 :             qsort_ssup(state->memtuples, state->memtupcount,
    3025        27517 :                        state->base.onlyKey);
    3026              :         }
    3027              :         else
    3028              :         {
    3029        13505 :             qsort_tuple(state->memtuples,
    3030        13505 :                         state->memtupcount,
    3031              :                         state->base.comparetup,
    3032              :                         state);
    3033              :         }
    3034              :     }
    3035              : }
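Radix sort inspects raw key bytes, so it applies only when unsigned byte order agrees with the sort order — which is why only the integer-like comparators qualify above, with normalize_datum() mapping each datum to such a key first. For signed 64-bit integers the normalization amounts to flipping the sign bit; a sketch of the idea (not necessarily the exact normalize_datum implementation):

```c
#include <stdint.h>

/*
 * Map a signed 64-bit value to an unsigned one whose unsigned order
 * matches the signed order: flipping the sign bit shifts the negative
 * range below the non-negative range.
 */
static inline uint64_t
normalize_int64(int64_t v)
{
    return (uint64_t) v ^ ((uint64_t) 1 << 63);
}
```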
    3036              : 
    3037              : /*
    3038              :  * Insert a new tuple into an empty or existing heap, maintaining the
    3039              :  * heap invariant.  Caller is responsible for ensuring there's room.
    3040              :  *
    3041              :  * Note: For some callers, tuple points to a memtuples[] entry above the
    3042              :  * end of the heap.  This is safe as long as it's not immediately adjacent
    3043              :  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
    3044              :  * is, it might get overwritten before being moved into the heap!
    3045              :  */
    3046              : static void
    3047        15748 : tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
    3048              : {
    3049              :     SortTuple  *memtuples;
    3050              :     int         j;
    3051              : 
    3052        15748 :     memtuples = state->memtuples;
    3053              :     Assert(state->memtupcount < state->memtupsize);
    3054              : 
    3055        15748 :     CHECK_FOR_INTERRUPTS();
    3056              : 
    3057              :     /*
    3058              :      * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
    3059              :      * using 1-based array indexes, not 0-based.
    3060              :      */
    3061        15748 :     j = state->memtupcount++;
    3062        42682 :     while (j > 0)
    3063              :     {
    3064        38540 :         int         i = (j - 1) >> 1;
    3065              : 
    3066        38540 :         if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
    3067        11606 :             break;
    3068        26934 :         memtuples[j] = memtuples[i];
    3069        26934 :         j = i;
    3070              :     }
    3071        15748 :     memtuples[j] = *tuple;
    3072        15748 : }
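Stripped of SortTuples and comparators, the sift-up above reduces to the standard hole-based insertion on a binary min-heap; a minimal sketch on plain ints (illustrative name, not the tuplesort.c code):

```c
/*
 * Insert newval into a min-heap of n elements (capacity at least n + 1):
 * walk the hole from the new last slot toward the root, pulling each
 * larger parent down into it (Knuth 5.2.3 exercise 16, 0-based indexes).
 */
static void
heap_insert(int *heap, unsigned int n, int newval)
{
    unsigned int j = n;

    while (j > 0)
    {
        unsigned int i = (j - 1) / 2;   /* parent of j */

        if (newval >= heap[i])
            break;
        heap[j] = heap[i];  /* pull parent down into the hole */
        j = i;
    }
    heap[j] = newval;
}
```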
    3073              : 
    3074              : /*
    3075              :  * Remove the tuple at state->memtuples[0] from the heap.  Decrement
    3076              :  * memtupcount, and sift up to maintain the heap invariant.
    3077              :  *
    3078              :  * The caller has already free'd the tuple the top node points to,
    3079              :  * if necessary.
    3080              :  */
    3081              : static void
    3082        15461 : tuplesort_heap_delete_top(Tuplesortstate *state)
    3083              : {
    3084        15461 :     SortTuple  *memtuples = state->memtuples;
    3085              :     SortTuple  *tuple;
    3086              : 
    3087        15461 :     if (--state->memtupcount <= 0)
    3088          217 :         return;
    3089              : 
    3090              :     /*
    3091              :      * Remove the last tuple in the heap, and re-insert it, by replacing the
    3092              :      * current top node with it.
    3093              :      */
    3094        15244 :     tuple = &memtuples[state->memtupcount];
    3095        15244 :     tuplesort_heap_replace_top(state, tuple);
    3096              : }
    3097              : 
    3098              : /*
    3099              :  * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
    3100              :  * maintain the heap invariant.
    3101              :  *
    3102              :  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
    3103              :  * Heapsort, steps H3-H8).
    3104              :  */
    3105              : static void
    3106      3681329 : tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
    3107              : {
    3108      3681329 :     SortTuple  *memtuples = state->memtuples;
    3109              :     unsigned int i,
    3110              :                 n;
    3111              : 
    3112              :     Assert(state->memtupcount >= 1);
    3113              : 
    3114      3681329 :     CHECK_FOR_INTERRUPTS();
    3115              : 
    3116              :     /*
    3117              :      * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
    3118              :      * This prevents overflow in the "2 * i + 1" calculation, since at the top
    3119              :      * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
    3120              :      */
    3121      3681329 :     n = state->memtupcount;
    3122      3681329 :     i = 0;                      /* i is where the "hole" is */
    3123              :     for (;;)
    3124      1229408 :     {
    3125      4910737 :         unsigned int j = 2 * i + 1;
    3126              : 
    3127      4910737 :         if (j >= n)
    3128       778817 :             break;
    3129      5803030 :         if (j + 1 < n &&
    3130      1671110 :             COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
    3131       664210 :             j++;
    3132      4131920 :         if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
    3133      2902512 :             break;
    3134      1229408 :         memtuples[i] = memtuples[j];
    3135      1229408 :         i = j;
    3136              :     }
    3137      3681329 :     memtuples[i] = *tuple;
    3138      3681329 : }
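The replace-top operation above, reduced to a plain min-heap of ints, is the familiar hole-based sift toward the leaves: drop the new element into the hole at the root and pull the smaller child up until the element fits. A minimal sketch (illustrative name, not the tuplesort.c implementation):

```c
/*
 * Replace the root of a min-heap of n ints with newval, then restore the
 * heap invariant by sifting the hole down (Knuth 5.2.3H, steps H3-H8,
 * with 0-based indexes).
 */
static void
heap_replace_top(int *heap, unsigned int n, int newval)
{
    unsigned int i = 0;         /* i is where the "hole" is */

    for (;;)
    {
        unsigned int j = 2 * i + 1;

        if (j >= n)
            break;
        /* pick the smaller of the two children */
        if (j + 1 < n && heap[j + 1] < heap[j])
            j++;
        if (newval <= heap[j])
            break;
        heap[i] = heap[j];      /* pull child up into the hole */
        i = j;
    }
    heap[i] = newval;
}
```

Using unsigned int for the indexes mirrors the overflow argument in the comment above: with i < n <= INT_MAX, the child index 2 * i + 1 cannot wrap.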
    3139              : 
    3140              : /*
    3141              :  * Function to reverse the sort direction from its current state
    3142              :  *
    3143              :  * It is not safe to call this when performing hash tuplesorts
    3144              :  */
    3145              : static void
    3146          510 : reversedirection(Tuplesortstate *state)
    3147              : {
    3148          510 :     SortSupport sortKey = state->base.sortKeys;
    3149              :     int         nkey;
    3150              : 
    3151         1244 :     for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
    3152              :     {
    3153          734 :         sortKey->ssup_reverse = !sortKey->ssup_reverse;
    3154          734 :         sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
    3155              :     }
    3156          510 : }
    3157              : 
    3158              : 
    3159              : /*
    3160              :  * Tape interface routines
    3161              :  */
    3162              : 
    3163              : static unsigned int
    3164      3522217 : getlen(LogicalTape *tape, bool eofOK)
    3165              : {
    3166              :     unsigned int len;
    3167              : 
    3168      3522217 :     if (LogicalTapeRead(tape,
    3169              :                         &len, sizeof(len)) != sizeof(len))
    3170            0 :         elog(ERROR, "unexpected end of tape");
    3171      3522217 :     if (len == 0 && !eofOK)
    3172            0 :         elog(ERROR, "unexpected end of data");
    3173      3522217 :     return len;
    3174              : }
    3175              : 
    3176              : static void
    3177         1200 : markrunend(LogicalTape *tape)
    3178              : {
    3179         1200 :     unsigned int len = 0;
    3180              : 
    3181         1200 :     LogicalTapeWrite(tape, &len, sizeof(len));
    3182         1200 : }
    3183              : 
    3184              : /*
    3185              :  * Get memory for tuple from within READTUP() routine.
    3186              :  *
    3187              :  * We use next free slot from the slab allocator, or palloc() if the tuple
    3188              :  * is too large for that.
    3189              :  */
    3190              : void *
    3191      3265976 : tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
    3192              : {
    3193              :     SlabSlot   *buf;
    3194              : 
    3195              :     /*
    3196              :      * We pre-allocate enough slots in the slab arena that we should never run
    3197              :      * out.
    3198              :      */
    3199              :     Assert(state->slabFreeHead);
    3200              : 
    3201      3265976 :     if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
    3202            4 :         return MemoryContextAlloc(state->base.sortcontext, tuplen);
    3203              :     else
    3204              :     {
    3205      3265972 :         buf = state->slabFreeHead;
    3206              :         /* Reuse this slot */
    3207      3265972 :         state->slabFreeHead = buf->nextfree;
    3208              : 
    3209      3265972 :         return buf;
    3210              :     }
    3211              : }
    3212              : 
    3213              : 
    3214              : /*
    3215              :  * Parallel sort routines
    3216              :  */
    3217              : 
    3218              : /*
    3219              :  * tuplesort_estimate_shared - estimate required shared memory allocation
    3220              :  *
    3221              :  * nWorkers is an estimate of the number of workers (it's the number that
    3222              :  * will be requested).
    3223              :  */
    3224              : Size
    3225          131 : tuplesort_estimate_shared(int nWorkers)
    3226              : {
    3227              :     Size        tapesSize;
    3228              : 
    3229              :     Assert(nWorkers > 0);
    3230              : 
    3231              :     /* Make sure that BufFile shared state is MAXALIGN'd */
    3232          131 :     tapesSize = mul_size(sizeof(TapeShare), nWorkers);
    3233          131 :     tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
    3234              : 
    3235          131 :     return tapesSize;
    3236              : }
    3237              : 
    3238              : /*
    3239              :  * tuplesort_initialize_shared - initialize shared tuplesort state
    3240              :  *
    3241              :  * Must be called from leader process before workers are launched, to
    3242              :  * establish state needed up-front for worker tuplesortstates.  nWorkers
    3243              :  * should match the argument passed to tuplesort_estimate_shared().
    3244              :  */
    3245              : void
    3246          179 : tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
    3247              : {
    3248              :     int         i;
    3249              : 
    3250              :     Assert(nWorkers > 0);
    3251              : 
    3252          179 :     SpinLockInit(&shared->mutex);
    3253          179 :     shared->currentWorker = 0;
    3254          179 :     shared->workersFinished = 0;
    3255          179 :     SharedFileSetInit(&shared->fileset, seg);
    3256          179 :     shared->nTapes = nWorkers;
    3257          562 :     for (i = 0; i < nWorkers; i++)
    3258              :     {
    3259          383 :         shared->tapes[i].firstblocknumber = 0L;
    3260              :     }
    3261          179 : }
    3262              : 
    3263              : /*
    3264              :  * tuplesort_attach_shared - attach to shared tuplesort state
    3265              :  *
    3266              :  * Must be called by all worker processes.
    3267              :  */
    3268              : void
    3269          201 : tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
    3270              : {
    3271              :     /* Attach to SharedFileSet */
    3272          201 :     SharedFileSetAttach(&shared->fileset, seg);
    3273          201 : }
    3274              : 
    3275              : /*
    3276              :  * worker_get_identifier - Assign and return ordinal identifier for worker
    3277              :  *
    3278              :  * The order in which these are assigned is not well defined, and should not
    3279              :  * matter; worker numbers across parallel sort participants need only be
    3280              :  * distinct and gapless.  logtape.c requires this.
    3281              :  *
    3282              :  * Note that the identifiers assigned from here have no relation to a
    3283              :  * process's ParallelWorkerNumber, to avoid making any assumption about
    3284              :  * caller's requirements.  However, we do follow the ParallelWorkerNumber
    3285              :  * convention of representing a non-worker with worker number -1.  This
    3286              :  * includes the leader, as well as serial Tuplesort processes.
    3287              :  */
    3288              : static int
    3289          379 : worker_get_identifier(Tuplesortstate *state)
    3290              : {
    3291          379 :     Sharedsort *shared = state->shared;
    3292              :     int         worker;
    3293              : 
    3294              :     Assert(WORKER(state));
    3295              : 
    3296          379 :     SpinLockAcquire(&shared->mutex);
    3297          379 :     worker = shared->currentWorker++;
    3298          379 :     SpinLockRelease(&shared->mutex);
    3299              : 
    3300          379 :     return worker;
    3301              : }
    3302              : 
    3303              : /*
    3304              :  * worker_freeze_result_tape - freeze worker's result tape for leader
    3305              :  *
    3306              :  * This is called by workers just after the result tape has been determined,
    3307              :  * instead of calling LogicalTapeFreeze() directly.  Workers require a few
    3308              :  * additional steps beyond the similar serial TSS_SORTEDONTAPE external sort
    3309              :  * case, and those steps also happen here: freeing now-unneeded resources,
    3310              :  * and representing to the leader that the worker's input run is available
    3311              :  * for its merge.
    3312              :  *
    3313              :  * There should only be one final output run for each worker, which consists
    3314              :  * of all tuples that were originally input into worker.
    3315              :  */
    3316              : static void
    3317          379 : worker_freeze_result_tape(Tuplesortstate *state)
    3318              : {
    3319          379 :     Sharedsort *shared = state->shared;
    3320              :     TapeShare   output;
    3321              : 
    3322              :     Assert(WORKER(state));
    3323              :     Assert(state->result_tape != NULL);
    3324              :     Assert(state->memtupcount == 0);
    3325              : 
    3326              :     /*
    3327              :      * Free most remaining memory, in case caller is sensitive to our holding
    3328              :      * on to it.  memtuples may still be full size, not a tiny merge heap.
    3329              :      */
    3330          379 :     pfree(state->memtuples);
    3331              :     /* Be tidy */
    3332          379 :     state->memtuples = NULL;
    3333          379 :     state->memtupsize = 0;
    3334              : 
    3335              :     /*
    3336              :      * The parallel worker's result tape metadata must be stored in shared
    3337              :      * memory, where the leader can find it
    3338              :      */
    3339          379 :     LogicalTapeFreeze(state->result_tape, &output);
    3340              : 
    3341              :     /* Store properties of output tape, and update finished worker count */
    3342          379 :     SpinLockAcquire(&shared->mutex);
    3343          379 :     shared->tapes[state->worker] = output;
    3344          379 :     shared->workersFinished++;
    3345          379 :     SpinLockRelease(&shared->mutex);
    3346          379 : }
    3347              : 
    3348              : /*
    3349              :  * worker_nomergeruns - dump memtuples in worker, without merging
    3350              :  *
    3351              :  * This is called in a worker, as an alternative to mergeruns(), when no
    3352              :  * merging is required.
    3353              :  */
    3354              : static void
    3355          379 : worker_nomergeruns(Tuplesortstate *state)
    3356              : {
    3357              :     Assert(WORKER(state));
    3358              :     Assert(state->result_tape == NULL);
    3359              :     Assert(state->nOutputRuns == 1);
    3360              : 
    3361          379 :     state->result_tape = state->destTape;
    3362          379 :     worker_freeze_result_tape(state);
    3363          379 : }
    3364              : 
    3365              : /*
    3366              :  * leader_takeover_tapes - create tapeset for leader from worker tapes
    3367              :  *
    3368              :  * So far, leader Tuplesortstate has performed no actual sorting.  By now, all
    3369              :  * sorting has occurred in workers, all of which must have already returned
    3370              :  * from tuplesort_performsort().
    3371              :  *
    3372              :  * When this returns, the leader process is left in a state virtually
    3373              :  * indistinguishable from the one it would be in had it generated the runs
    3374              :  * itself, as a serial external sort would have.
    3375              :  */
    3376              : static void
    3377          130 : leader_takeover_tapes(Tuplesortstate *state)
    3378              : {
    3379          130 :     Sharedsort *shared = state->shared;
    3380          130 :     int         nParticipants = state->nParticipants;
    3381              :     int         workersFinished;
    3382              :     int         j;
    3383              : 
    3384              :     Assert(LEADER(state));
    3385              :     Assert(nParticipants >= 1);
    3386              : 
    3387          130 :     SpinLockAcquire(&shared->mutex);
    3388          130 :     workersFinished = shared->workersFinished;
    3389          130 :     SpinLockRelease(&shared->mutex);
    3390              : 
    3391          130 :     if (nParticipants != workersFinished)
    3392            0 :         elog(ERROR, "cannot take over tapes before all workers finish");
    3393              : 
    3394              :     /*
    3395              :      * Create the tapeset from worker tapes, including a leader-owned tape at
    3396              :      * the end.  Parallel workers are far more expensive than logical tapes,
    3397              :      * so the number of tapes allocated here should never be excessive.
    3398              :      */
    3399          130 :     inittapestate(state, nParticipants);
    3400          130 :     state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
    3401              : 
    3402              :     /*
    3403              :      * Set currentRun to reflect the number of runs we will merge (it's not
    3404              :      * used for anything, this is just pro forma)
    3405              :      */
    3406          130 :     state->currentRun = nParticipants;
    3407              : 
    3408              :     /*
    3409              :      * Initialize the state to look the same as after building the initial
    3410              :      * runs.
    3411              :      *
    3412              :      * There will always be exactly 1 run per worker, and exactly one input
    3413              :      * tape per run, because workers always output exactly 1 run, even when
    3414              :      * there were no input tuples for workers to sort.
    3415              :      */
    3416          130 :     state->inputTapes = NULL;
    3417          130 :     state->nInputTapes = 0;
    3418          130 :     state->nInputRuns = 0;
    3419              : 
    3420          130 :     state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
    3421          130 :     state->nOutputTapes = nParticipants;
    3422          130 :     state->nOutputRuns = nParticipants;
    3423              : 
    3424          413 :     for (j = 0; j < nParticipants; j++)
    3425              :     {
    3426          283 :         state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
    3427              :     }
    3428              : 
    3429          130 :     state->status = TSS_BUILDRUNS;
    3430          130 : }
    3431              : 
    3432              : /*
    3433              :  * Convenience routine to free a tuple previously loaded into sort memory
    3434              :  */
    3435              : static void
    3436      2145890 : free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
    3437              : {
    3438      2145890 :     if (stup->tuple)
    3439              :     {
    3440      2044155 :         FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
    3441      2044155 :         pfree(stup->tuple);
    3442      2044155 :         stup->tuple = NULL;
    3443              :     }
    3444      2145890 : }
    3445              : 
    3446              : int
    3447      2536788 : ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
    3448              : {
    3449      2536788 :     if (x < y)
    3450       868211 :         return -1;
    3451      1668577 :     else if (x > y)
    3452       640190 :         return 1;
    3453              :     else
    3454      1028387 :         return 0;
    3455              : }
    3456              : 
    3457              : int
    3458      1712801 : ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
    3459              : {
    3460      1712801 :     int64       xx = DatumGetInt64(x);
    3461      1712801 :     int64       yy = DatumGetInt64(y);
    3462              : 
    3463      1712801 :     if (xx < yy)
    3464      1254308 :         return -1;
    3465       458493 :     else if (xx > yy)
    3466       228686 :         return 1;
    3467              :     else
    3468       229807 :         return 0;
    3469              : }
    3470              : 
    3471              : int
    3472    121018448 : ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
    3473              : {
    3474    121018448 :     int32       xx = DatumGetInt32(x);
    3475    121018448 :     int32       yy = DatumGetInt32(y);
    3476              : 
    3477    121018448 :     if (xx < yy)
    3478     30252076 :         return -1;
    3479     90766372 :     else if (xx > yy)
    3480     26447884 :         return 1;
    3481              :     else
    3482     64318488 :         return 0;
    3483              : }
        

Generated by: LCOV version 2.0-1