LCOV - code coverage report
Current view: top level - src/backend/utils/sort - tuplesort.c (source / functions)
Test: PostgreSQL 19devel
Date: 2025-08-28 18:18:38

             Hit    Total    Coverage
Lines:       724      812     89.2 %
Functions:    54       55     98.2 %

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * tuplesort.c
       4             :  *    Generalized tuple sorting routines.
       5             :  *
       6             :  * This module provides a generalized facility for tuple sorting, which can be
       7             :  * applied to different kinds of sortable objects.  Implementation of
       8             :  * the particular sorting variants is given in tuplesortvariants.c.
       9             :  * This module works efficiently for both small and large amounts
      10             :  * of data.  Small amounts are sorted in-memory using qsort().  Large
      11             :  * amounts are sorted using temporary files and a standard external sort
      12             :  * algorithm.
      13             :  *
      14             :  * See Knuth, volume 3, for more than you want to know about external
      15             :  * sorting algorithms.  The algorithm we use is a balanced k-way merge.
      16             :  * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
      17             :  * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
      18             :  * merge is better.  Knuth is assuming that tape drives are expensive
      19             :  * beasts, and in particular that there will always be many more runs than
      20             :  * tape drives.  The polyphase merge algorithm was good at keeping all the
       21             :  * tape drives busy, but in our implementation a "tape drive" doesn't cost
       22             :  * much more than a few KB of memory buffers, so we can afford to have
       23             :  * lots of them.  In particular, if we can have as many tape drives as
       24             :  * sorted runs, we can avoid repeated I/O entirely.
      25             :  *
      26             :  * Historically, we divided the input into sorted runs using replacement
      27             :  * selection, in the form of a priority tree implemented as a heap
      28             :  * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
      29             :  * for run generation.
      30             :  *
      31             :  * The approximate amount of memory allowed for any one sort operation
      32             :  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
      33             :  * we absorb tuples and simply store them in an unsorted array as long as
      34             :  * we haven't exceeded workMem.  If we reach the end of the input without
      35             :  * exceeding workMem, we sort the array using qsort() and subsequently return
      36             :  * tuples just by scanning the tuple array sequentially.  If we do exceed
      37             :  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
      38             :  * When tuples are dumped in batch after quicksorting, we begin a new run
      39             :  * with a new output tape.  If we reach the max number of tapes, we write
      40             :  * subsequent runs on the existing tapes in a round-robin fashion.  We will
      41             :  * need multiple merge passes to finish the merge in that case.  After the
      42             :  * end of the input is reached, we dump out remaining tuples in memory into
      43             :  * a final run, then merge the runs.
      44             :  *
      45             :  * When merging runs, we use a heap containing just the frontmost tuple from
      46             :  * each source run; we repeatedly output the smallest tuple and replace it
      47             :  * with the next tuple from its source tape (if any).  When the heap empties,
      48             :  * the merge is complete.  The basic merge algorithm thus needs very little
      49             :  * memory --- only M tuples for an M-way merge, and M is constrained to a
      50             :  * small number.  However, we can still make good use of our full workMem
      51             :  * allocation by pre-reading additional blocks from each source tape.  Without
      52             :  * prereading, our access pattern to the temporary file would be very erratic;
      53             :  * on average we'd read one block from each of M source tapes during the same
      54             :  * time that we're writing M blocks to the output tape, so there is no
      55             :  * sequentiality of access at all, defeating the read-ahead methods used by
      56             :  * most Unix kernels.  Worse, the output tape gets written into a very random
      57             :  * sequence of blocks of the temp file, ensuring that things will be even
      58             :  * worse when it comes time to read that tape.  A straightforward merge pass
      59             :  * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
      60             :  * by prereading from each source tape sequentially, loading about workMem/M
      61             :  * bytes from each tape in turn, and making the sequential blocks immediately
      62             :  * available for reuse.  This approach helps to localize both read and write
       63             :  * accesses.  The pre-reading is handled by logtape.c; we just tell it how
       64             :  * much memory to use for the buffers.  (See the sketch after this comment.)
      65             :  *
      66             :  * In the current code we determine the number of input tapes M on the basis
      67             :  * of workMem: we want workMem/M to be large enough that we read a fair
      68             :  * amount of data each time we read from a tape, so as to maintain the
      69             :  * locality of access described above.  Nonetheless, with large workMem we
      70             :  * can have many tapes.  The logical "tapes" are implemented by logtape.c,
      71             :  * which avoids space wastage by recycling disk space as soon as each block
      72             :  * is read from its "tape".
      73             :  *
      74             :  * When the caller requests random access to the sort result, we form
      75             :  * the final sorted run on a logical tape which is then "frozen", so
      76             :  * that we can access it randomly.  When the caller does not need random
      77             :  * access, we return from tuplesort_performsort() as soon as we are down
      78             :  * to one run per logical tape.  The final merge is then performed
      79             :  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
      80             :  * saves one cycle of writing all the data out to disk and reading it in.
      81             :  *
       82             :  * This module supports parallel sorting.  Parallel sorts involve coordination
       83             :  * among one or more worker processes and a leader process, each with its own
       84             :  * tuplesort state.  The leader process (or, more accurately, the
       85             :  * Tuplesortstate associated with a leader process) creates a full tapeset
       86             :  * consisting of worker tapes, one per worker process, each holding a single
       87             :  * run to merge.  These runs are then merged.  Worker processes are guaranteed
       88             :  * to produce exactly one output run from their partial input.
      89             :  *
      90             :  *
      91             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      92             :  * Portions Copyright (c) 1994, Regents of the University of California
      93             :  *
      94             :  * IDENTIFICATION
      95             :  *    src/backend/utils/sort/tuplesort.c
      96             :  *
      97             :  *-------------------------------------------------------------------------
      98             :  */
      99             : 
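/*
 * Editorial sketch (not part of tuplesort.c): the merge loop described in
 * the header comment above, reduced to merging pre-sorted integer arrays
 * with a binary min-heap of frontmost elements.  Arrays stand in for tapes;
 * all names here are hypothetical illustrations, not PostgreSQL APIs.
 */
typedef struct
{
    int         value;          /* frontmost value of the source run */
    int         src;            /* index of the run it came from */
} MergeHeapItem;

static void
merge_sift_down(MergeHeapItem *heap, int n, int i)
{
    for (;;)
    {
        int         smallest = i;
        int         l = 2 * i + 1;
        int         r = 2 * i + 2;
        MergeHeapItem tmp;

        if (l < n && heap[l].value < heap[smallest].value)
            smallest = l;
        if (r < n && heap[r].value < heap[smallest].value)
            smallest = r;
        if (smallest == i)
            break;
        tmp = heap[i];
        heap[i] = heap[smallest];
        heap[smallest] = tmp;
        i = smallest;
    }
}

/* Merge 'nruns' sorted runs into 'out'; pos[] must be zeroed by the caller. */
static void
merge_runs_sketch(int **runs, const int *len, int *pos,
                  MergeHeapItem *heap, int nruns, int *out)
{
    int         n = 0;

    /* load the frontmost element of each non-empty run, then heapify */
    for (int i = 0; i < nruns; i++)
    {
        if (pos[i] < len[i])
        {
            heap[n].value = runs[i][pos[i]++];
            heap[n].src = i;
            n++;
        }
    }
    for (int i = n / 2 - 1; i >= 0; i--)
        merge_sift_down(heap, n, i);

    /* repeatedly emit the smallest element and refill from its source run */
    while (n > 0)
    {
        int         s = heap[0].src;

        *out++ = heap[0].value;
        if (pos[s] < len[s])
            heap[0].value = runs[s][pos[s]++];  /* replace heap top */
        else
            heap[0] = heap[--n];                /* run exhausted; shrink heap */
        merge_sift_down(heap, n, 0);
    }
}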
     100             : #include "postgres.h"
     101             : 
     102             : #include <limits.h>
     103             : 
     104             : #include "commands/tablespace.h"
     105             : #include "miscadmin.h"
     106             : #include "pg_trace.h"
     107             : #include "storage/shmem.h"
     108             : #include "utils/guc.h"
     109             : #include "utils/memutils.h"
     110             : #include "utils/pg_rusage.h"
     111             : #include "utils/tuplesort.h"
     112             : 
     113             : /*
      114             :  * Initial size of the memtuples array.  We try to choose this size so that
      115             :  * the array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the
      116             :  * overhead of allocation might possibly be lowered.  However, we don't
      117             :  * consider array sizes less than 1024.
      118             :  */
     120             : #define INITIAL_MEMTUPSIZE Max(1024, \
     121             :     ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
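/*
 * Editorial note (illustrative arithmetic, assuming a typical 64-bit
 * build): with ALLOCSET_SEPARATE_THRESHOLD at 8192 bytes and
 * sizeof(SortTuple) at 24 bytes, the second branch evaluates to
 * 8192 / 24 + 1 = 342, so the macro resolves to Max(1024, 342) = 1024.
 */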
     122             : 
     123             : /* GUC variables */
     124             : bool        trace_sort = false;
     125             : 
     126             : #ifdef DEBUG_BOUNDED_SORT
     127             : bool        optimize_bounded_sort = true;
     128             : #endif
     129             : 
     130             : 
     131             : /*
      132             :  * During merge, we use a pre-allocated set of fixed-size slots to hold
      133             :  * tuples, to avoid palloc/pfree overhead.
      134             :  *
      135             :  * Merge doesn't require a lot of memory, so we can afford to waste some
      136             :  * by using generously-sized slots.  If a tuple is larger than 1 kB, the
      137             :  * palloc() overhead is no longer significant anyway.
     138             :  *
     139             :  * 'nextfree' is valid when this chunk is in the free list.  When in use, the
     140             :  * slot holds a tuple.
     141             :  */
     142             : #define SLAB_SLOT_SIZE 1024
     143             : 
     144             : typedef union SlabSlot
     145             : {
     146             :     union SlabSlot *nextfree;
     147             :     char        buffer[SLAB_SLOT_SIZE];
     148             : } SlabSlot;
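/*
 * Editorial sketch (not part of tuplesort.c): how a union like SlabSlot
 * supports a zero-overhead free list.  While a slot is free, its bytes
 * hold the 'nextfree' link; once handed out, the same bytes hold tuple
 * data.  The slab_pop/slab_push helpers are hypothetical.
 */
static SlabSlot *
slab_pop(SlabSlot **freehead)
{
    SlabSlot   *slot = *freehead;

    if (slot != NULL)
        *freehead = slot->nextfree;     /* unlink from the free list */
    return slot;                        /* NULL means the slab is exhausted */
}

static void
slab_push(SlabSlot **freehead, SlabSlot *slot)
{
    slot->nextfree = *freehead;         /* link back into the free list */
    *freehead = slot;
}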
     149             : 
     150             : /*
     151             :  * Possible states of a Tuplesort object.  These denote the states that
     152             :  * persist between calls of Tuplesort routines.
     153             :  */
     154             : typedef enum
     155             : {
     156             :     TSS_INITIAL,                /* Loading tuples; still within memory limit */
     157             :     TSS_BOUNDED,                /* Loading tuples into bounded-size heap */
     158             :     TSS_BUILDRUNS,              /* Loading tuples; writing to tape */
     159             :     TSS_SORTEDINMEM,            /* Sort completed entirely in memory */
     160             :     TSS_SORTEDONTAPE,           /* Sort completed, final run is on tape */
     161             :     TSS_FINALMERGE,             /* Performing final merge on-the-fly */
     162             : } TupSortStatus;
     163             : 
     164             : /*
     165             :  * Parameters for calculation of number of tapes to use --- see inittapes()
     166             :  * and tuplesort_merge_order().
     167             :  *
      168             :  * In this calculation we assume that each tape will cost us about one
      169             :  * block's worth of buffer space.  This ignores the overhead of all the other data
     170             :  * structures needed for each tape, but it's probably close enough.
     171             :  *
     172             :  * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
     173             :  * input tape, for pre-reading (see discussion at top of file).  This is *in
     174             :  * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
     175             :  */
     176             : #define MINORDER        6       /* minimum merge order */
     177             : #define MAXORDER        500     /* maximum merge order */
     178             : #define TAPE_BUFFER_OVERHEAD        BLCKSZ
     179             : #define MERGE_BUFFER_SIZE           (BLCKSZ * 32)
     180             : 
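/*
 * Editorial sketch (not part of this excerpt): one plausible way these
 * constants combine, per the comment above.  Each input tape costs
 * TAPE_BUFFER_OVERHEAD plus MERGE_BUFFER_SIZE of pre-read space, and the
 * resulting merge order is clamped to [MINORDER, MAXORDER].  The actual
 * computation lives in tuplesort_merge_order(), which is not shown here.
 */
static int
merge_order_sketch(int64 allowedMem)
{
    int64       mOrder;

    /* reserve one buffer for the output tape, then divide up the rest */
    mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
        (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);

    mOrder = Max(mOrder, MINORDER);
    mOrder = Min(mOrder, MAXORDER);

    return (int) mOrder;
}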
     181             : 
     182             : /*
     183             :  * Private state of a Tuplesort operation.
     184             :  */
     185             : struct Tuplesortstate
     186             : {
     187             :     TuplesortPublic base;
     188             :     TupSortStatus status;       /* enumerated value as shown above */
     189             :     bool        bounded;        /* did caller specify a maximum number of
     190             :                                  * tuples to return? */
     191             :     bool        boundUsed;      /* true if we made use of a bounded heap */
     192             :     int         bound;          /* if bounded, the maximum number of tuples */
      193             :     int64       tupleMem;       /* memory consumed by individual tuples.
      194             :                                  * Storing this separately from what we track
     195             :                                  * in availMem allows us to subtract the
     196             :                                  * memory consumed by all tuples when dumping
     197             :                                  * tuples to tape */
     198             :     int64       availMem;       /* remaining memory available, in bytes */
     199             :     int64       allowedMem;     /* total memory allowed, in bytes */
     200             :     int         maxTapes;       /* max number of input tapes to merge in each
     201             :                                  * pass */
      202             :     int64       maxSpace;       /* maximum amount of space occupied among sort
      203             :                                  * groups, either in-memory or on-disk */
      204             :     bool        isMaxSpaceDisk; /* true when maxSpace is the value for on-disk
      205             :                                  * space, false when it is the value for
      206             :                                  * in-memory space */
     207             :     TupSortStatus maxSpaceStatus;   /* sort status when maxSpace was reached */
     208             :     LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */
     209             : 
     210             :     /*
     211             :      * This array holds the tuples now in sort memory.  If we are in state
     212             :      * INITIAL, the tuples are in no particular order; if we are in state
     213             :      * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
     214             :      * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
     215             :      * H.  In state SORTEDONTAPE, the array is not used.
     216             :      */
     217             :     SortTuple  *memtuples;      /* array of SortTuple structs */
     218             :     int         memtupcount;    /* number of tuples currently present */
     219             :     int         memtupsize;     /* allocated length of memtuples array */
     220             :     bool        growmemtuples;  /* memtuples' growth still underway? */
     221             : 
     222             :     /*
     223             :      * Memory for tuples is sometimes allocated using a simple slab allocator,
     224             :      * rather than with palloc().  Currently, we switch to slab allocation
     225             :      * when we start merging.  Merging only needs to keep a small, fixed
     226             :      * number of tuples in memory at any time, so we can avoid the
     227             :      * palloc/pfree overhead by recycling a fixed number of fixed-size slots
     228             :      * to hold the tuples.
     229             :      *
     230             :      * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
     231             :      * slots.  The allocation is sized to have one slot per tape, plus one
     232             :      * additional slot.  We need that many slots to hold all the tuples kept
      233             :  * in the heap during merge, plus the one most recently returned from the
      234             :  * sort by tuplesort_gettuple.
     235             :      *
     236             :      * Initially, all the slots are kept in a linked list of free slots.  When
      237             :  * a tuple is read from a tape, it is put into the next available slot, if
     238             :      * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
     239             :      * instead.
     240             :      *
     241             :      * When we're done processing a tuple, we return the slot back to the free
      242             :  * list, or pfree() it if it was palloc'd.  We know that a tuple was
      243             :  * allocated from the slab if its pointer value is between
      244             :  * slabMemoryBegin and slabMemoryEnd.
     245             :      *
     246             :      * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
     247             :      * tracking memory usage is not used.
     248             :      */
     249             :     bool        slabAllocatorUsed;
     250             : 
     251             :     char       *slabMemoryBegin;    /* beginning of slab memory arena */
     252             :     char       *slabMemoryEnd;  /* end of slab memory arena */
     253             :     SlabSlot   *slabFreeHead;   /* head of free list */
     254             : 
     255             :     /* Memory used for input and output tape buffers. */
     256             :     size_t      tape_buffer_mem;
     257             : 
     258             :     /*
      259             :  * When we return a tuple that came from a tape to the caller in
      260             :  * tuplesort_gettuple_XXX (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
      261             :  * modes), we remember the tuple in 'lastReturnedTuple', so that we can
      262             :  * recycle its memory on the next gettuple call.
     263             :      */
     264             :     void       *lastReturnedTuple;
     265             : 
     266             :     /*
     267             :      * While building initial runs, this is the current output run number.
     268             :      * Afterwards, it is the number of initial runs we made.
     269             :      */
     270             :     int         currentRun;
     271             : 
     272             :     /*
     273             :      * Logical tapes, for merging.
     274             :      *
      275             :  * The initial runs are written to the output tapes.  In each merge pass,
     276             :      * the output tapes of the previous pass become the input tapes, and new
     277             :      * output tapes are created as needed.  When nInputTapes equals
     278             :      * nInputRuns, there is only one merge pass left.
     279             :      */
     280             :     LogicalTape **inputTapes;
     281             :     int         nInputTapes;
     282             :     int         nInputRuns;
     283             : 
     284             :     LogicalTape **outputTapes;
     285             :     int         nOutputTapes;
     286             :     int         nOutputRuns;
     287             : 
     288             :     LogicalTape *destTape;      /* current output tape */
     289             : 
     290             :     /*
     291             :      * These variables are used after completion of sorting to keep track of
     292             :      * the next tuple to return.  (In the tape case, the tape's current read
     293             :      * position is also critical state.)
     294             :      */
     295             :     LogicalTape *result_tape;   /* actual tape of finished output */
     296             :     int         current;        /* array index (only used if SORTEDINMEM) */
     297             :     bool        eof_reached;    /* reached EOF (needed for cursors) */
     298             : 
     299             :     /* markpos_xxx holds marked position for mark and restore */
     300             :     int64       markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
     301             :     int         markpos_offset; /* saved "current", or offset in tape block */
     302             :     bool        markpos_eof;    /* saved "eof_reached" */
     303             : 
     304             :     /*
     305             :      * These variables are used during parallel sorting.
     306             :      *
      307             :  * worker is our worker identifier.  Follows the general convention that
      308             :  * a value of -1 denotes a leader tuplesort, and values >= 0 denote worker
      309             :  * tuplesorts.  (-1 can also indicate a serial tuplesort.)
     310             :      *
     311             :      * shared is mutable shared memory state, which is used to coordinate
     312             :      * parallel sorts.
     313             :      *
     314             :      * nParticipants is the number of worker Tuplesortstates known by the
     315             :      * leader to have actually been launched, which implies that they must
     316             :      * finish a run that the leader needs to merge.  Typically includes a
     317             :      * worker state held by the leader process itself.  Set in the leader
     318             :      * Tuplesortstate only.
     319             :      */
     320             :     int         worker;
     321             :     Sharedsort *shared;
     322             :     int         nParticipants;
     323             : 
     324             :     /*
     325             :      * Additional state for managing "abbreviated key" sortsupport routines
     326             :      * (which currently may be used by all cases except the hash index case).
     327             :      * Tracks the intervals at which the optimization's effectiveness is
     328             :      * tested.
     329             :      */
     330             :     int64       abbrevNext;     /* Tuple # at which to next check
     331             :                                  * applicability */
     332             : 
     333             :     /*
     334             :      * Resource snapshot for time of sort start.
     335             :      */
     336             :     PGRUsage    ru_start;
     337             : };
     338             : 
     339             : /*
      340             :  * Private mutable state of a parallel tuplesort operation.  This is allocated
     341             :  * in shared memory.
     342             :  */
     343             : struct Sharedsort
     344             : {
     345             :     /* mutex protects all fields prior to tapes */
     346             :     slock_t     mutex;
     347             : 
     348             :     /*
     349             :      * currentWorker generates ordinal identifier numbers for parallel sort
     350             :      * workers.  These start from 0, and are always gapless.
     351             :      *
     352             :      * Workers increment workersFinished to indicate having finished.  If this
      353             :  * is equal to state.nParticipants within the leader, the leader is ready to
     354             :      * merge worker runs.
     355             :      */
     356             :     int         currentWorker;
     357             :     int         workersFinished;
     358             : 
     359             :     /* Temporary file space */
     360             :     SharedFileSet fileset;
     361             : 
     362             :     /* Size of tapes flexible array */
     363             :     int         nTapes;
     364             : 
     365             :     /*
     366             :      * Tapes array used by workers to report back information needed by the
     367             :      * leader to concatenate all worker tapes into one for merging
     368             :      */
     369             :     TapeShare   tapes[FLEXIBLE_ARRAY_MEMBER];
     370             : };
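/*
 * Editorial sketch (not part of this excerpt): how a worker might claim a
 * gapless ordinal identifier from Sharedsort, in the spirit of the
 * worker_get_identifier() routine declared below.  SpinLockAcquire and
 * SpinLockRelease are the real spinlock API; the helper itself is
 * illustrative.
 */
static int
claim_worker_id_sketch(Sharedsort *shared)
{
    int         id;

    SpinLockAcquire(&shared->mutex);
    id = shared->currentWorker++;   /* 0, 1, 2, ... with no gaps */
    SpinLockRelease(&shared->mutex);

    return id;
}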
     371             : 
     372             : /*
     373             :  * Is the given tuple allocated from the slab memory arena?
     374             :  */
     375             : #define IS_SLAB_SLOT(state, tuple) \
     376             :     ((char *) (tuple) >= (state)->slabMemoryBegin && \
     377             :      (char *) (tuple) < (state)->slabMemoryEnd)
     378             : 
     379             : /*
     380             :  * Return the given tuple to the slab memory free list, or free it
     381             :  * if it was palloc'd.
     382             :  */
     383             : #define RELEASE_SLAB_SLOT(state, tuple) \
     384             :     do { \
     385             :         SlabSlot *buf = (SlabSlot *) tuple; \
     386             :         \
     387             :         if (IS_SLAB_SLOT((state), buf)) \
     388             :         { \
     389             :             buf->nextfree = (state)->slabFreeHead; \
     390             :             (state)->slabFreeHead = buf; \
     391             :         } else \
     392             :             pfree(buf); \
     393             :     } while(0)
     394             : 
     395             : #define REMOVEABBREV(state,stup,count)  ((*(state)->base.removeabbrev) (state, stup, count))
     396             : #define COMPARETUP(state,a,b)   ((*(state)->base.comparetup) (a, b, state))
     397             : #define WRITETUP(state,tape,stup)   ((*(state)->base.writetup) (state, tape, stup))
     398             : #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
     399             : #define FREESTATE(state)    ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
     400             : #define LACKMEM(state)      ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
     401             : #define USEMEM(state,amt)   ((state)->availMem -= (amt))
     402             : #define FREEMEM(state,amt)  ((state)->availMem += (amt))
     403             : #define SERIAL(state)       ((state)->shared == NULL)
     404             : #define WORKER(state)       ((state)->shared && (state)->worker != -1)
     405             : #define LEADER(state)       ((state)->shared && (state)->worker == -1)
     406             : 
     407             : /*
     408             :  * NOTES about on-tape representation of tuples:
     409             :  *
     410             :  * We require the first "unsigned int" of a stored tuple to be the total size
     411             :  * on-tape of the tuple, including itself (so it is never zero; an all-zero
     412             :  * unsigned int is used to delimit runs).  The remainder of the stored tuple
     413             :  * may or may not match the in-memory representation of the tuple ---
     414             :  * any conversion needed is the job of the writetup and readtup routines.
     415             :  *
     416             :  * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
     417             :  * representation of the tuple must be followed by another "unsigned int" that
     418             :  * is a copy of the length --- so the total tape space used is actually
     419             :  * sizeof(unsigned int) more than the stored length value.  This allows
      420             :  * read-backwards.  When the random access flag is not specified, the
     421             :  * write/read routines may omit the extra length word.
     422             :  *
     423             :  * writetup is expected to write both length words as well as the tuple
     424             :  * data.  When readtup is called, the tape is positioned just after the
     425             :  * front length word; readtup must read the tuple data and advance past
     426             :  * the back length word (if present).
     427             :  *
     428             :  * The write/read routines can make use of the tuple description data
     429             :  * stored in the Tuplesortstate record, if needed.  They are also expected
     430             :  * to adjust state->availMem by the amount of memory space (not tape space!)
     431             :  * released or consumed.  There is no error return from either writetup
     432             :  * or readtup; they should ereport() on failure.
     433             :  *
     434             :  *
     435             :  * NOTES about memory consumption calculations:
     436             :  *
     437             :  * We count space allocated for tuples against the workMem limit, plus
     438             :  * the space used by the variable-size memtuples array.  Fixed-size space
     439             :  * is not counted; it's small enough to not be interesting.
     440             :  *
     441             :  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
     442             :  * rather than the originally-requested size.  This is important since
     443             :  * palloc can add substantial overhead.  It's not a complete answer since
     444             :  * we won't count any wasted space in palloc allocation blocks, but it's
     445             :  * a lot better than what we were doing before 7.3.  As of 9.6, a
      446             :  * separate memory context is used for caller-passed tuples.  Resetting
     447             :  * it at certain key increments significantly ameliorates fragmentation.
     448             :  * readtup routines use the slab allocator (they cannot use
     449             :  * the reset context because it gets deleted at the point that merging
     450             :  * begins).
     451             :  */
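/*
 * Editorial sketch (not part of this excerpt): the on-tape layout described
 * above, as a hypothetical writetup routine might emit it.  LogicalTapeWrite
 * is the real logtape.c API (treat the exact signature as approximate);
 * 'tuplen' counts the leading length word plus the payload, but not the
 * optional trailing length word.
 */
static void
writetup_sketch(Tuplesortstate *state, LogicalTape *tape,
                const char *data, unsigned int datalen)
{
    unsigned int tuplen = sizeof(unsigned int) + datalen;

    LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));    /* front length word */
    LogicalTapeWrite(tape, data, datalen);              /* tuple body */

    /* a trailing copy of the length word enables reading backwards */
    if (state->base.sortopt & TUPLESORT_RANDOMACCESS)
        LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
}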
     452             : 
     453             : 
     454             : static void tuplesort_begin_batch(Tuplesortstate *state);
     455             : static bool consider_abort_common(Tuplesortstate *state);
     456             : static void inittapes(Tuplesortstate *state, bool mergeruns);
     457             : static void inittapestate(Tuplesortstate *state, int maxTapes);
     458             : static void selectnewtape(Tuplesortstate *state);
     459             : static void init_slab_allocator(Tuplesortstate *state, int numSlots);
     460             : static void mergeruns(Tuplesortstate *state);
     461             : static void mergeonerun(Tuplesortstate *state);
     462             : static void beginmerge(Tuplesortstate *state);
     463             : static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
     464             : static void dumptuples(Tuplesortstate *state, bool alltuples);
     465             : static void make_bounded_heap(Tuplesortstate *state);
     466             : static void sort_bounded_heap(Tuplesortstate *state);
     467             : static void tuplesort_sort_memtuples(Tuplesortstate *state);
     468             : static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
     469             : static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
     470             : static void tuplesort_heap_delete_top(Tuplesortstate *state);
     471             : static void reversedirection(Tuplesortstate *state);
     472             : static unsigned int getlen(LogicalTape *tape, bool eofOK);
     473             : static void markrunend(LogicalTape *tape);
     474             : static int  worker_get_identifier(Tuplesortstate *state);
     475             : static void worker_freeze_result_tape(Tuplesortstate *state);
     476             : static void worker_nomergeruns(Tuplesortstate *state);
     477             : static void leader_takeover_tapes(Tuplesortstate *state);
     478             : static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
     479             : static void tuplesort_free(Tuplesortstate *state);
     480             : static void tuplesort_updatemax(Tuplesortstate *state);
     481             : 
     482             : /*
     483             :  * Specialized comparators that we can inline into specialized sorts.  The goal
     484             :  * is to try to sort two tuples without having to follow the pointers to the
     485             :  * comparator or the tuple.
     486             :  *
     487             :  * XXX: For now, there is no specialization for cases where datum1 is
     488             :  * authoritative and we don't even need to fall back to a callback at all (that
     489             :  * would be true for types like int4/int8/timestamp/date, but not true for
      490             :  * abbreviations of text or multi-key sorts).  There could be!  Is it worth it?
     491             :  */
     492             : 
     493             : /* Used if first key's comparator is ssup_datum_unsigned_cmp */
     494             : static pg_attribute_always_inline int
     495    46521886 : qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     496             : {
     497             :     int         compare;
     498             : 
     499    46521886 :     compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
     500    46521886 :                                           b->datum1, b->isnull1,
     501             :                                           &state->base.sortKeys[0]);
     502    46521886 :     if (compare != 0)
     503    41970120 :         return compare;
     504             : 
     505             :     /*
     506             :      * No need to waste effort calling the tiebreak function when there are no
     507             :      * other keys to sort on.
     508             :      */
     509     4551766 :     if (state->base.onlyKey != NULL)
     510           0 :         return 0;
     511             : 
     512     4551766 :     return state->base.comparetup_tiebreak(a, b, state);
     513             : }
     514             : 
     515             : /* Used if first key's comparator is ssup_datum_signed_cmp */
     516             : static pg_attribute_always_inline int
     517     8995712 : qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     518             : {
     519             :     int         compare;
     520             : 
     521     8995712 :     compare = ApplySignedSortComparator(a->datum1, a->isnull1,
     522     8995712 :                                         b->datum1, b->isnull1,
     523             :                                         &state->base.sortKeys[0]);
     524             : 
     525     8995712 :     if (compare != 0)
     526     8954576 :         return compare;
     527             : 
     528             :     /*
     529             :      * No need to waste effort calling the tiebreak function when there are no
     530             :      * other keys to sort on.
     531             :      */
     532       41136 :     if (state->base.onlyKey != NULL)
     533       26674 :         return 0;
     534             : 
     535       14462 :     return state->base.comparetup_tiebreak(a, b, state);
     536             : }
     537             : 
     538             : /* Used if first key's comparator is ssup_datum_int32_cmp */
     539             : static pg_attribute_always_inline int
     540    55280204 : qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     541             : {
     542             :     int         compare;
     543             : 
     544    55280204 :     compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
     545    55280204 :                                        b->datum1, b->isnull1,
     546             :                                        &state->base.sortKeys[0]);
     547             : 
     548    55280204 :     if (compare != 0)
     549    39727124 :         return compare;
     550             : 
     551             :     /*
     552             :      * No need to waste effort calling the tiebreak function when there are no
     553             :      * other keys to sort on.
     554             :      */
     555    15553080 :     if (state->base.onlyKey != NULL)
     556     1828242 :         return 0;
     557             : 
     558    13724838 :     return state->base.comparetup_tiebreak(a, b, state);
     559             : }
     560             : 
     561             : /*
     562             :  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
     563             :  * any variant of SortTuples, using the appropriate comparetup function.
     564             :  * qsort_ssup() is specialized for the case where the comparetup function
     565             :  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
     566             :  * and Datum sorts.  qsort_tuple_{unsigned,signed,int32} are specialized for
     567             :  * common comparison functions on pass-by-value leading datums.
     568             :  */
     569             : 
     570             : #define ST_SORT qsort_tuple_unsigned
     571             : #define ST_ELEMENT_TYPE SortTuple
     572             : #define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
     573             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     574             : #define ST_CHECK_FOR_INTERRUPTS
     575             : #define ST_SCOPE static
     576             : #define ST_DEFINE
     577             : #include "lib/sort_template.h"
     578             : 
     579             : #define ST_SORT qsort_tuple_signed
     580             : #define ST_ELEMENT_TYPE SortTuple
     581             : #define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
     582             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     583             : #define ST_CHECK_FOR_INTERRUPTS
     584             : #define ST_SCOPE static
     585             : #define ST_DEFINE
     586             : #include "lib/sort_template.h"
     587             : 
     588             : #define ST_SORT qsort_tuple_int32
     589             : #define ST_ELEMENT_TYPE SortTuple
     590             : #define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
     591             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     592             : #define ST_CHECK_FOR_INTERRUPTS
     593             : #define ST_SCOPE static
     594             : #define ST_DEFINE
     595             : #include "lib/sort_template.h"
     596             : 
     597             : #define ST_SORT qsort_tuple
     598             : #define ST_ELEMENT_TYPE SortTuple
     599             : #define ST_COMPARE_RUNTIME_POINTER
     600             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     601             : #define ST_CHECK_FOR_INTERRUPTS
     602             : #define ST_SCOPE static
     603             : #define ST_DECLARE
     604             : #define ST_DEFINE
     605             : #include "lib/sort_template.h"
     606             : 
     607             : #define ST_SORT qsort_ssup
     608             : #define ST_ELEMENT_TYPE SortTuple
     609             : #define ST_COMPARE(a, b, ssup) \
     610             :     ApplySortComparator((a)->datum1, (a)->isnull1, \
     611             :                         (b)->datum1, (b)->isnull1, (ssup))
     612             : #define ST_COMPARE_ARG_TYPE SortSupportData
     613             : #define ST_CHECK_FOR_INTERRUPTS
     614             : #define ST_SCOPE static
     615             : #define ST_DEFINE
     616             : #include "lib/sort_template.h"
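/*
 * Editorial note (illustrative): each inclusion of lib/sort_template.h
 * above emits a specialized sort function named after ST_SORT, with
 * approximately this shape:
 *
 *     static void qsort_tuple_unsigned(SortTuple *first, size_t n,
 *                                      Tuplesortstate *arg);
 *
 * so qsort_tuple_unsigned_compare() is inlined at every comparison site
 * rather than being reached through a function pointer.
 */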
     617             : 
     618             : /*
     619             :  *      tuplesort_begin_xxx
     620             :  *
     621             :  * Initialize for a tuple sort operation.
     622             :  *
     623             :  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
     624             :  * zero or more times, then call tuplesort_performsort when all the tuples
     625             :  * have been supplied.  After performsort, retrieve the tuples in sorted
     626             :  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
     627             :  * access was requested, rescan, markpos, and restorepos can also be called.)
     628             :  * Call tuplesort_end to terminate the operation and release memory/disk space.
     629             :  *
     630             :  * Each variant of tuplesort_begin has a workMem parameter specifying the
     631             :  * maximum number of kilobytes of RAM to use before spilling data to disk.
     632             :  * (The normal value of this parameter is work_mem, but some callers use
     633             :  * other values.)  Each variant also has a sortopt which is a bitmask of
      634             :  * sort options.  See the TUPLESORT_* definitions in tuplesort.h.
     635             :  */
     636             : 
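/*
 * Editorial sketch (not part of this excerpt): the call sequence described
 * above, shown for a simple serial Datum sort of int4 values.  These are
 * the real tuplesort/tuplesortvariants entry points, but treat the exact
 * signatures as approximate; they vary across major versions.
 */
static void
sort_int4_datums_sketch(Datum *values, int nvalues)
{
    Tuplesortstate *ts;
    Datum       value;
    bool        isnull;

    ts = tuplesort_begin_datum(INT4OID, Int4LessOperator, InvalidOid,
                               false, work_mem, NULL, TUPLESORT_NONE);

    for (int i = 0; i < nvalues; i++)
        tuplesort_putdatum(ts, values[i], false);   /* feed all input */

    tuplesort_performsort(ts);                      /* sort, spilling if needed */

    while (tuplesort_getdatum(ts, true, false, &value, &isnull, NULL))
        ;                                           /* consume in sorted order */

    tuplesort_end(ts);                              /* release memory and temp files */
}
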
     637             : Tuplesortstate *
     638      272646 : tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
     639             : {
     640             :     Tuplesortstate *state;
     641             :     MemoryContext maincontext;
     642             :     MemoryContext sortcontext;
     643             :     MemoryContext oldcontext;
     644             : 
     645             :     /* See leader_takeover_tapes() remarks on random access support */
     646      272646 :     if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
     647           0 :         elog(ERROR, "random access disallowed under parallel sort");
     648             : 
     649             :     /*
     650             :      * Memory context surviving tuplesort_reset.  This memory context holds
     651             :      * data which is useful to keep while sorting multiple similar batches.
     652             :      */
     653      272646 :     maincontext = AllocSetContextCreate(CurrentMemoryContext,
     654             :                                         "TupleSort main",
     655             :                                         ALLOCSET_DEFAULT_SIZES);
     656             : 
     657             :     /*
     658             :      * Create a working memory context for one sort operation.  The content of
     659             :      * this context is deleted by tuplesort_reset.
     660             :      */
     661      272646 :     sortcontext = AllocSetContextCreate(maincontext,
     662             :                                         "TupleSort sort",
     663             :                                         ALLOCSET_DEFAULT_SIZES);
     664             : 
     665             :     /*
      666             :      * Additionally, a working memory context for tuples is set up in
     667             :      * tuplesort_begin_batch.
     668             :      */
     669             : 
     670             :     /*
     671             :      * Make the Tuplesortstate within the per-sortstate context.  This way, we
     672             :      * don't need a separate pfree() operation for it at shutdown.
     673             :      */
     674      272646 :     oldcontext = MemoryContextSwitchTo(maincontext);
     675             : 
     676      272646 :     state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
     677             : 
     678      272646 :     if (trace_sort)
     679           0 :         pg_rusage_init(&state->ru_start);
     680             : 
     681      272646 :     state->base.sortopt = sortopt;
     682      272646 :     state->base.tuples = true;
     683      272646 :     state->abbrevNext = 10;
     684             : 
     685             :     /*
     686             :      * workMem is forced to be at least 64KB, the current minimum valid value
     687             :      * for the work_mem GUC.  This is a defense against parallel sort callers
     688             :      * that divide out memory among many workers in a way that leaves each
     689             :      * with very little memory.
     690             :      */
     691      272646 :     state->allowedMem = Max(workMem, 64) * (int64) 1024;
     692      272646 :     state->base.sortcontext = sortcontext;
     693      272646 :     state->base.maincontext = maincontext;
     694             : 
     695             :     /*
     696             :      * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
     697             :      * see comments in grow_memtuples().
     698             :      */
     699      272646 :     state->memtupsize = INITIAL_MEMTUPSIZE;
     700      272646 :     state->memtuples = NULL;
     701             : 
     702             :     /*
      703             :      * After all of the other non-parallel-related state, we set up all of the
     704             :      * state needed for each batch.
     705             :      */
     706      272646 :     tuplesort_begin_batch(state);
     707             : 
     708             :     /*
     709             :      * Initialize parallel-related state based on coordination information
     710             :      * from caller
     711             :      */
     712      272646 :     if (!coordinate)
     713             :     {
     714             :         /* Serial sort */
     715      271982 :         state->shared = NULL;
     716      271982 :         state->worker = -1;
     717      271982 :         state->nParticipants = -1;
     718             :     }
     719         664 :     else if (coordinate->isWorker)
     720             :     {
     721             :         /* Parallel worker produces exactly one final run from all input */
     722         444 :         state->shared = coordinate->sharedsort;
     723         444 :         state->worker = worker_get_identifier(state);
     724         444 :         state->nParticipants = -1;
     725             :     }
     726             :     else
     727             :     {
     728             :         /* Parallel leader state only used for final merge */
     729         220 :         state->shared = coordinate->sharedsort;
     730         220 :         state->worker = -1;
     731         220 :         state->nParticipants = coordinate->nParticipants;
     732             :         Assert(state->nParticipants >= 1);
     733             :     }
     734             : 
     735      272646 :     MemoryContextSwitchTo(oldcontext);
     736             : 
     737      272646 :     return state;
     738             : }
     739             : 
     740             : /*
     741             :  *      tuplesort_begin_batch
     742             :  *
      743             :  * Set up, or reset, all state needed for processing a new set of tuples with
      744             :  * this sort state.  Called both from tuplesort_begin_common (the first time
      745             :  * sorting with this sort state) and tuplesort_reset (for subsequent usages).
     746             :  */
     747             : static void
     748      275692 : tuplesort_begin_batch(Tuplesortstate *state)
     749             : {
     750             :     MemoryContext oldcontext;
     751             : 
     752      275692 :     oldcontext = MemoryContextSwitchTo(state->base.maincontext);
     753             : 
     754             :     /*
     755             :      * Caller tuple (e.g. IndexTuple) memory context.
     756             :      *
      757             :      * A dedicated child context used exclusively for caller-passed tuples
     758             :      * eases memory management.  Resetting at key points reduces
     759             :      * fragmentation. Note that the memtuples array of SortTuples is allocated
     760             :      * in the parent context, not this context, because there is no need to
     761             :      * free memtuples early.  For bounded sorts, tuples may be pfreed in any
     762             :      * order, so we use a regular aset.c context so that it can make use of
     763             :      * free'd memory.  When the sort is not bounded, we make use of a bump.c
     764             :      * context as this keeps allocations more compact with less wastage.
     765             :      * Allocations are also slightly more CPU efficient.
     766             :      */
     767      275692 :     if (TupleSortUseBumpTupleCxt(state->base.sortopt))
     768      274322 :         state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
     769             :                                                      "Caller tuples",
     770             :                                                      ALLOCSET_DEFAULT_SIZES);
     771             :     else
     772        1370 :         state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
     773             :                                                          "Caller tuples",
     774             :                                                          ALLOCSET_DEFAULT_SIZES);
     775             : 
     776             : 
     777      275692 :     state->status = TSS_INITIAL;
     778      275692 :     state->bounded = false;
     779      275692 :     state->boundUsed = false;
     780             : 
     781      275692 :     state->availMem = state->allowedMem;
     782             : 
     783      275692 :     state->tapeset = NULL;
     784             : 
     785      275692 :     state->memtupcount = 0;
     786             : 
     787             :     /*
     788             :      * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
     789             :      * see comments in grow_memtuples().
     790             :      */
     791      275692 :     state->growmemtuples = true;
     792      275692 :     state->slabAllocatorUsed = false;
     793      275692 :     if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
     794             :     {
     795           0 :         pfree(state->memtuples);
     796           0 :         state->memtuples = NULL;
     797           0 :         state->memtupsize = INITIAL_MEMTUPSIZE;
     798             :     }
     799      275692 :     if (state->memtuples == NULL)
     800             :     {
     801      272646 :         state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
     802      272646 :         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
     803             :     }
     804             : 
     805             :     /* workMem must be large enough for the minimal memtuples array */
     806      275692 :     if (LACKMEM(state))
     807           0 :         elog(ERROR, "insufficient memory allowed for sort");
     808             : 
     809      275692 :     state->currentRun = 0;
     810             : 
     811             :     /*
     812             :      * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
     813             :      * inittapes(), if needed.
     814             :      */
     815             : 
     816      275692 :     state->result_tape = NULL;   /* flag that result tape has not been formed */
     817             : 
     818      275692 :     MemoryContextSwitchTo(oldcontext);
     819      275692 : }
     820             : 
     821             : /*
     822             :  * tuplesort_set_bound
     823             :  *
     824             :  *  Advise tuplesort that at most the first N result tuples are required.
     825             :  *
     826             :  * Must be called before inserting any tuples.  (Actually, we could allow it
     827             :  * as long as the sort hasn't spilled to disk, but there seems no need for
     828             :  * delayed calls at the moment.)
     829             :  *
     830             :  * This is a hint only. The tuplesort may still return more tuples than
     831             :  * requested.  Parallel leader tuplesorts will always ignore the hint.
     832             :  */
     833             : void
     834        1236 : tuplesort_set_bound(Tuplesortstate *state, int64 bound)
     835             : {
     836             :     /* Assert we're called before loading any tuples */
     837             :     Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
     838             :     /* Assert we allow bounded sorts */
     839             :     Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
     840             :     /* Can't set the bound twice, either */
     841             :     Assert(!state->bounded);
     842             :     /* Also, this shouldn't be called in a parallel worker */
     843             :     Assert(!WORKER(state));
     844             : 
     845             :     /* Parallel leader allows but ignores hint */
     846        1236 :     if (LEADER(state))
     847           0 :         return;
     848             : 
     849             : #ifdef DEBUG_BOUNDED_SORT
     850             :     /* Honor GUC setting that disables the feature (for easy testing) */
     851             :     if (!optimize_bounded_sort)
     852             :         return;
     853             : #endif
     854             : 
     855             :     /* We want to be able to compute bound * 2, so limit the setting */
     856        1236 :     if (bound > (int64) (INT_MAX / 2))
     857           0 :         return;
     858             : 
     859        1236 :     state->bounded = true;
     860        1236 :     state->bound = (int) bound;
     861             : 
     862             :     /*
     863             :      * Bounded sorts are not an effective target for abbreviated key
     864             :      * optimization.  Disable by setting state to be consistent with no
     865             :      * abbreviation support.
     866             :      */
     867        1236 :     state->base.sortKeys->abbrev_converter = NULL;
     868        1236 :     if (state->base.sortKeys->abbrev_full_comparator)
     869          16 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
     870             : 
     871             :     /* Not strictly necessary, but be tidy */
     872        1236 :     state->base.sortKeys->abbrev_abort = NULL;
     873        1236 :     state->base.sortKeys->abbrev_full_comparator = NULL;
     874             : }
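/*
 * Editorial sketch (not part of this excerpt): how a LIMIT-style caller
 * combines the hint above with the normal flow.  TUPLESORT_ALLOWBOUNDED
 * must have been passed at begin time for the hint to be legal; the helper
 * below and its parameters are illustrative, and the begin_datum signature
 * is approximate.
 */
static Tuplesortstate *
begin_top_n_sketch(int64 n)
{
    Tuplesortstate *ts;

    ts = tuplesort_begin_datum(INT4OID, Int4LessOperator, InvalidOid,
                               false, work_mem, NULL,
                               TUPLESORT_ALLOWBOUNDED);
    tuplesort_set_bound(ts, n);     /* at most the first n results are needed */

    return ts;
}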
     875             : 
     876             : /*
     877             :  * tuplesort_used_bound
     878             :  *
     879             :  * Allow callers to find out if the sort state was able to use a bound.
     880             :  */
     881             : bool
     882         290 : tuplesort_used_bound(Tuplesortstate *state)
     883             : {
     884         290 :     return state->boundUsed;
     885             : }
     886             : 
     887             : /*
     888             :  * tuplesort_free
     889             :  *
     890             :  *  Internal routine for freeing resources of tuplesort.
     891             :  */
     892             : static void
     893      275430 : tuplesort_free(Tuplesortstate *state)
     894             : {
     895             :     /* context swap probably not needed, but let's be safe */
     896      275430 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
     897             :     int64       spaceUsed;
     898             : 
     899      275430 :     if (state->tapeset)
     900         722 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset);
     901             :     else
     902      274708 :         spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
     903             : 
     904             :     /*
     905             :      * Delete temporary "tape" files, if any.
     906             :      *
     907             :      * We don't bother to destroy the individual tapes here. They will go away
     908             :      * with the sortcontext.  (In TSS_FINALMERGE state, we have closed
     909             :      * finished tapes already.)
     910             :      */
     911      275430 :     if (state->tapeset)
     912         722 :         LogicalTapeSetClose(state->tapeset);
     913             : 
     914      275430 :     if (trace_sort)
     915             :     {
     916           0 :         if (state->tapeset)
     917           0 :             elog(LOG, "%s of worker %d ended, %" PRId64 " disk blocks used: %s",
     918             :                  SERIAL(state) ? "external sort" : "parallel external sort",
     919             :                  state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
     920             :         else
     921           0 :             elog(LOG, "%s of worker %d ended, %" PRId64 " KB used: %s",
     922             :                  SERIAL(state) ? "internal sort" : "unperformed parallel sort",
     923             :                  state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
     924             :     }
     925             : 
     926             :     TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
     927             : 
     928      275430 :     FREESTATE(state);
     929      275430 :     MemoryContextSwitchTo(oldcontext);
     930             : 
     931             :     /*
     932             :      * Free the per-sort memory context, thereby releasing all working memory.
     933             :      */
     934      275430 :     MemoryContextReset(state->base.sortcontext);
     935      275430 : }
     936             : 
     937             : /*
     938             :  * tuplesort_end
     939             :  *
     940             :  *  Release resources and clean up.
     941             :  *
     942             :  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
     943             :  * pointing to garbage.  Be careful not to attempt to use or free such
     944             :  * pointers afterwards!
     945             :  */
     946             : void
     947      272384 : tuplesort_end(Tuplesortstate *state)
     948             : {
     949      272384 :     tuplesort_free(state);
     950             : 
     951             :     /*
     952             :      * Free the main memory context, including the Tuplesortstate struct
     953             :      * itself.
     954             :      */
     955      272384 :     MemoryContextDelete(state->base.maincontext);
     956      272384 : }
     957             : 
     958             : /*
     959             :  * tuplesort_updatemax
     960             :  *
     961             :  *  Update maximum resource usage statistics.
     962             :  */
     963             : static void
     964        3442 : tuplesort_updatemax(Tuplesortstate *state)
     965             : {
     966             :     int64       spaceUsed;
     967             :     bool        isSpaceDisk;
     968             : 
     969             :     /*
     970             :      * Note: it might seem we should provide both memory and disk usage for a
     971             :      * disk-based sort.  However, the current code doesn't track memory space
     972             :      * accurately once we have begun to return tuples to the caller (since we
     973             :      * don't account for pfree's the caller is expected to do), so we cannot
     974             :      * rely on availMem in a disk sort.  This does not seem worth the overhead
     975             :      * to fix.  Is it worth creating an API for the memory context code to
     976             :      * tell us how much is actually used in sortcontext?
     977             :      */
     978        3442 :     if (state->tapeset)
     979             :     {
     980           6 :         isSpaceDisk = true;
     981           6 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
     982             :     }
     983             :     else
     984             :     {
     985        3436 :         isSpaceDisk = false;
     986        3436 :         spaceUsed = state->allowedMem - state->availMem;
     987             :     }
     988             : 
     989             :     /*
     990             :      * A sort evicts data to disk only when it cannot fit that data in
     991             :      * main memory.  That is why we treat space used on disk as more
     992             :      * important for tracking resource usage than space used in memory.
     993             :      * Note that the space occupied by a set of tuples on disk might be
     994             :      * less than the space occupied by the same tuples in memory, owing
     995             :      * to the more compact on-disk representation.
     996             :      */
     997        3442 :     if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
     998        3436 :         (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
     999             :     {
    1000         460 :         state->maxSpace = spaceUsed;
    1001         460 :         state->isMaxSpaceDisk = isSpaceDisk;
    1002         460 :         state->maxSpaceStatus = state->status;
    1003             :     }
    1004        3442 : }
    1005             : 
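             : /*
             :  * The precedence rule above, restated as a standalone predicate for
             :  * illustration (this helper does not exist in tuplesort.c): any disk
             :  * figure beats a remembered in-memory maximum; otherwise the larger
             :  * figure on the same medium wins.
             :  */
             : static inline bool
             : space_beats_max(bool isDisk, int64 space, bool maxIsDisk, int64 maxSpace)
             : {
             :     if (isDisk && !maxIsDisk)
             :         return true;        /* first spill to disk always wins */
             :     return isDisk == maxIsDisk && space > maxSpace;
             : }
             : 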
    1006             : /*
    1007             :  * tuplesort_reset
    1008             :  *
    1009             :  *  Reset the tuplesort.  This discards all data held in the tuplesort but
    1010             :  *  keeps the meta-information, so after tuplesort_reset the tuplesort is
    1011             :  *  ready to start a new sort.  This avoids recreating tuplesort states
    1012             :  *  (and so saves resources) when sorting multiple small batches.
    1013             :  */
    1014             : void
    1015        3046 : tuplesort_reset(Tuplesortstate *state)
    1016             : {
    1017        3046 :     tuplesort_updatemax(state);
    1018        3046 :     tuplesort_free(state);
    1019             : 
    1020             :     /*
    1021             :      * After we've freed up per-batch memory, re-setup all of the state common
    1022             :      * to both the first batch and any subsequent batch.
    1023             :      */
    1024        3046 :     tuplesort_begin_batch(state);
    1025             : 
    1026        3046 :     state->lastReturnedTuple = NULL;
    1027        3046 :     state->slabMemoryBegin = NULL;
    1028        3046 :     state->slabMemoryEnd = NULL;
    1029        3046 :     state->slabFreeHead = NULL;
    1030        3046 : }
    1031             : 
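             : /*
             :  * Illustrative sketch, not part of tuplesort.c: the reuse pattern that
             :  * tuplesort_reset enables when sorting many small batches with a
             :  * single state.  more_batches(), fetch_into() and emit() are
             :  * hypothetical caller routines; the tuplesort_* calls are the real API.
             :  */
             : extern bool more_batches(void);               /* hypothetical */
             : extern bool fetch_into(TupleTableSlot *slot); /* hypothetical */
             : extern void emit(TupleTableSlot *slot);       /* hypothetical */
             : 
             : static void
             : sort_batches(Tuplesortstate *ts, TupleTableSlot *slot)
             : {
             :     while (more_batches())
             :     {
             :         while (fetch_into(slot))
             :             tuplesort_puttupleslot(ts, slot);
             :         tuplesort_performsort(ts);
             :         while (tuplesort_gettupleslot(ts, true, false, slot, NULL))
             :             emit(slot);
             :         tuplesort_reset(ts);    /* keep the state, drop the tuples */
             :     }
             :     tuplesort_end(ts);
             : }
             : 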
    1032             : /*
    1033             :  * Grow the memtuples[] array, if possible within our memory constraint.  We
    1034             :  * must not exceed INT_MAX tuples in memory or the caller-provided memory
    1035             :  * limit.  Return true if we were able to enlarge the array, false if not.
    1036             :  *
    1037             :  * Normally, at each increment we double the size of the array.  When doing
    1038             :  * that would exceed a limit, we attempt one last, smaller increase (and then
    1039             :  * clear the growmemtuples flag so we don't try any more).  That allows us to
    1040             :  * use memory as fully as permitted; sticking to the pure doubling rule could
    1041             :  * result in almost half going unused.  Because availMem moves around with
    1042             :  * tuple addition/removal, we need some rule to prevent making repeated small
    1043             :  * increases in memtupsize, which would just be useless thrashing.  The
    1044             :  * growmemtuples flag accomplishes that and also prevents useless
    1045             :  * recalculations in this function.
    1046             :  */
    1047             : static bool
    1048        7976 : grow_memtuples(Tuplesortstate *state)
    1049             : {
    1050             :     int         newmemtupsize;
    1051        7976 :     int         memtupsize = state->memtupsize;
    1052        7976 :     int64       memNowUsed = state->allowedMem - state->availMem;
    1053             : 
    1054             :     /* Forget it if we've already maxed out memtuples, per comment above */
    1055        7976 :     if (!state->growmemtuples)
    1056         122 :         return false;
    1057             : 
    1058             :     /* Select new value of memtupsize */
    1059        7854 :     if (memNowUsed <= state->availMem)
    1060             :     {
    1061             :         /*
    1062             :          * We've used no more than half of allowedMem; double our usage,
    1063             :          * clamping at INT_MAX tuples.
    1064             :          */
    1065        7724 :         if (memtupsize < INT_MAX / 2)
    1066        7724 :             newmemtupsize = memtupsize * 2;
    1067             :         else
    1068             :         {
    1069           0 :             newmemtupsize = INT_MAX;
    1070           0 :             state->growmemtuples = false;
    1071             :         }
    1072             :     }
    1073             :     else
    1074             :     {
    1075             :         /*
    1076             :          * This will be the last increment of memtupsize.  Abandon doubling
    1077             :          * strategy and instead increase as much as we safely can.
    1078             :          *
    1079             :          * To stay within allowedMem, we can't increase memtupsize by more
    1080             :          * than availMem / sizeof(SortTuple) elements.  In practice, we want
    1081             :          * to increase it by considerably less, because we need to leave some
    1082             :          * space for the tuples to which the new array slots will refer.  We
    1083             :          * assume the new tuples will be about the same size as the tuples
    1084             :          * we've already seen, and thus we can extrapolate from the space
    1085             :          * consumption so far to estimate an appropriate new size for the
    1086             :          * memtuples array.  The optimal value might be higher or lower than
    1087             :          * this estimate, but it's hard to know that in advance.  We again
    1088             :          * clamp at INT_MAX tuples.
    1089             :          *
    1090             :          * This calculation is safe against enlarging the array so much that
    1091             :          * LACKMEM becomes true, because the memory currently used includes
    1092             :          * the present array; thus, there would be enough allowedMem for the
    1093             :          * new array elements even if no other memory were currently used.
    1094             :          *
    1095             :          * We do the arithmetic in float8, because otherwise the product of
    1096             :          * memtupsize and allowedMem could overflow.  Any inaccuracy in the
    1097             :          * result should be insignificant; but even if we computed a
    1098             :          * completely insane result, the checks below will prevent anything
    1099             :          * really bad from happening.
    1100             :          */
    1101             :         double      grow_ratio;
    1102             : 
    1103         130 :         grow_ratio = (double) state->allowedMem / (double) memNowUsed;
    1104         130 :         if (memtupsize * grow_ratio < INT_MAX)
    1105         130 :             newmemtupsize = (int) (memtupsize * grow_ratio);
    1106             :         else
    1107           0 :             newmemtupsize = INT_MAX;
    1108             : 
    1109             :         /* We won't make any further enlargement attempts */
    1110         130 :         state->growmemtuples = false;
    1111             :     }
    1112             : 
    1113             :     /* Must enlarge array by at least one element, else report failure */
    1114        7854 :     if (newmemtupsize <= memtupsize)
    1115           0 :         goto noalloc;
    1116             : 
    1117             :     /*
    1118             :      * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
    1119             :      * to ensure our request won't be rejected.  Note that we can easily
    1120             :      * exhaust address space before facing this outcome.  (This is presently
    1121             :      * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
    1122             :      * don't rely on that at this distance.)
    1123             :      */
    1124        7854 :     if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
    1125             :     {
    1126           0 :         newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
    1127           0 :         state->growmemtuples = false;    /* can't grow any more */
    1128             :     }
    1129             : 
    1130             :     /*
    1131             :      * We need to be sure that we do not cause LACKMEM to become true, else
    1132             :      * the space management algorithm will go nuts.  The code above should
    1133             :      * never generate a dangerous request, but to be safe, check explicitly
    1134             :      * that the array growth fits within availMem.  (We could still cause
    1135             :      * LACKMEM if the memory chunk overhead associated with the memtuples
    1136             :      * array were to increase.  That shouldn't happen because we chose the
    1137             :      * initial array size large enough to ensure that palloc will be treating
    1138             :      * both old and new arrays as separate chunks.  But we'll check LACKMEM
    1139             :      * explicitly below just in case.)
    1140             :      */
    1141        7854 :     if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
    1142           0 :         goto noalloc;
    1143             : 
    1144             :     /* OK, do it */
    1145        7854 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1146        7854 :     state->memtupsize = newmemtupsize;
    1147        7854 :     state->memtuples = (SortTuple *)
    1148        7854 :         repalloc_huge(state->memtuples,
    1149        7854 :                       state->memtupsize * sizeof(SortTuple));
    1150        7854 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1151        7854 :     if (LACKMEM(state))
    1152           0 :         elog(ERROR, "unexpected out-of-memory situation in tuplesort");
    1153        7854 :     return true;
    1154             : 
    1155           0 : noalloc:
    1156             :     /* If for any reason we didn't realloc, shut off future attempts */
    1157           0 :     state->growmemtuples = false;
    1158           0 :     return false;
    1159             : }
    1160             : 
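             : /*
             :  * Worked example for the final enlargement above (figures invented for
             :  * illustration): suppose allowedMem is 64MB, memNowUsed is 48MB, and
             :  * memtupsize is 100000.  Then grow_ratio = 64/48 = 1.33, so the array
             :  * grows one last time to about 133000 slots rather than doubling to
             :  * 200000, which would likely overrun allowedMem once the tuples the
             :  * new slots refer to are accounted for.
             :  */
             : 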
    1161             : /*
    1162             :  * Shared code for tuple and datum cases.
    1163             :  */
    1164             : void
    1165    30211304 : tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
    1166             :                           bool useAbbrev, Size tuplen)
    1167             : {
    1168    30211304 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1169             : 
    1170             :     Assert(!LEADER(state));
    1171             : 
    1172             :     /* account for the memory used for this tuple */
    1173    30211304 :     USEMEM(state, tuplen);
    1174    30211304 :     state->tupleMem += tuplen;
    1175             : 
    1176    30211304 :     if (!useAbbrev)
    1177             :     {
    1178             :         /*
    1179             :          * Leave the ordinary Datum representation, or the NULL value.  If
    1180             :          * there is a converter it won't expect NULL values, and the cost
    1181             :          * model need not account for NULLs, so in that case we avoid
    1182             :          * calling the converter and just set datum1 to the zeroed
    1183             :          * representation (to be consistent, and to support cheap
    1184             :          * inequality tests for NULL abbreviated keys).
    1185             :          */
    1186             :     }
    1187     4434304 :     else if (!consider_abort_common(state))
    1188             :     {
    1189             :         /* Store abbreviated key representation */
    1190     4434208 :         tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
    1191             :                                                                state->base.sortKeys);
    1192             :     }
    1193             :     else
    1194             :     {
    1195             :         /*
    1196             :          * Set state to be consistent with never trying abbreviation.
    1197             :          *
    1198             :          * Alter datum1 representation in already-copied tuples, so as to
    1199             :          * ensure a consistent representation (current tuple was just
    1200             :          * handled).  It does not matter if some dumped tuples are already
    1201             :          * sorted on tape, since serialized tuples lack abbreviated keys
    1202             :          * (TSS_BUILDRUNS state prevents control reaching here in any case).
    1203             :          */
    1204          96 :         REMOVEABBREV(state, state->memtuples, state->memtupcount);
    1205             :     }
    1206             : 
    1207    30211304 :     switch (state->status)
    1208             :     {
    1209    25392818 :         case TSS_INITIAL:
    1210             : 
    1211             :             /*
    1212             :              * Save the tuple into the unsorted array.  First, grow the array
    1213             :              * as needed.  Note that we try to grow the array when there is
    1214             :              * still one free slot remaining --- if we fail, there'll still be
    1215             :              * room to store the incoming tuple, and then we'll switch to
    1216             :              * tape-based operation.
    1217             :              */
    1218    25392818 :             if (state->memtupcount >= state->memtupsize - 1)
    1219             :             {
    1220        7976 :                 (void) grow_memtuples(state);
    1221             :                 Assert(state->memtupcount < state->memtupsize);
    1222             :             }
    1223    25392818 :             state->memtuples[state->memtupcount++] = *tuple;
    1224             : 
    1225             :             /*
    1226             :              * Check if it's time to switch over to a bounded heapsort. We do
    1227             :              * so if the input tuple count exceeds twice the desired tuple
    1228             :              * count (this is a heuristic for where heapsort becomes cheaper
    1229             :              * than a quicksort), or if we've just filled workMem and have
    1230             :              * enough tuples to meet the bound.
    1231             :              *
    1232             :              * Note that once we enter TSS_BOUNDED state we will always try to
    1233             :              * complete the sort that way.  In the worst case, if later input
    1234             :              * tuples are larger than earlier ones, this might cause us to
    1235             :              * exceed workMem significantly.
    1236             :              */
    1237    25392818 :             if (state->bounded &&
    1238       51318 :                 (state->memtupcount > state->bound * 2 ||
    1239       50910 :                  (state->memtupcount > state->bound && LACKMEM(state))))
    1240             :             {
    1241         408 :                 if (trace_sort)
    1242           0 :                     elog(LOG, "switching to bounded heapsort at %d tuples: %s",
    1243             :                          state->memtupcount,
    1244             :                          pg_rusage_show(&state->ru_start));
    1245         408 :                 make_bounded_heap(state);
    1246         408 :                 MemoryContextSwitchTo(oldcontext);
    1247         408 :                 return;
    1248             :             }
    1249             : 
    1250             :             /*
    1251             :              * Done if we still fit in available memory and have array slots.
    1252             :              */
    1253    25392410 :             if (state->memtupcount < state->memtupsize && !LACKMEM(state))
    1254             :             {
    1255    25392288 :                 MemoryContextSwitchTo(oldcontext);
    1256    25392288 :                 return;
    1257             :             }
    1258             : 
    1259             :             /*
    1260             :              * Nope; time to switch to tape-based operation.
    1261             :              */
    1262         122 :             inittapes(state, true);
    1263             : 
    1264             :             /*
    1265             :              * Dump all tuples.
    1266             :              */
    1267         122 :             dumptuples(state, false);
    1268         122 :             break;
    1269             : 
    1270     3750180 :         case TSS_BOUNDED:
    1271             : 
    1272             :             /*
    1273             :              * We don't want to grow the array here, so check whether the new
    1274             :              * tuple can be discarded before putting it in.  This should be a
    1275             :              * good speed optimization, too, since when there are many more
    1276             :              * input tuples than the bound, most input tuples can be discarded
    1277             :              * with just this one comparison.  Note that because we currently
    1278             :              * have the sort direction reversed, we must check for <= not >=.
    1279             :              */
    1280     3750180 :             if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
    1281             :             {
    1282             :                 /* new tuple <= top of the heap, so we can discard it */
    1283     3247202 :                 free_sort_tuple(state, tuple);
    1284     3247202 :                 CHECK_FOR_INTERRUPTS();
    1285             :             }
    1286             :             else
    1287             :             {
    1288             :                 /* discard top of heap, replacing it with the new tuple */
    1289      502978 :                 free_sort_tuple(state, &state->memtuples[0]);
    1290      502978 :                 tuplesort_heap_replace_top(state, tuple);
    1291             :             }
    1292     3750180 :             break;
    1293             : 
    1294     1068306 :         case TSS_BUILDRUNS:
    1295             : 
    1296             :             /*
    1297             :              * Save the tuple into the unsorted array (there must be space)
    1298             :              */
    1299     1068306 :             state->memtuples[state->memtupcount++] = *tuple;
    1300             : 
    1301             :             /*
    1302             :              * If we are over the memory limit, dump all tuples.
    1303             :              */
    1304     1068306 :             dumptuples(state, false);
    1305     1068306 :             break;
    1306             : 
    1307           0 :         default:
    1308           0 :             elog(ERROR, "invalid tuplesort state");
    1309             :             break;
    1310             :     }
    1311     4818608 :     MemoryContextSwitchTo(oldcontext);
    1312             : }
    1313             : 
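             : /*
             :  * The TSS_BOUNDED filtering above, restated in isolation for
             :  * illustration (this helper does not exist in tuplesort.c).  Because
             :  * the bounded heap is built with the sort direction reversed,
             :  * memtuples[0] is the "worst" tuple currently kept, so most input
             :  * beyond the bound is rejected with a single comparison.
             :  */
             : static void
             : bounded_absorb(Tuplesortstate *state, SortTuple *tuple)
             : {
             :     if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
             :         free_sort_tuple(state, tuple);  /* cannot enter the top-N */
             :     else
             :     {
             :         /* evict the current worst, sift the newcomer into place */
             :         free_sort_tuple(state, &state->memtuples[0]);
             :         tuplesort_heap_replace_top(state, tuple);
             :     }
             : }
             : 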
    1314             : static bool
    1315     4434304 : consider_abort_common(Tuplesortstate *state)
    1316             : {
    1317             :     Assert(state->base.sortKeys[0].abbrev_converter != NULL);
    1318             :     Assert(state->base.sortKeys[0].abbrev_abort != NULL);
    1319             :     Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
    1320             : 
    1321             :     /*
    1322             :      * Check effectiveness of abbreviation optimization.  Consider aborting
    1323             :      * when still within memory limit.
    1324             :      */
    1325     4434304 :     if (state->status == TSS_INITIAL &&
    1326     3961874 :         state->memtupcount >= state->abbrevNext)
    1327             :     {
    1328        5020 :         state->abbrevNext *= 2;
    1329             : 
    1330             :         /*
    1331             :          * Check opclass-supplied abbreviation abort routine.  It may indicate
    1332             :          * that abbreviation should not proceed.
    1333             :          */
    1334        5020 :         if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
    1335             :                                                 state->base.sortKeys))
    1336        4924 :             return false;
    1337             : 
    1338             :         /*
    1339             :          * Finally, restore authoritative comparator, and indicate that
    1340             :          * abbreviation is not in play by setting abbrev_converter to NULL
    1341             :          */
    1342          96 :         state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
    1343          96 :         state->base.sortKeys[0].abbrev_converter = NULL;
    1344             :         /* Not strictly necessary, but be tidy */
    1345          96 :         state->base.sortKeys[0].abbrev_abort = NULL;
    1346          96 :         state->base.sortKeys[0].abbrev_full_comparator = NULL;
    1347             : 
    1348             :         /* Give up - expect original pass-by-value representation */
    1349          96 :         return true;
    1350             :     }
    1351             : 
    1352     4429284 :     return false;
    1353             : }
    1354             : 
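             : /*
             :  * Worked example: abbrevNext starts small (10 when a sort begins, at
             :  * the time of writing) and doubles after each check, so the abort
             :  * test above runs as roughly the 10th, 20th, 40th, 80th, ... tuple
             :  * arrives.  Its cost is therefore logarithmic in the input size,
             :  * while ineffective abbreviation is still abandoned early.
             :  */
             : 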
    1355             : /*
    1356             :  * All tuples have been provided; finish the sort.
    1357             :  */
    1358             : void
    1359      234106 : tuplesort_performsort(Tuplesortstate *state)
    1360             : {
    1361      234106 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1362             : 
    1363      234106 :     if (trace_sort)
    1364           0 :         elog(LOG, "performsort of worker %d starting: %s",
    1365             :              state->worker, pg_rusage_show(&state->ru_start));
    1366             : 
    1367      234106 :     switch (state->status)
    1368             :     {
    1369      233576 :         case TSS_INITIAL:
    1370             : 
    1371             :             /*
    1372             :              * We were able to accumulate all the tuples within the allowed
    1373             :              * amount of memory, or we are the leader taking over worker tapes.
    1374             :              */
    1375      233576 :             if (SERIAL(state))
    1376             :             {
    1377             :                 /* Just qsort 'em and we're done */
    1378      232976 :                 tuplesort_sort_memtuples(state);
    1379      232886 :                 state->status = TSS_SORTEDINMEM;
    1380             :             }
    1381         600 :             else if (WORKER(state))
    1382             :             {
    1383             :                 /*
    1384             :                  * Parallel workers must still dump out tuples to tape.  No
    1385             :                  * merge is required to produce a single output run, though.
    1386             :                  */
    1387         444 :                 inittapes(state, false);
    1388         444 :                 dumptuples(state, true);
    1389         444 :                 worker_nomergeruns(state);
    1390         444 :                 state->status = TSS_SORTEDONTAPE;
    1391             :             }
    1392             :             else
    1393             :             {
    1394             :                 /*
    1395             :                  * Leader will take over worker tapes and merge worker runs.
    1396             :                  * Note that mergeruns sets the correct state->status.
    1397             :                  */
    1398         156 :                 leader_takeover_tapes(state);
    1399         156 :                 mergeruns(state);
    1400             :             }
    1401      233486 :             state->current = 0;
    1402      233486 :             state->eof_reached = false;
    1403      233486 :             state->markpos_block = 0L;
    1404      233486 :             state->markpos_offset = 0;
    1405      233486 :             state->markpos_eof = false;
    1406      233486 :             break;
    1407             : 
    1408         408 :         case TSS_BOUNDED:
    1409             : 
    1410             :             /*
    1411             :              * We were able to accumulate all the tuples required for output
    1412             :              * in memory, using a heap to eliminate excess tuples.  Now we
    1413             :              * have to transform the heap to a properly-sorted array. Note
    1414             :              * that sort_bounded_heap sets the correct state->status.
    1415             :              */
    1416         408 :             sort_bounded_heap(state);
    1417         408 :             state->current = 0;
    1418         408 :             state->eof_reached = false;
    1419         408 :             state->markpos_offset = 0;
    1420         408 :             state->markpos_eof = false;
    1421         408 :             break;
    1422             : 
    1423         122 :         case TSS_BUILDRUNS:
    1424             : 
    1425             :             /*
    1426             :              * Finish tape-based sort.  First, flush all tuples remaining in
    1427             :              * memory out to tape; then merge until we have a single remaining
    1428             :              * run (or, if !randomAccess and !WORKER(), one run per tape).
    1429             :              * Note that mergeruns sets the correct state->status.
    1430             :              */
    1431         122 :             dumptuples(state, true);
    1432         122 :             mergeruns(state);
    1433         122 :             state->eof_reached = false;
    1434         122 :             state->markpos_block = 0L;
    1435         122 :             state->markpos_offset = 0;
    1436         122 :             state->markpos_eof = false;
    1437         122 :             break;
    1438             : 
    1439           0 :         default:
    1440           0 :             elog(ERROR, "invalid tuplesort state");
    1441             :             break;
    1442             :     }
    1443             : 
    1444      234016 :     if (trace_sort)
    1445             :     {
    1446           0 :         if (state->status == TSS_FINALMERGE)
    1447           0 :             elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
    1448             :                  state->worker, state->nInputTapes,
    1449             :                  pg_rusage_show(&state->ru_start));
    1450             :         else
    1451           0 :             elog(LOG, "performsort of worker %d done: %s",
    1452             :                  state->worker, pg_rusage_show(&state->ru_start));
    1453             :     }
    1454             : 
    1455      234016 :     MemoryContextSwitchTo(oldcontext);
    1456      234016 : }
    1457             : 
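             : /*
             :  * Summary of the status transitions driven by tuplesort_performsort (a
             :  * restatement of the switch above, not new behavior):
             :  *
             :  *   TSS_INITIAL   -- qsort in memory ------------> TSS_SORTEDINMEM  (serial)
             :  *   TSS_INITIAL   -- dump one run, no merge ------> TSS_SORTEDONTAPE (worker)
             :  *   TSS_INITIAL   -- take over tapes, mergeruns --> TSS_SORTEDONTAPE or
             :  *                                                   TSS_FINALMERGE   (leader)
             :  *   TSS_BOUNDED   -- sort_bounded_heap -----------> TSS_SORTEDINMEM
             :  *   TSS_BUILDRUNS -- dump remainder, mergeruns ---> TSS_SORTEDONTAPE or
             :  *                                                   TSS_FINALMERGE
             :  */
             : 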
    1458             : /*
    1459             :  * Internal routine to fetch the next tuple in either forward or back
    1460             :  * direction into *stup.  Returns false if no more tuples.
    1461             :  * Returned tuple belongs to tuplesort memory context, and must not be freed
    1462             :  * by caller.  Note that fetched tuple is stored in memory that may be
    1463             :  * recycled by any future fetch.
    1464             :  */
    1465             : bool
    1466    27443524 : tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
    1467             :                           SortTuple *stup)
    1468             : {
    1469             :     unsigned int tuplen;
    1470             :     size_t      nmoved;
    1471             : 
    1472             :     Assert(!WORKER(state));
    1473             : 
    1474    27443524 :     switch (state->status)
    1475             :     {
    1476    22595944 :         case TSS_SORTEDINMEM:
    1477             :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1478             :             Assert(!state->slabAllocatorUsed);
    1479    22595944 :             if (forward)
    1480             :             {
    1481    22595878 :                 if (state->current < state->memtupcount)
    1482             :                 {
    1483    22364826 :                     *stup = state->memtuples[state->current++];
    1484    22364826 :                     return true;
    1485             :                 }
    1486      231052 :                 state->eof_reached = true;
    1487             : 
    1488             :                 /*
    1489             :                  * Complain if caller tries to retrieve more tuples than
    1490             :                  * originally asked for in a bounded sort.  This is because
    1491             :                  * returning EOF here might be the wrong thing.
    1492             :                  */
    1493      231052 :                 if (state->bounded && state->current >= state->bound)
    1494           0 :                     elog(ERROR, "retrieved too many tuples in a bounded sort");
    1495             : 
    1496      231052 :                 return false;
    1497             :             }
    1498             :             else
    1499             :             {
    1500          66 :                 if (state->current <= 0)
    1501           0 :                     return false;
    1502             : 
    1503             :                 /*
    1504             :                  * If all tuples have already been fetched, return the last
    1505             :                  * tuple; otherwise, the tuple before the last one returned.
    1506             :                  */
    1507          66 :                 if (state->eof_reached)
    1508          12 :                     state->eof_reached = false;
    1509             :                 else
    1510             :                 {
    1511          54 :                     state->current--;    /* last returned tuple */
    1512          54 :                     if (state->current <= 0)
    1513           6 :                         return false;
    1514             :                 }
    1515          60 :                 *stup = state->memtuples[state->current - 1];
    1516          60 :                 return true;
    1517             :             }
    1518             :             break;
    1519             : 
    1520      272994 :         case TSS_SORTEDONTAPE:
    1521             :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1522             :             Assert(state->slabAllocatorUsed);
    1523             : 
    1524             :             /*
    1525             :              * The slot that held the tuple that we returned in previous
    1526             :              * gettuple call can now be reused.
    1527             :              */
    1528      272994 :             if (state->lastReturnedTuple)
    1529             :             {
    1530      152850 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1531      152850 :                 state->lastReturnedTuple = NULL;
    1532             :             }
    1533             : 
    1534      272994 :             if (forward)
    1535             :             {
    1536      272964 :                 if (state->eof_reached)
    1537           0 :                     return false;
    1538             : 
    1539      272964 :                 if ((tuplen = getlen(state->result_tape, true)) != 0)
    1540             :                 {
    1541      272940 :                     READTUP(state, stup, state->result_tape, tuplen);
    1542             : 
    1543             :                     /*
    1544             :                      * Remember the tuple we return, so that we can recycle
    1545             :                      * its memory on next call.  (This can be NULL, in the
    1546             :                      * !state->tuples case).
    1547             :                      */
    1548      272940 :                     state->lastReturnedTuple = stup->tuple;
    1549             : 
    1550      272940 :                     return true;
    1551             :                 }
    1552             :                 else
    1553             :                 {
    1554          24 :                     state->eof_reached = true;
    1555          24 :                     return false;
    1556             :                 }
    1557             :             }
    1558             : 
    1559             :             /*
    1560             :              * Backward.
    1561             :              *
    1562             :              * If all tuples have already been fetched, return the last
    1563             :              * tuple; otherwise, the tuple before the last one returned.
    1564             :              */
    1565          30 :             if (state->eof_reached)
    1566             :             {
    1567             :                 /*
    1568             :                  * Seek position is pointing just past the zero tuplen at the
    1569             :                  * end of file; back up to fetch last tuple's ending length
    1570             :                  * word.  If seek fails we must have a completely empty file.
    1571             :                  */
    1572          12 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1573             :                                               2 * sizeof(unsigned int));
    1574          12 :                 if (nmoved == 0)
    1575           0 :                     return false;
    1576          12 :                 else if (nmoved != 2 * sizeof(unsigned int))
    1577           0 :                     elog(ERROR, "unexpected tape position");
    1578          12 :                 state->eof_reached = false;
    1579             :             }
    1580             :             else
    1581             :             {
    1582             :                 /*
    1583             :                  * Back up and fetch previously-returned tuple's ending length
    1584             :                  * word.  If seek fails, assume we are at start of file.
    1585             :                  */
    1586          18 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1587             :                                               sizeof(unsigned int));
    1588          18 :                 if (nmoved == 0)
    1589           0 :                     return false;
    1590          18 :                 else if (nmoved != sizeof(unsigned int))
    1591           0 :                     elog(ERROR, "unexpected tape position");
    1592          18 :                 tuplen = getlen(state->result_tape, false);
    1593             : 
    1594             :                 /*
    1595             :                  * Back up to get ending length word of tuple before it.
    1596             :                  */
    1597          18 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1598             :                                               tuplen + 2 * sizeof(unsigned int));
    1599          18 :                 if (nmoved == tuplen + sizeof(unsigned int))
    1600             :                 {
    1601             :                     /*
    1602             :                      * We backed up over the previous tuple, but there was no
    1603             :                      * ending length word before it.  That means the previous
    1604             :                      * tuple is the first tuple in the file.  It is now the
    1605             :                      * next to read in the forward direction (not obviously
    1606             :                      * right, but that is what the in-memory case does).
    1607             :                      */
    1608           6 :                     return false;
    1609             :                 }
    1610          12 :                 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
    1611           0 :                     elog(ERROR, "bogus tuple length in backward scan");
    1612             :             }
    1613             : 
    1614          24 :             tuplen = getlen(state->result_tape, false);
    1615             : 
    1616             :             /*
    1617             :              * Now we have the length of the prior tuple, back up and read it.
    1618             :              * Note: READTUP expects we are positioned after the initial
    1619             :              * length word of the tuple, so back up to that point.
    1620             :              */
    1621          24 :             nmoved = LogicalTapeBackspace(state->result_tape,
    1622             :                                           tuplen);
    1623          24 :             if (nmoved != tuplen)
    1624           0 :                 elog(ERROR, "bogus tuple length in backward scan");
    1625          24 :             READTUP(state, stup, state->result_tape, tuplen);
    1626             : 
    1627             :             /*
    1628             :              * Remember the tuple we return, so that we can recycle its memory
    1629             :              * on next call. (This can be NULL, in the Datum case).
    1630             :              */
    1631          24 :             state->lastReturnedTuple = stup->tuple;
    1632             : 
    1633          24 :             return true;
    1634             : 
    1635     4574586 :         case TSS_FINALMERGE:
    1636             :             Assert(forward);
    1637             :             /* We are managing memory ourselves, with the slab allocator. */
    1638             :             Assert(state->slabAllocatorUsed);
    1639             : 
    1640             :             /*
    1641             :              * The slab slot holding the tuple that we returned in previous
    1642             :              * gettuple call can now be reused.
    1643             :              */
    1644     4574586 :             if (state->lastReturnedTuple)
    1645             :             {
    1646     4514296 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1647     4514296 :                 state->lastReturnedTuple = NULL;
    1648             :             }
    1649             : 
    1650             :             /*
    1651             :              * This code should match the inner loop of mergeonerun().
    1652             :              */
    1653     4574586 :             if (state->memtupcount > 0)
    1654             :             {
    1655     4574344 :                 int         srcTapeIndex = state->memtuples[0].srctape;
    1656     4574344 :                 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
    1657             :                 SortTuple   newtup;
    1658             : 
    1659     4574344 :                 *stup = state->memtuples[0];
    1660             : 
    1661             :                 /*
    1662             :                  * Remember the tuple we return, so that we can recycle its
    1663             :                  * memory on next call. (This can be NULL, in the Datum case).
    1664             :                  */
    1665     4574344 :                 state->lastReturnedTuple = stup->tuple;
    1666             : 
    1667             :                 /*
    1668             :                  * Pull next tuple from tape, and replace the returned tuple
    1669             :                  * at top of the heap with it.
    1670             :                  */
    1671     4574344 :                 if (!mergereadnext(state, srcTape, &newtup))
    1672             :                 {
    1673             :                     /*
    1674             :                      * If no more data, we've reached end of run on this tape.
    1675             :                      * Remove the top node from the heap.
    1676             :                      */
    1677         354 :                     tuplesort_heap_delete_top(state);
    1678         354 :                     state->nInputRuns--;
    1679             : 
    1680             :                     /*
    1681             :                      * Close the tape.  It'd go away at the end of the sort
    1682             :                      * anyway, but better to release the memory early.
    1683             :                      */
    1684         354 :                     LogicalTapeClose(srcTape);
    1685         354 :                     return true;
    1686             :                 }
    1687     4573990 :                 newtup.srctape = srcTapeIndex;
    1688     4573990 :                 tuplesort_heap_replace_top(state, &newtup);
    1689     4573990 :                 return true;
    1690             :             }
    1691         242 :             return false;
    1692             : 
    1693           0 :         default:
    1694           0 :             elog(ERROR, "invalid tuplesort state");
    1695             :             return false;       /* keep compiler quiet */
    1696             :     }
    1697             : }
    1698             : 
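             : /*
             :  * The on-tape layout that the backward-scan code above relies on
             :  * (trailing length words are written only for TUPLESORT_RANDOMACCESS
             :  * sorts), sketched for illustration:
             :  *
             :  *   [len1][tuple1 body][len1] [len2][tuple2 body][len2] ... [0]
             :  *
             :  * Scanning backward therefore means: back up over a trailing length
             :  * word to learn the prior tuple's size, back up over that tuple plus
             :  * its leading length word, and then let READTUP consume the body in
             :  * the forward direction again.
             :  */
             : 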
    1699             : 
    1700             : /*
    1701             :  * Advance over N tuples in either forward or back direction,
    1702             :  * without returning any data.  N==0 is a no-op.
    1703             :  * Returns true if successful, false if ran out of tuples.
    1704             :  */
    1705             : bool
    1706         392 : tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
    1707             : {
    1708             :     MemoryContext oldcontext;
    1709             : 
    1710             :     /*
    1711             :      * We don't actually support backwards skip yet, because no callers need
    1712             :      * it.  The API is designed to allow for that later, though.
    1713             :      */
    1714             :     Assert(forward);
    1715             :     Assert(ntuples >= 0);
    1716             :     Assert(!WORKER(state));
    1717             : 
    1718         392 :     switch (state->status)
    1719             :     {
    1720         368 :         case TSS_SORTEDINMEM:
    1721         368 :             if (state->memtupcount - state->current >= ntuples)
    1722             :             {
    1723         368 :                 state->current += ntuples;
    1724         368 :                 return true;
    1725             :             }
    1726           0 :             state->current = state->memtupcount;
    1727           0 :             state->eof_reached = true;
    1728             : 
    1729             :             /*
    1730             :              * Complain if caller tries to retrieve more tuples than
    1731             :              * originally asked for in a bounded sort.  This is because
    1732             :              * returning EOF here might be the wrong thing.
    1733             :              */
    1734           0 :             if (state->bounded && state->current >= state->bound)
    1735           0 :                 elog(ERROR, "retrieved too many tuples in a bounded sort");
    1736             : 
    1737           0 :             return false;
    1738             : 
    1739          24 :         case TSS_SORTEDONTAPE:
    1740             :         case TSS_FINALMERGE:
    1741             : 
    1742             :             /*
    1743             :              * We could probably optimize these cases better, but for now it's
    1744             :              * not worth the trouble.
    1745             :              */
    1746          24 :             oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1747      240132 :             while (ntuples-- > 0)
    1748             :             {
    1749             :                 SortTuple   stup;
    1750             : 
    1751      240108 :                 if (!tuplesort_gettuple_common(state, forward, &stup))
    1752             :                 {
    1753           0 :                     MemoryContextSwitchTo(oldcontext);
    1754           0 :                     return false;
    1755             :                 }
    1756      240108 :                 CHECK_FOR_INTERRUPTS();
    1757             :             }
    1758          24 :             MemoryContextSwitchTo(oldcontext);
    1759          24 :             return true;
    1760             : 
    1761           0 :         default:
    1762           0 :             elog(ERROR, "invalid tuplesort state");
    1763             :             return false;       /* keep compiler quiet */
    1764             :     }
    1765             : }
    1766             : 
    1767             : /*
    1768             :  * tuplesort_merge_order - report merge order we'll use for given memory
    1769             :  * (note: "merge order" just means the number of input tapes in the merge).
    1770             :  *
    1771             :  * This is exported for use by the planner.  allowedMem is in bytes.
    1772             :  */
    1773             : int
    1774       17934 : tuplesort_merge_order(int64 allowedMem)
    1775             : {
    1776             :     int         mOrder;
    1777             : 
    1778             :     /*----------
    1779             :      * In the merge phase, we need buffer space for each input and output tape.
    1780             :      * Each pass in the balanced merge algorithm reads from M input tapes, and
    1781             :      * writes to N output tapes.  Each tape consumes TAPE_BUFFER_OVERHEAD bytes
    1782             :      * of memory.  In addition to that, we want MERGE_BUFFER_SIZE workspace per
    1783             :      * input tape.
    1784             :      *
    1785             :      * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
    1786             :      *            N * TAPE_BUFFER_OVERHEAD
    1787             :      *
    1788             :      * Except for the last and next-to-last merge passes, where there can be
    1789             :      * fewer tapes left to process, M = N.  We choose M so that we have the
    1790             :      * desired amount of memory available for the input buffers
    1791             :      * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
    1792             :      * available for the tape buffers (allowedMem).
    1793             :      *
    1794             :      * Note: you might be thinking we need to account for the memtuples[]
    1795             :      * array in this calculation, but we effectively treat that as part of the
    1796             :      * MERGE_BUFFER_SIZE workspace.
    1797             :      *----------
    1798             :      */
    1799       17934 :     mOrder = allowedMem /
    1800             :         (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
    1801             : 
    1802             :     /*
    1803             :      * Even in minimum memory, use at least a MINORDER merge.  On the other
    1804             :      * hand, even when we have lots of memory, do not use more than a MAXORDER
    1805             :      * merge.  Tapes are pretty cheap, but they're not entirely free.  Each
    1806             :      * additional tape reduces the amount of memory available to build runs,
    1807             :      * which in turn can cause the same sort to need more runs, which makes
    1808             :      * merging slower even if it can still be done in a single pass.  Also,
    1809             :      * high order merges are quite slow due to CPU cache effects; it can be
    1810             :      * faster to pay the I/O cost of a multi-pass merge than to perform a
    1811             :      * single merge pass across many hundreds of tapes.
    1812             :      */
    1813       17934 :     mOrder = Max(mOrder, MINORDER);
    1814       17934 :     mOrder = Min(mOrder, MAXORDER);
    1815             : 
    1816       17934 :     return mOrder;
    1817             : }
    1818             : 
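             : /*
             :  * Worked example, assuming the buffer-size constants defined earlier
             :  * in this file keep their usual values (TAPE_BUFFER_OVERHEAD = BLCKSZ
             :  * = 8192 and MERGE_BUFFER_SIZE = 32 * BLCKSZ = 262144): for
             :  * allowedMem = 4MB,
             :  *
             :  *   mOrder = 4194304 / (2 * 8192 + 262144) = 15
             :  *
             :  * which is then clamped into the [MINORDER, MAXORDER] range.
             :  */
             : 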
    1819             : /*
    1820             :  * Helper function to calculate how much memory to allocate for the read buffer
    1821             :  * of each input tape in a merge pass.
    1822             :  *
    1823             :  * 'avail_mem' is the amount of memory available for the buffers of all the
    1824             :  *      tapes, both input and output.
    1825             :  * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
    1826             :  * 'maxOutputTapes' is the max. number of output tapes we should produce.
    1827             :  */
    1828             : static int64
    1829         308 : merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
    1830             :                        int maxOutputTapes)
    1831             : {
    1832             :     int         nOutputRuns;
    1833             :     int         nOutputTapes;
    1834             : 
    1835             :     /*
    1836             :      * How many output tapes will we produce in this pass?
    1837             :      *
    1838             :      * This is nInputRuns / nInputTapes, rounded up.
    1839             :      */
    1840         308 :     nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
    1841             : 
    1842         308 :     nOutputTapes = Min(nOutputRuns, maxOutputTapes);
    1843             : 
    1844             :     /*
    1845             :      * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory.  All
    1846             :      * remaining memory is divided evenly between the input tapes.
    1847             :      *
    1848             :      * This also follows from the formula in tuplesort_merge_order, but here
    1849             :      * we derive the input buffer size from the amount of memory available,
    1850             :      * and M and N.
    1851             :      */
    1852         308 :     return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
    1853             : }
    1854             : 
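             : /*
             :  * Worked example (invented figures): with nInputRuns = 96 on
             :  * nInputTapes = 16, the pass produces ceil(96 / 16) = 6 output runs,
             :  * hence at most 6 output tapes.  Assuming TAPE_BUFFER_OVERHEAD = 8192
             :  * and avail_mem = 1048576, each input tape then gets
             :  * (1048576 - 6 * 8192) / 16 = 62464 bytes of read buffer.
             :  */
             : 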
    1855             : /*
    1856             :  * inittapes - initialize for tape sorting.
    1857             :  *
    1858             :  * This is called only if we have found we won't sort in memory.
    1859             :  */
    1860             : static void
    1861         566 : inittapes(Tuplesortstate *state, bool mergeruns)
    1862             : {
    1863             :     Assert(!LEADER(state));
    1864             : 
    1865         566 :     if (mergeruns)
    1866             :     {
    1867             :         /* Compute number of input tapes to use when merging */
    1868         122 :         state->maxTapes = tuplesort_merge_order(state->allowedMem);
    1869             :     }
    1870             :     else
    1871             :     {
    1872             :         /* Workers can sometimes produce a single run, output without merge */
    1873             :         Assert(WORKER(state));
    1874         444 :         state->maxTapes = MINORDER;
    1875             :     }
    1876             : 
    1877         566 :     if (trace_sort)
    1878           0 :         elog(LOG, "worker %d switching to external sort with %d tapes: %s",
    1879             :              state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
    1880             : 
    1881             :     /* Create the tape set */
    1882         566 :     inittapestate(state, state->maxTapes);
    1883         566 :     state->tapeset =
    1884         566 :         LogicalTapeSetCreate(false,
    1885         566 :                              state->shared ? &state->shared->fileset : NULL,
    1886             :                              state->worker);
    1887             : 
    1888         566 :     state->currentRun = 0;
    1889             : 
    1890             :     /*
    1891             :      * Initialize logical tape arrays.
    1892             :      */
    1893         566 :     state->inputTapes = NULL;
    1894         566 :     state->nInputTapes = 0;
    1895         566 :     state->nInputRuns = 0;
    1896             : 
    1897         566 :     state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
    1898         566 :     state->nOutputTapes = 0;
    1899         566 :     state->nOutputRuns = 0;
    1900             : 
    1901         566 :     state->status = TSS_BUILDRUNS;
    1902             : 
    1903         566 :     selectnewtape(state);
    1904         566 : }
    1905             : 
    1906             : /*
    1907             :  * inittapestate - initialize generic tape management state
    1908             :  */
    1909             : static void
    1910         722 : inittapestate(Tuplesortstate *state, int maxTapes)
    1911             : {
    1912             :     int64       tapeSpace;
    1913             : 
    1914             :     /*
    1915             :      * Decrease availMem to reflect the space needed for tape buffers; but
    1916             :      * don't decrease it to the point that we have no room for tuples. (That
    1917             :      * case is only likely to occur if sorting pass-by-value Datums; in all
    1918             :      * other scenarios the memtuples[] array is unlikely to occupy more than
    1919             :      * half of allowedMem.  In the pass-by-value case it's not important to
    1920             :      * account for tuple space, so we don't care if LACKMEM becomes
    1921             :      * inaccurate.)
    1922             :      */
    1923         722 :     tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
    1924             : 
    1925         722 :     if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
    1926         618 :         USEMEM(state, tapeSpace);
    1927             : 
    1928             :     /*
    1929             :      * Make sure that the temp file(s) underlying the tape set are created in
    1930             :      * suitable temp tablespaces.  For parallel sorts, this should have been
    1931             :      * called already, but it doesn't matter if it is called a second time.
    1932             :      */
    1933         722 :     PrepareTempTablespaces();
    1934         722 : }
    1935             : 
    1936             : /*
    1937             :  * selectnewtape -- select next tape to output to.
    1938             :  *
    1939             :  * This is called after finishing a run when we know another run
    1940             :  * must be started.  This is used both when building the initial
    1941             :  * runs, and during merge passes.
    1942             :  */
    1943             : static void
    1944        1642 : selectnewtape(Tuplesortstate *state)
    1945             : {
    1946             :     /*
    1947             :      * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
    1948             :      * both zero.  On each call, we create a new output tape to hold the next
    1949             :      * run, until maxTapes is reached.  After that, we assign new runs to the
    1950             :      * existing tapes in a round robin fashion.
    1951             :      */
    1952        1642 :     if (state->nOutputTapes < state->maxTapes)
    1953             :     {
    1954             :         /* Create a new tape to hold the next run */
    1955             :         Assert(state->outputTapes[state->nOutputRuns] == NULL);
    1956             :         Assert(state->nOutputRuns == state->nOutputTapes);
    1957        1060 :         state->destTape = LogicalTapeCreate(state->tapeset);
    1958        1060 :         state->outputTapes[state->nOutputTapes] = state->destTape;
    1959        1060 :         state->nOutputTapes++;
    1960        1060 :         state->nOutputRuns++;
    1961             :     }
    1962             :     else
    1963             :     {
    1964             :         /*
    1965             :          * We have reached the max number of tapes.  Append to an existing
    1966             :          * tape.
    1967             :          */
    1968         582 :         state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
    1969         582 :         state->nOutputRuns++;
    1970             :     }
    1971        1642 : }
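
The effect of this policy can be illustrated with plain integers standing in
for tape handles: the first maxTapes runs each get a fresh tape, and every
later run wraps around round-robin.

    #include <stdio.h>

    int
    main(void)
    {
        int         maxTapes = 4;   /* illustrative, not MINORDER/MAXORDER */
        int         nRuns = 10;

        for (int run = 0; run < nRuns; run++)
        {
            /* fresh tape while below the cap, then modulo wrap-around */
            int         tape = run < maxTapes ? run : run % maxTapes;

            printf("run %d -> tape %d\n", run, tape);
        }
        return 0;
    }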
    1972             : 
    1973             : /*
    1974             :  * Initialize the slab allocation arena, for the given number of slots.
    1975             :  */
    1976             : static void
    1977         278 : init_slab_allocator(Tuplesortstate *state, int numSlots)
    1978             : {
    1979         278 :     if (numSlots > 0)
    1980             :     {
    1981             :         char       *p;
    1982             :         int         i;
    1983             : 
    1984         266 :         state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
    1985         266 :         state->slabMemoryEnd = state->slabMemoryBegin +
    1986         266 :             numSlots * SLAB_SLOT_SIZE;
    1987         266 :         state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
    1988         266 :         USEMEM(state, numSlots * SLAB_SLOT_SIZE);
    1989             : 
    1990         266 :         p = state->slabMemoryBegin;
    1991        1012 :         for (i = 0; i < numSlots - 1; i++)
    1992             :         {
    1993         746 :             ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
    1994         746 :             p += SLAB_SLOT_SIZE;
    1995             :         }
    1996         266 :         ((SlabSlot *) p)->nextfree = NULL;
    1997             :     }
    1998             :     else
    1999             :     {
    2000          12 :         state->slabMemoryBegin = state->slabMemoryEnd = NULL;
    2001          12 :         state->slabFreeHead = NULL;
    2002             :     }
    2003         278 :     state->slabAllocatorUsed = true;
    2004         278 : }
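
The same free-list threading can be sketched generically: one contiguous arena
is carved into fixed-size slots, and the first bytes of each free slot are
reused as a pointer to the next free slot.  Slot size and count below are
arbitrary, and malloc() stands in for the palloc'd arena.

    #include <stdio.h>
    #include <stdlib.h>

    #define SLOT_SIZE 1024          /* arbitrary, not SLAB_SLOT_SIZE */

    typedef union Slot
    {
        union Slot *nextfree;
        char        buffer[SLOT_SIZE];
    } Slot;

    int
    main(void)
    {
        int         nSlots = 4;
        char       *arena = malloc((size_t) nSlots * SLOT_SIZE);
        char       *p = arena;
        Slot       *freehead = (Slot *) arena;

        /* thread every slot onto the free list, as in the loop above */
        for (int i = 0; i < nSlots - 1; i++)
        {
            ((Slot *) p)->nextfree = (Slot *) (p + SLOT_SIZE);
            p += SLOT_SIZE;
        }
        ((Slot *) p)->nextfree = NULL;

        /* pop a slot for use, then push it back when it is recycled */
        Slot       *slot = freehead;

        freehead = slot->nextfree;
        /* ... use slot->buffer ... */
        slot->nextfree = freehead;
        freehead = slot;

        printf("slot %p recycled, free list head %p\n",
               (void *) slot, (void *) freehead);
        free(arena);
        return 0;
    }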
    2005             : 
    2006             : /*
    2007             :  * mergeruns -- merge all the completed initial runs.
    2008             :  *
    2009             :  * This implements the Balanced k-Way Merge Algorithm.  All input data has
    2010             :  * already been written to initial runs on tape (see dumptuples).
    2011             :  */
    2012             : static void
    2013         278 : mergeruns(Tuplesortstate *state)
    2014             : {
    2015             :     int         tapenum;
    2016             : 
    2017             :     Assert(state->status == TSS_BUILDRUNS);
    2018             :     Assert(state->memtupcount == 0);
    2019             : 
    2020         278 :     if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
    2021             :     {
    2022             :         /*
    2023             :          * If there are multiple runs to be merged, when we go to read back
    2024             :          * tuples from disk, abbreviated keys will not have been stored, and
    2025             :          * we don't care to regenerate them.  Disable abbreviation from this
    2026             :          * point on.
    2027             :          */
    2028          30 :         state->base.sortKeys->abbrev_converter = NULL;
    2029          30 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
    2030             : 
    2031             :         /* Not strictly necessary, but be tidy */
    2032          30 :         state->base.sortKeys->abbrev_abort = NULL;
    2033          30 :         state->base.sortKeys->abbrev_full_comparator = NULL;
    2034             :     }
    2035             : 
    2036             :     /*
    2037             :      * Reset tuple memory.  We've freed all the tuples that we previously
    2038             :      * allocated.  We will use the slab allocator from now on.
    2039             :      */
    2040         278 :     MemoryContextResetOnly(state->base.tuplecontext);
    2041             : 
    2042             :     /*
    2043             :      * We no longer need a large memtuples array.  (We will allocate a smaller
    2044             :      * one for the heap later.)
    2045             :      */
    2046         278 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    2047         278 :     pfree(state->memtuples);
    2048         278 :     state->memtuples = NULL;
    2049             : 
    2050             :     /*
    2051             :      * Initialize the slab allocator.  We need one slab slot per input tape,
    2052             :      * for the tuples in the heap, plus one to hold the tuple last returned
    2053             :      * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
    2054             :  * however, we don't need to allocate anything.)
    2055             :      *
    2056             :      * In a multi-pass merge, we could shrink this allocation for the last
    2057             :      * merge pass, if it has fewer tapes than previous passes, but we don't
    2058             :      * bother.
    2059             :      *
    2060             :      * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
    2061             :      * to track memory usage of individual tuples.
    2062             :      */
    2063         278 :     if (state->base.tuples)
    2064         266 :         init_slab_allocator(state, state->nOutputTapes + 1);
    2065             :     else
    2066          12 :         init_slab_allocator(state, 0);
    2067             : 
    2068             :     /*
    2069             :      * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
    2070             :      * from each input tape.
    2071             :      *
    2072             :      * We could shrink this, too, between passes in a multi-pass merge, but we
    2073             :      * don't bother.  (The initial input tapes are still in outputTapes.  The
    2074             :      * number of input tapes will not increase between passes.)
    2075             :      */
    2076         278 :     state->memtupsize = state->nOutputTapes;
    2077         556 :     state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
    2078         278 :                                                         state->nOutputTapes * sizeof(SortTuple));
    2079         278 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    2080             : 
    2081             :     /*
    2082             :      * Use all the remaining memory we have available for tape buffers among
    2083             :      * all the input tapes.  At the beginning of each merge pass, we will
    2084             :      * divide this memory between the input and output tapes in the pass.
    2085             :      */
    2086         278 :     state->tape_buffer_mem = state->availMem;
    2087         278 :     USEMEM(state, state->tape_buffer_mem);
    2088         278 :     if (trace_sort)
    2089           0 :         elog(LOG, "worker %d using %zu KB of memory for tape buffers",
    2090             :              state->worker, state->tape_buffer_mem / 1024);
    2091             : 
    2092             :     for (;;)
    2093             :     {
    2094             :         /*
    2095             :          * On the first iteration, or if we have read all the runs from the
    2096             :          * input tapes in a multi-pass merge, it's time to start a new pass.
    2097             :          * Rewind all the output tapes, and make them inputs for the next
    2098             :          * pass.
    2099             :          */
    2100         416 :         if (state->nInputRuns == 0)
    2101             :         {
    2102             :             int64       input_buffer_size;
    2103             : 
    2104             :             /* Close the old, emptied, input tapes */
    2105         308 :             if (state->nInputTapes > 0)
    2106             :             {
    2107         210 :                 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2108         180 :                     LogicalTapeClose(state->inputTapes[tapenum]);
    2109          30 :                 pfree(state->inputTapes);
    2110             :             }
    2111             : 
    2112             :             /* Previous pass's outputs become next pass's inputs. */
    2113         308 :             state->inputTapes = state->outputTapes;
    2114         308 :             state->nInputTapes = state->nOutputTapes;
    2115         308 :             state->nInputRuns = state->nOutputRuns;
    2116             : 
    2117             :             /*
    2118             :              * Reset output tape variables.  The actual LogicalTapes will be
    2119             :              * created as needed, here we only allocate the array to hold
    2120             :              * them.
    2121             :              */
    2122         308 :             state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
    2123         308 :             state->nOutputTapes = 0;
    2124         308 :             state->nOutputRuns = 0;
    2125             : 
    2126             :             /*
    2127             :              * Redistribute the memory allocated for tape buffers, among the
    2128             :              * new input and output tapes.
    2129             :              */
    2130         308 :             input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
    2131             :                                                        state->nInputTapes,
    2132             :                                                        state->nInputRuns,
    2133             :                                                        state->maxTapes);
    2134             : 
    2135         308 :             if (trace_sort)
    2136           0 :                 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
    2137             :                      state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
    2138             :                      pg_rusage_show(&state->ru_start));
    2139             : 
    2140             :             /* Prepare the new input tapes for merge pass. */
    2141        1222 :             for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2142         914 :                 LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
    2143             : 
    2144             :             /*
    2145             :              * If there's just one run left on each input tape, then only one
    2146             :              * merge pass remains.  If we don't have to produce a materialized
    2147             :              * sorted tape, we can stop at this point and do the final merge
    2148             :              * on-the-fly.
    2149             :              */
    2150         308 :             if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
    2151         290 :                 && state->nInputRuns <= state->nInputTapes
    2152         260 :                 && !WORKER(state))
    2153             :             {
    2154             :                 /* Tell logtape.c we won't be writing anymore */
    2155         260 :                 LogicalTapeSetForgetFreeSpace(state->tapeset);
    2156             :                 /* Initialize for the final merge pass */
    2157         260 :                 beginmerge(state);
    2158         260 :                 state->status = TSS_FINALMERGE;
    2159         260 :                 return;
    2160             :             }
    2161             :         }
    2162             : 
    2163             :         /* Select an output tape */
    2164         156 :         selectnewtape(state);
    2165             : 
    2166             :         /* Merge one run from each input tape. */
    2167         156 :         mergeonerun(state);
    2168             : 
    2169             :         /*
    2170             :          * If the input tapes are empty and we produced only one output run,
    2171             :          * we're done.  The current output tape contains the final result.
    2172             :          */
    2173         156 :         if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
    2174          18 :             break;
    2175             :     }
    2176             : 
    2177             :     /*
    2178             :      * Done.  The result is on a single run on a single tape.
    2179             :      */
    2180          18 :     state->result_tape = state->outputTapes[0];
    2181          18 :     if (!WORKER(state))
    2182          18 :         LogicalTapeFreeze(state->result_tape, NULL);
    2183             :     else
    2184           0 :         worker_freeze_result_tape(state);
    2185          18 :     state->status = TSS_SORTEDONTAPE;
    2186             : 
    2187             :     /* Close all the now-empty input tapes, to release their read buffers. */
    2188         102 :     for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2189          84 :         LogicalTapeClose(state->inputTapes[tapenum]);
    2190             : }
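
As a back-of-the-envelope check on this loop's termination: each pass merges
groups of up to k runs into single runs, so the run count shrinks by roughly
the merge order per pass, and about ceil(log_k(nruns)) passes remain.  A tiny
illustration, with made-up numbers:

    #include <stdio.h>

    int
    main(void)
    {
        int         k = 6;          /* merge order: input tapes per pass */
        int         nruns = 100;    /* initial runs on tape */
        int         passes = 0;

        while (nruns > 1)
        {
            /* one pass turns each group of up to k runs into one run */
            nruns = (nruns + k - 1) / k;
            passes++;
        }
        printf("%d merge passes\n", passes);    /* 100 -> 17 -> 3 -> 1 */
        return 0;
    }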
    2191             : 
    2192             : /*
    2193             :  * Merge one run from each input tape.
    2194             :  */
    2195             : static void
    2196         156 : mergeonerun(Tuplesortstate *state)
    2197             : {
    2198             :     int         srcTapeIndex;
    2199             :     LogicalTape *srcTape;
    2200             : 
    2201             :     /*
    2202             :      * Start the merge by loading one tuple from each active source tape into
    2203             :      * the heap.
    2204             :      */
    2205         156 :     beginmerge(state);
    2206             : 
    2207             :     Assert(state->slabAllocatorUsed);
    2208             : 
    2209             :     /*
    2210             :      * Execute merge by repeatedly extracting lowest tuple in heap, writing it
    2211             :      * out, and replacing it with next tuple from same tape (if there is
    2212             :      * another one).
    2213             :      */
    2214      855588 :     while (state->memtupcount > 0)
    2215             :     {
    2216             :         SortTuple   stup;
    2217             : 
    2218             :         /* write the tuple to destTape */
    2219      855432 :         srcTapeIndex = state->memtuples[0].srctape;
    2220      855432 :         srcTape = state->inputTapes[srcTapeIndex];
    2221      855432 :         WRITETUP(state, state->destTape, &state->memtuples[0]);
    2222             : 
    2223             :         /* recycle the slot of the tuple we just wrote out, for the next read */
    2224      855432 :         if (state->memtuples[0].tuple)
    2225      735348 :             RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
    2226             : 
    2227             :         /*
    2228             :          * pull next tuple from the tape, and replace the written-out tuple in
    2229             :          * the heap with it.
    2230             :          */
    2231      855432 :         if (mergereadnext(state, srcTape, &stup))
    2232             :         {
    2233      854586 :             stup.srctape = srcTapeIndex;
    2234      854586 :             tuplesort_heap_replace_top(state, &stup);
    2235             :         }
    2236             :         else
    2237             :         {
    2238         846 :             tuplesort_heap_delete_top(state);
    2239         846 :             state->nInputRuns--;
    2240             :         }
    2241             :     }
    2242             : 
    2243             :     /*
    2244             :      * When the heap empties, we're done.  Write an end-of-run marker on the
    2245             :      * output tape.
    2246             :      */
    2247         156 :     markrunend(state->destTape);
    2248         156 : }
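
A self-contained sketch of this heap-driven merge, with small sorted arrays
standing in for input tapes: load one element per source, as beginmerge()
below does, then repeatedly emit the heap root and replace it with the next
element from the same source, falling back to a delete-top when that source
is exhausted.

    #include <stdio.h>

    #define K 3

    typedef struct
    {
        int         value;
        int         src;        /* source index, like stup.srctape */
    } HeapItem;

    static void
    sift_down(HeapItem *h, int n, int i)
    {
        HeapItem    item = h[i];

        for (;;)
        {
            int         j = 2 * i + 1;

            if (j >= n)
                break;
            if (j + 1 < n && h[j + 1].value < h[j].value)
                j++;            /* pick the smaller child */
            if (item.value <= h[j].value)
                break;
            h[i] = h[j];
            i = j;
        }
        h[i] = item;
    }

    int
    main(void)
    {
        int         runs[K][4] = {{1, 4, 7, 10}, {2, 5, 8, 11}, {3, 6, 9, 12}};
        int         len[K] = {4, 4, 4};
        int         pos[K] = {0, 0, 0};
        HeapItem    heap[K];
        int         n = 0;

        /* load the first element of each run, then heapify */
        for (int s = 0; s < K; s++)
            heap[n++] = (HeapItem) {runs[s][pos[s]++], s};
        for (int i = n / 2 - 1; i >= 0; i--)
            sift_down(heap, n, i);

        /* emit the smallest, then replace the root from the same source */
        while (n > 0)
        {
            int         s = heap[0].src;

            printf("%d ", heap[0].value);
            if (pos[s] < len[s])
                heap[0] = (HeapItem) {runs[s][pos[s]++], s};
            else
                heap[0] = heap[--n];    /* source exhausted: delete top */
            if (n > 0)
                sift_down(heap, n, 0);
        }
        printf("\n");
        return 0;
    }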
    2249             : 
    2250             : /*
    2251             :  * beginmerge - initialize for a merge pass
    2252             :  *
    2253             :  * Fill the merge heap with the first tuple from each input tape.
    2254             :  */
    2255             : static void
    2256         416 : beginmerge(Tuplesortstate *state)
    2257             : {
    2258             :     int         activeTapes;
    2259             :     int         srcTapeIndex;
    2260             : 
    2261             :     /* Heap should be empty here */
    2262             :     Assert(state->memtupcount == 0);
    2263             : 
    2264         416 :     activeTapes = Min(state->nInputTapes, state->nInputRuns);
    2265             : 
    2266        1912 :     for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
    2267             :     {
    2268             :         SortTuple   tup;
    2269             : 
    2270        1496 :         if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
    2271             :         {
    2272        1248 :             tup.srctape = srcTapeIndex;
    2273        1248 :             tuplesort_heap_insert(state, &tup);
    2274             :         }
    2275             :     }
    2276         416 : }
    2277             : 
    2278             : /*
    2279             :  * mergereadnext - read next tuple from one merge input tape
    2280             :  *
    2281             :  * Returns false on EOF.
    2282             :  */
    2283             : static bool
    2284     5431272 : mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
    2285             : {
    2286             :     unsigned int tuplen;
    2287             : 
    2288             :     /* read next tuple, if any */
    2289     5431272 :     if ((tuplen = getlen(srcTape, true)) == 0)
    2290        1448 :         return false;
    2291     5429824 :     READTUP(state, stup, srcTape, tuplen);
    2292             : 
    2293     5429824 :     return true;
    2294             : }
    2295             : 
    2296             : /*
    2297             :  * dumptuples - remove tuples from memtuples and write initial run to tape
    2298             :  *
    2299             :  * When alltuples = true, dump everything currently in memory.  (This case is
    2300             :  * only used at end of input data.)
    2301             :  */
    2302             : static void
    2303     1068994 : dumptuples(Tuplesortstate *state, bool alltuples)
    2304             : {
    2305             :     int         memtupwrite;
    2306             :     int         i;
    2307             : 
    2308             :     /*
    2309             :      * Nothing to do if we still fit in available memory and have array slots,
    2310             :      * unless this is the final call during initial run generation.
    2311             :      */
    2312     1068994 :     if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
    2313     1068074 :         !alltuples)
    2314     1067508 :         return;
    2315             : 
    2316             :     /*
    2317             :      * Final call might require no sorting, in rare cases where we just so
    2318             :      * happen to have previously LACKMEM()'d at the point where exactly all
    2319             :      * remaining tuples are loaded into memory, just before input was
    2320             :      * exhausted.  In general, short final runs are quite possible, but avoid
    2321             :      * creating a completely empty run.  In a worker, though, we must produce
    2322             :      * at least one tape, even if it's empty.
    2323             :      */
    2324        1486 :     if (state->memtupcount == 0 && state->currentRun > 0)
    2325           0 :         return;
    2326             : 
    2327             :     Assert(state->status == TSS_BUILDRUNS);
    2328             : 
    2329             :     /*
    2330             :      * It seems unlikely that this limit will ever be exceeded, but take no
    2331             :      * chances
    2332             :      */
    2333        1486 :     if (state->currentRun == INT_MAX)
    2334           0 :         ereport(ERROR,
    2335             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    2336             :                  errmsg("cannot have more than %d runs for an external sort",
    2337             :                         INT_MAX)));
    2338             : 
    2339        1486 :     if (state->currentRun > 0)
    2340         920 :         selectnewtape(state);
    2341             : 
    2342        1486 :     state->currentRun++;
    2343             : 
    2344        1486 :     if (trace_sort)
    2345           0 :         elog(LOG, "worker %d starting quicksort of run %d: %s",
    2346             :              state->worker, state->currentRun,
    2347             :              pg_rusage_show(&state->ru_start));
    2348             : 
    2349             :     /*
    2350             :      * Sort all tuples accumulated within the allowed amount of memory for
    2351             :      * this run using quicksort
    2352             :      */
    2353        1486 :     tuplesort_sort_memtuples(state);
    2354             : 
    2355        1486 :     if (trace_sort)
    2356           0 :         elog(LOG, "worker %d finished quicksort of run %d: %s",
    2357             :              state->worker, state->currentRun,
    2358             :              pg_rusage_show(&state->ru_start));
    2359             : 
    2360        1486 :     memtupwrite = state->memtupcount;
    2361     5071082 :     for (i = 0; i < memtupwrite; i++)
    2362             :     {
    2363     5069596 :         SortTuple  *stup = &state->memtuples[i];
    2364             : 
    2365     5069596 :         WRITETUP(state, state->destTape, stup);
    2366             :     }
    2367             : 
    2368        1486 :     state->memtupcount = 0;
    2369             : 
    2370             :     /*
    2371             :      * Reset tuple memory.  We've freed all of the tuples that we previously
    2372             :      * allocated.  It's important to avoid fragmentation when there is a stark
    2373             :      * change in the sizes of incoming tuples.  In bounded sorts,
    2374             :      * fragmentation due to AllocSetFree's bucketing by size class might be
    2375             :      * particularly bad if this step wasn't taken.
    2376             :      */
    2377        1486 :     MemoryContextReset(state->base.tuplecontext);
    2378             : 
    2379             :     /*
    2380             :      * Now update the memory accounting to subtract the memory used by the
    2381             :      * tuple.
    2382             :      */
    2383        1486 :     FREEMEM(state, state->tupleMem);
    2384        1486 :     state->tupleMem = 0;
    2385             : 
    2386        1486 :     markrunend(state->destTape);
    2387             : 
    2388        1486 :     if (trace_sort)
    2389           0 :         elog(LOG, "worker %d finished writing run %d to tape %d: %s",
    2390             :              state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
    2391             :              pg_rusage_show(&state->ru_start));
    2392             : }
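
A toy model of this run-generation cycle, with a fixed-size array standing in
for memtuples[], a tuple-count budget standing in for workMem, and the C
library's qsort() standing in for tuplesort_sort_memtuples():

    #include <stdio.h>
    #include <stdlib.h>

    static int
    cmp_int(const void *a, const void *b)
    {
        int         x = *(const int *) a;
        int         y = *(const int *) b;

        return (x > y) - (x < y);
    }

    int
    main(void)
    {
        int         input[] = {9, 3, 7, 1, 8, 2, 6, 4, 5, 0};
        int         ninput = 10;
        int         budget = 4;     /* "workMem": tuples per run */
        int         run[4];
        int         nrun = 0;
        int         currentRun = 0;

        for (int i = 0; i <= ninput; i++)
        {
            /* budget full, or input exhausted: sort and dump one run */
            if (i == ninput || nrun == budget)
            {
                qsort(run, nrun, sizeof(int), cmp_int);
                printf("run %d:", currentRun++);
                for (int j = 0; j < nrun; j++)
                    printf(" %d", run[j]);
                printf("\n");
                nrun = 0;
                if (i == ninput)
                    break;
            }
            run[nrun++] = input[i];
        }
        return 0;
    }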
    2393             : 
    2394             : /*
    2395             :  * tuplesort_rescan     - rewind and replay the scan
    2396             :  */
    2397             : void
    2398          58 : tuplesort_rescan(Tuplesortstate *state)
    2399             : {
    2400          58 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2401             : 
    2402             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2403             : 
    2404          58 :     switch (state->status)
    2405             :     {
    2406          52 :         case TSS_SORTEDINMEM:
    2407          52 :             state->current = 0;
    2408          52 :             state->eof_reached = false;
    2409          52 :             state->markpos_offset = 0;
    2410          52 :             state->markpos_eof = false;
    2411          52 :             break;
    2412           6 :         case TSS_SORTEDONTAPE:
    2413           6 :             LogicalTapeRewindForRead(state->result_tape, 0);
    2414           6 :             state->eof_reached = false;
    2415           6 :             state->markpos_block = 0L;
    2416           6 :             state->markpos_offset = 0;
    2417           6 :             state->markpos_eof = false;
    2418           6 :             break;
    2419           0 :         default:
    2420           0 :             elog(ERROR, "invalid tuplesort state");
    2421             :             break;
    2422             :     }
    2423             : 
    2424          58 :     MemoryContextSwitchTo(oldcontext);
    2425          58 : }
    2426             : 
    2427             : /*
    2428             :  * tuplesort_markpos    - saves current position in the merged sort file
    2429             :  */
    2430             : void
    2431      586202 : tuplesort_markpos(Tuplesortstate *state)
    2432             : {
    2433      586202 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2434             : 
    2435             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2436             : 
    2437      586202 :     switch (state->status)
    2438             :     {
    2439      577394 :         case TSS_SORTEDINMEM:
    2440      577394 :             state->markpos_offset = state->current;
    2441      577394 :             state->markpos_eof = state->eof_reached;
    2442      577394 :             break;
    2443        8808 :         case TSS_SORTEDONTAPE:
    2444        8808 :             LogicalTapeTell(state->result_tape,
    2445             :                             &state->markpos_block,
    2446             :                             &state->markpos_offset);
    2447        8808 :             state->markpos_eof = state->eof_reached;
    2448        8808 :             break;
    2449           0 :         default:
    2450           0 :             elog(ERROR, "invalid tuplesort state");
    2451             :             break;
    2452             :     }
    2453             : 
    2454      586202 :     MemoryContextSwitchTo(oldcontext);
    2455      586202 : }
    2456             : 
    2457             : /*
    2458             :  * tuplesort_restorepos - restores current position in merged sort file to
    2459             :  *                        last saved position
    2460             :  */
    2461             : void
    2462       32786 : tuplesort_restorepos(Tuplesortstate *state)
    2463             : {
    2464       32786 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2465             : 
    2466             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2467             : 
    2468       32786 :     switch (state->status)
    2469             :     {
    2470       26594 :         case TSS_SORTEDINMEM:
    2471       26594 :             state->current = state->markpos_offset;
    2472       26594 :             state->eof_reached = state->markpos_eof;
    2473       26594 :             break;
    2474        6192 :         case TSS_SORTEDONTAPE:
    2475        6192 :             LogicalTapeSeek(state->result_tape,
    2476             :                             state->markpos_block,
    2477             :                             state->markpos_offset);
    2478        6192 :             state->eof_reached = state->markpos_eof;
    2479        6192 :             break;
    2480           0 :         default:
    2481           0 :             elog(ERROR, "invalid tuplesort state");
    2482             :             break;
    2483             :     }
    2484             : 
    2485       32786 :     MemoryContextSwitchTo(oldcontext);
    2486       32786 : }
    2487             : 
    2488             : /*
    2489             :  * tuplesort_get_stats - extract summary statistics
    2490             :  *
    2491             :  * This can be called after tuplesort_performsort() finishes to obtain
    2492             :  * printable summary information about how the sort was performed.
    2493             :  */
    2494             : void
    2495         396 : tuplesort_get_stats(Tuplesortstate *state,
    2496             :                     TuplesortInstrumentation *stats)
    2497             : {
    2498             :     /*
    2499             :      * Note: it might seem we should provide both memory and disk usage for a
    2500             :      * disk-based sort.  However, the current code doesn't track memory space
    2501             :      * accurately once we have begun to return tuples to the caller (since we
    2502             :      * don't account for pfree's the caller is expected to do), so we cannot
    2503             :      * rely on availMem in a disk sort.  This does not seem worth the overhead
    2504             :      * to fix.  Is it worth creating an API for the memory context code to
    2505             :      * tell us how much is actually used in sortcontext?
    2506             :      */
    2507         396 :     tuplesort_updatemax(state);
    2508             : 
    2509         396 :     if (state->isMaxSpaceDisk)
    2510           6 :         stats->spaceType = SORT_SPACE_TYPE_DISK;
    2511             :     else
    2512         390 :         stats->spaceType = SORT_SPACE_TYPE_MEMORY;
    2513         396 :     stats->spaceUsed = (state->maxSpace + 1023) / 1024;
    2514             : 
    2515         396 :     switch (state->maxSpaceStatus)
    2516             :     {
    2517         390 :         case TSS_SORTEDINMEM:
    2518         390 :             if (state->boundUsed)
    2519          42 :                 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
    2520             :             else
    2521         348 :                 stats->sortMethod = SORT_TYPE_QUICKSORT;
    2522         390 :             break;
    2523           0 :         case TSS_SORTEDONTAPE:
    2524           0 :             stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
    2525           0 :             break;
    2526           6 :         case TSS_FINALMERGE:
    2527           6 :             stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
    2528           6 :             break;
    2529           0 :         default:
    2530           0 :             stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
    2531           0 :             break;
    2532             :     }
    2533         396 : }
    2534             : 
    2535             : /*
    2536             :  * Convert TuplesortMethod to a string.
    2537             :  */
    2538             : const char *
    2539         294 : tuplesort_method_name(TuplesortMethod m)
    2540             : {
    2541         294 :     switch (m)
    2542             :     {
    2543           0 :         case SORT_TYPE_STILL_IN_PROGRESS:
    2544           0 :             return "still in progress";
    2545          42 :         case SORT_TYPE_TOP_N_HEAPSORT:
    2546          42 :             return "top-N heapsort";
    2547         246 :         case SORT_TYPE_QUICKSORT:
    2548         246 :             return "quicksort";
    2549           0 :         case SORT_TYPE_EXTERNAL_SORT:
    2550           0 :             return "external sort";
    2551           6 :         case SORT_TYPE_EXTERNAL_MERGE:
    2552           6 :             return "external merge";
    2553             :     }
    2554             : 
    2555           0 :     return "unknown";
    2556             : }
    2557             : 
    2558             : /*
    2559             :  * Convert TuplesortSpaceType to a string.
    2560             :  */
    2561             : const char *
    2562         258 : tuplesort_space_type_name(TuplesortSpaceType t)
    2563             : {
    2564             :     Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
    2565         258 :     return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
    2566             : }
    2567             : 
    2568             : 
    2569             : /*
    2570             :  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
    2571             :  */
    2572             : 
    2573             : /*
    2574             :  * Convert the existing unordered array of SortTuples to a bounded heap,
    2575             :  * discarding all but the smallest "state->bound" tuples.
    2576             :  *
    2577             :  * When working with a bounded heap, we want to keep the largest entry
    2578             :  * at the root (array entry zero), instead of the smallest as in the normal
    2579             :  * sort case.  This allows us to discard the largest entry cheaply.
    2580             :  * Therefore, we temporarily reverse the sort direction.
    2581             :  */
    2582             : static void
    2583         408 : make_bounded_heap(Tuplesortstate *state)
    2584             : {
    2585         408 :     int         tupcount = state->memtupcount;
    2586             :     int         i;
    2587             : 
    2588             :     Assert(state->status == TSS_INITIAL);
    2589             :     Assert(state->bounded);
    2590             :     Assert(tupcount >= state->bound);
    2591             :     Assert(SERIAL(state));
    2592             : 
    2593             :     /* Reverse sort direction so largest entry will be at root */
    2594         408 :     reversedirection(state);
    2595             : 
    2596         408 :     state->memtupcount = 0;      /* make the heap empty */
    2597       37980 :     for (i = 0; i < tupcount; i++)
    2598             :     {
    2599       37572 :         if (state->memtupcount < state->bound)
    2600             :         {
    2601             :             /* Insert next tuple into heap */
    2602             :             /* Must copy source tuple to avoid possible overwrite */
    2603       18582 :             SortTuple   stup = state->memtuples[i];
    2604             : 
    2605       18582 :             tuplesort_heap_insert(state, &stup);
    2606             :         }
    2607             :         else
    2608             :         {
    2609             :             /*
    2610             :              * The heap is full.  Replace the largest entry with the new
    2611             :              * tuple, or just discard it, if it's larger than anything already
    2612             :              * in the heap.
    2613             :              */
    2614       18990 :             if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
    2615             :             {
    2616        9802 :                 free_sort_tuple(state, &state->memtuples[i]);
    2617        9802 :                 CHECK_FOR_INTERRUPTS();
    2618             :             }
    2619             :             else
    2620        9188 :                 tuplesort_heap_replace_top(state, &state->memtuples[i]);
    2621             :         }
    2622             :     }
    2623             : 
    2624             :     Assert(state->memtupcount == state->bound);
    2625         408 :     state->status = TSS_BOUNDED;
    2626         408 : }
    2627             : 
    2628             : /*
    2629             :  * Convert the bounded heap to a properly-sorted array
    2630             :  */
    2631             : static void
    2632         408 : sort_bounded_heap(Tuplesortstate *state)
    2633             : {
    2634         408 :     int         tupcount = state->memtupcount;
    2635             : 
    2636             :     Assert(state->status == TSS_BOUNDED);
    2637             :     Assert(state->bounded);
    2638             :     Assert(tupcount == state->bound);
    2639             :     Assert(SERIAL(state));
    2640             : 
    2641             :     /*
    2642             :      * We can unheapify in place because each delete-top call will remove the
    2643             :      * largest entry, which we can promptly store in the newly freed slot at
    2644             :      * the end.  Once we're down to a single-entry heap, we're done.
    2645             :      */
    2646       18582 :     while (state->memtupcount > 1)
    2647             :     {
    2648       18174 :         SortTuple   stup = state->memtuples[0];
    2649             : 
    2650             :         /* this sifts-up the next-largest entry and decreases memtupcount */
    2651       18174 :         tuplesort_heap_delete_top(state);
    2652       18174 :         state->memtuples[state->memtupcount] = stup;
    2653             :     }
    2654         408 :     state->memtupcount = tupcount;
    2655             : 
    2656             :     /*
    2657             :      * Reverse sort direction back to the original state.  This is not
    2658             :      * actually necessary but seems like a good idea for tidiness.
    2659             :      */
    2660         408 :     reversedirection(state);
    2661             : 
    2662         408 :     state->status = TSS_SORTEDINMEM;
    2663         408 :     state->boundUsed = true;
    2664         408 : }
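
A compact standalone sketch of the same top-N technique: keep the N smallest
integers in a max-heap so the largest survivor sits at the root and can be
evicted cheaply, then unheapify in place as sort_bounded_heap() does.  (This
sketch rebuilds the heap bottom-up while filling; tuplesort instead inserts
each tuple with a sift-up.)

    #include <stdio.h>

    static void
    sift_down(int *h, int n, int i)
    {
        for (;;)
        {
            int         j = 2 * i + 1;

            if (j >= n)
                break;
            if (j + 1 < n && h[j + 1] > h[j])
                j++;            /* pick the larger child */
            if (h[i] >= h[j])
                break;
            int         tmp = h[i];

            h[i] = h[j];
            h[j] = tmp;
            i = j;
        }
    }

    int
    main(void)
    {
        int         input[] = {9, 3, 7, 1, 8, 2, 6, 4, 5, 0};
        int         bound = 4;
        int         heap[4];
        int         n = 0;

        for (int i = 0; i < 10; i++)
        {
            if (n < bound)
            {
                /* grow the heap, then restore the invariant bottom-up */
                heap[n++] = input[i];
                for (int k = n / 2 - 1; k >= 0; k--)
                    sift_down(heap, n, k);
            }
            else if (input[i] < heap[0])
            {
                /* replace the largest survivor, as in make_bounded_heap */
                heap[0] = input[i];
                sift_down(heap, n, 0);
            }
        }

        /* unheapify in place, as in sort_bounded_heap */
        for (int m = n - 1; m > 0; m--)
        {
            int         tmp = heap[0];

            heap[0] = heap[m];
            heap[m] = tmp;
            sift_down(heap, m, 0);
        }

        for (int i = 0; i < n; i++)
            printf("%d ", heap[i]);     /* prints: 0 1 2 3 */
        printf("\n");
        return 0;
    }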
    2665             : 
    2666             : /*
    2667             :  * Sort all memtuples using specialized qsort() routines.
    2668             :  *
    2669             :  * Quicksort is used for small in-memory sorts, and external sort runs.
    2670             :  */
    2671             : static void
    2672      234462 : tuplesort_sort_memtuples(Tuplesortstate *state)
    2673             : {
    2674             :     Assert(!LEADER(state));
    2675             : 
    2676      234462 :     if (state->memtupcount > 1)
    2677             :     {
    2678             :         /*
    2679             :          * Do we have the leading column's value or abbreviation in datum1,
    2680             :          * and is there a specialization for its comparator?
    2681             :          */
    2682       67062 :         if (state->base.haveDatum1 && state->base.sortKeys)
    2683             :         {
    2684       67026 :             if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
    2685             :             {
    2686        3336 :                 qsort_tuple_unsigned(state->memtuples,
    2687        3336 :                                      state->memtupcount,
    2688             :                                      state);
    2689        3320 :                 return;
    2690             :             }
    2691       63690 :             else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
    2692             :             {
    2693        1312 :                 qsort_tuple_signed(state->memtuples,
    2694        1312 :                                    state->memtupcount,
    2695             :                                    state);
    2696        1312 :                 return;
    2697             :             }
    2698       62378 :             else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
    2699             :             {
    2700       39292 :                 qsort_tuple_int32(state->memtuples,
    2701       39292 :                                   state->memtupcount,
    2702             :                                   state);
    2703       39232 :                 return;
    2704             :             }
    2705             :         }
    2706             : 
    2707             :         /* Can we use the single-key sort function? */
    2708       23122 :         if (state->base.onlyKey != NULL)
    2709             :         {
    2710        9942 :             qsort_ssup(state->memtuples, state->memtupcount,
    2711        9942 :                        state->base.onlyKey);
    2712             :         }
    2713             :         else
    2714             :         {
    2715       13180 :             qsort_tuple(state->memtuples,
    2716       13180 :                         state->memtupcount,
    2717             :                         state->base.comparetup,
    2718             :                         state);
    2719             :         }
    2720             :     }
    2721             : }
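
The dispatch idea can be sketched with a hypothetical sort_dispatch() helper:
when the leading comparator is recognized as a known function, call a sort
whose comparison is inlined rather than going through a function pointer for
every compare.  Insertion sort keeps the sketch short; the real
specializations are the qsort_tuple_*() routines used above.

    #include <stdio.h>
    #include <stdlib.h>

    typedef int (*cmp_fn) (const void *, const void *);

    static int
    cmp_int32(const void *a, const void *b)
    {
        int         x = *(const int *) a;
        int         y = *(const int *) b;

        return (x > y) - (x < y);
    }

    /* "specialized" path: the comparison is inlined, no indirect call */
    static void
    sort_int32_specialized(int *a, size_t n)
    {
        for (size_t i = 1; i < n; i++)
        {
            int         v = a[i];
            size_t      j = i;

            while (j > 0 && a[j - 1] > v)
            {
                a[j] = a[j - 1];
                j--;
            }
            a[j] = v;
        }
    }

    static void
    sort_dispatch(int *a, size_t n, cmp_fn cmp)
    {
        if (cmp == cmp_int32)
            sort_int32_specialized(a, n);   /* known comparator: fast path */
        else
            qsort(a, n, sizeof(int), cmp);  /* generic fallback */
    }

    int
    main(void)
    {
        int         v[] = {5, 2, 9, 1};

        sort_dispatch(v, 4, cmp_int32);
        for (int i = 0; i < 4; i++)
            printf("%d ", v[i]);
        printf("\n");
        return 0;
    }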
    2722             : 
    2723             : /*
    2724             :  * Insert a new tuple into an empty or existing heap, maintaining the
    2725             :  * heap invariant.  Caller is responsible for ensuring there's room.
    2726             :  *
    2727             :  * Note: For some callers, tuple points to a memtuples[] entry above the
    2728             :  * end of the heap.  This is safe as long as it's not immediately adjacent
    2729             :  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
    2730             :  * is, it might get overwritten before being moved into the heap!
    2731             :  */
    2732             : static void
    2733       19830 : tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
    2734             : {
    2735             :     SortTuple  *memtuples;
    2736             :     int         j;
    2737             : 
    2738       19830 :     memtuples = state->memtuples;
    2739             :     Assert(state->memtupcount < state->memtupsize);
    2740             : 
    2741       19830 :     CHECK_FOR_INTERRUPTS();
    2742             : 
    2743             :     /*
    2744             :      * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
    2745             :      * using 1-based array indexes, not 0-based.
    2746             :      */
    2747       19830 :     j = state->memtupcount++;
    2748       57704 :     while (j > 0)
    2749             :     {
    2750       50916 :         int         i = (j - 1) >> 1;
    2751             : 
    2752       50916 :         if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
    2753       13042 :             break;
    2754       37874 :         memtuples[j] = memtuples[i];
    2755       37874 :         j = i;
    2756             :     }
    2757       19830 :     memtuples[j] = *tuple;
    2758       19830 : }
    2759             : 
    2760             : /*
    2761             :  * Remove the tuple at state->memtuples[0] from the heap.  Decrement
    2762             :  * memtupcount, and sift up to maintain the heap invariant.
    2763             :  *
    2764             :  * The caller has already free'd the tuple the top node points to,
    2765             :  * if necessary.
    2766             :  */
    2767             : static void
    2768       19374 : tuplesort_heap_delete_top(Tuplesortstate *state)
    2769             : {
    2770       19374 :     SortTuple  *memtuples = state->memtuples;
    2771             :     SortTuple  *tuple;
    2772             : 
    2773       19374 :     if (--state->memtupcount <= 0)
    2774         284 :         return;
    2775             : 
    2776             :     /*
    2777             :      * Remove the last tuple in the heap, and re-insert it, by replacing the
    2778             :      * current top node with it.
    2779             :      */
    2780       19090 :     tuple = &memtuples[state->memtupcount];
    2781       19090 :     tuplesort_heap_replace_top(state, tuple);
    2782             : }
    2783             : 
    2784             : /*
    2785             :  * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
    2786             :  * maintain the heap invariant.
    2787             :  *
    2788             :  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
    2789             :  * Heapsort, steps H3-H8).
    2790             :  */
    2791             : static void
    2792     5959832 : tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
    2793             : {
    2794     5959832 :     SortTuple  *memtuples = state->memtuples;
    2795             :     unsigned int i,
    2796             :                 n;
    2797             : 
    2798             :     Assert(state->memtupcount >= 1);
    2799             : 
    2800     5959832 :     CHECK_FOR_INTERRUPTS();
    2801             : 
    2802             :     /*
    2803             :      * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
    2804             :      * This prevents overflow in the "2 * i + 1" calculation, since at the top
    2805             :      * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
    2806             :      */
    2807     5959832 :     n = state->memtupcount;
    2808     5959832 :     i = 0;                      /* i is where the "hole" is */
    2809             :     for (;;)
    2810     1806798 :     {
    2811     7766630 :         unsigned int j = 2 * i + 1;
    2812             : 
    2813     7766630 :         if (j >= n)
    2814     1399062 :             break;
    2815     8800938 :         if (j + 1 < n &&
    2816     2433370 :             COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
    2817      960626 :             j++;
    2818     6367568 :         if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
    2819     4560770 :             break;
    2820     1806798 :         memtuples[i] = memtuples[j];
    2821     1806798 :         i = j;
    2822             :     }
    2823     5959832 :     memtuples[i] = *tuple;
    2824     5959832 : }
    2825             : 
    2826             : /*
    2827             :  * Function to reverse the sort direction from its current state
    2828             :  *
    2829             :  * It is not safe to call this when performing hash tuplesorts
    2830             :  */
    2831             : static void
    2832         816 : reversedirection(Tuplesortstate *state)
    2833             : {
    2834         816 :     SortSupport sortKey = state->base.sortKeys;
    2835             :     int         nkey;
    2836             : 
    2837        1992 :     for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
    2838             :     {
    2839        1176 :         sortKey->ssup_reverse = !sortKey->ssup_reverse;
    2840        1176 :         sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
    2841             :     }
    2842         816 : }
    2843             : 
    2844             : 
    2845             : /*
    2846             :  * Tape interface routines
    2847             :  */
    2848             : 
    2849             : static unsigned int
    2850     5704278 : getlen(LogicalTape *tape, bool eofOK)
    2851             : {
    2852             :     unsigned int len;
    2853             : 
    2854     5704278 :     if (LogicalTapeRead(tape,
    2855             :                         &len, sizeof(len)) != sizeof(len))
    2856           0 :         elog(ERROR, "unexpected end of tape");
    2857     5704278 :     if (len == 0 && !eofOK)
    2858           0 :         elog(ERROR, "unexpected end of data");
    2859     5704278 :     return len;
    2860             : }
    2861             : 
    2862             : static void
    2863        1642 : markrunend(LogicalTape *tape)
    2864             : {
    2865        1642 :     unsigned int len = 0;
    2866             : 
    2867        1642 :     LogicalTapeWrite(tape, &len, sizeof(len));
    2868        1642 : }
    2869             : 
    2870             : /*
    2871             :  * Get memory for tuple from within READTUP() routine.
    2872             :  *
    2873             :  * We use the next free slot from the slab allocator, or palloc() if the tuple
    2874             :  * is too large for that.
    2875             :  */
    2876             : void *
    2877     5402560 : tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
    2878             : {
    2879             :     SlabSlot   *buf;
    2880             : 
    2881             :     /*
    2882             :      * We pre-allocate enough slots in the slab arena that we should never run
    2883             :      * out.
    2884             :      */
    2885             :     Assert(state->slabFreeHead);
    2886             : 
    2887     5402560 :     if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
    2888           0 :         return MemoryContextAlloc(state->base.sortcontext, tuplen);
    2889             :     else
    2890             :     {
    2891     5402560 :         buf = state->slabFreeHead;
    2892             :         /* Reuse this slot */
    2893     5402560 :         state->slabFreeHead = buf->nextfree;
    2894             : 
    2895     5402560 :         return buf;
    2896             :     }
    2897             : }
    2898             : 
    2899             : 
    2900             : /*
    2901             :  * Parallel sort routines
    2902             :  */
    2903             : 
    2904             : /*
    2905             :  * tuplesort_estimate_shared - estimate required shared memory allocation
    2906             :  *
    2907             :  * nWorkers is an estimate of the number of workers (it's the number that
    2908             :  * will be requested).
    2909             :  */
    2910             : Size
    2911         158 : tuplesort_estimate_shared(int nWorkers)
    2912             : {
    2913             :     Size        tapesSize;
    2914             : 
    2915             :     Assert(nWorkers > 0);
    2916             : 
    2917             :     /* Make sure that BufFile shared state is MAXALIGN'd */
    2918         158 :     tapesSize = mul_size(sizeof(TapeShare), nWorkers);
    2919         158 :     tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
    2920             : 
    2921         158 :     return tapesSize;
    2922             : }
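
The sizing pattern here, an aligned header plus a flexible array with one
entry per worker, can be sketched without the PostgreSQL helpers.  The struct
name, the long stand-in for TapeShare, and the 8-byte alignment below are all
assumptions for illustration; the real MAXALIGN value is platform-dependent.

    #include <stdio.h>
    #include <stddef.h>

    typedef struct SketchShared
    {
        int         nTapes;
        long        tapes[];    /* flexible array, one entry per worker */
    } SketchShared;

    /* round sz up to a multiple of align (align must be a power of two) */
    static size_t
    align_up(size_t sz, size_t align)
    {
        return (sz + align - 1) & ~(align - 1);
    }

    int
    main(void)
    {
        int         nWorkers = 4;
        size_t      sz = offsetof(SketchShared, tapes) +
            nWorkers * sizeof(long);

        printf("%zu bytes (aligned: %zu)\n", sz, align_up(sz, 8));
        return 0;
    }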
    2923             : 
    2924             : /*
    2925             :  * tuplesort_initialize_shared - initialize shared tuplesort state
    2926             :  *
    2927             :  * Must be called from leader process before workers are launched, to
    2928             :  * establish state needed up-front for worker tuplesortstates.  nWorkers
    2929             :  * should match the argument passed to tuplesort_estimate_shared().
    2930             :  */
    2931             : void
    2932         222 : tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
    2933             : {
    2934             :     int         i;
    2935             : 
    2936             :     Assert(nWorkers > 0);
    2937             : 
    2938         222 :     SpinLockInit(&shared->mutex);
    2939         222 :     shared->currentWorker = 0;
    2940         222 :     shared->workersFinished = 0;
    2941         222 :     SharedFileSetInit(&shared->fileset, seg);
    2942         222 :     shared->nTapes = nWorkers;
    2943         674 :     for (i = 0; i < nWorkers; i++)
    2944             :     {
    2945         452 :         shared->tapes[i].firstblocknumber = 0L;
    2946             :     }
    2947         222 : }
    2948             : 
    2949             : /*
    2950             :  * tuplesort_attach_shared - attach to shared tuplesort state
    2951             :  *
    2952             :  * Must be called by all worker processes.
    2953             :  */
    2954             : void
    2955         224 : tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
    2956             : {
    2957             :     /* Attach to SharedFileSet */
    2958         224 :     SharedFileSetAttach(&shared->fileset, seg);
    2959         224 : }
    2960             : 
    2961             : /*
    2962             :  * worker_get_identifier - Assign and return ordinal identifier for worker
    2963             :  *
    2964             :  * The order in which these are assigned is not well defined, and should not
    2965             :  * matter; worker numbers across parallel sort participants need only be
    2966             :  * distinct and gapless.  logtape.c requires this.
    2967             :  *
    2968             :  * Note that the identifiers assigned from here have no relation to
    2969             :  * ParallelWorkerNumber, to avoid making any assumption about the
    2970             :  * caller's requirements.  However, we do follow the ParallelWorkerNumber
    2971             :  * convention of representing a non-worker with worker number -1.  This
    2972             :  * includes the leader, as well as serial Tuplesort processes.
    2973             :  */
    2974             : static int
    2975         444 : worker_get_identifier(Tuplesortstate *state)
    2976             : {
    2977         444 :     Sharedsort *shared = state->shared;
    2978             :     int         worker;
    2979             : 
    2980             :     Assert(WORKER(state));
    2981             : 
    2982         444 :     SpinLockAcquire(&shared->mutex);
    2983         444 :     worker = shared->currentWorker++;
    2984         444 :     SpinLockRelease(&shared->mutex);
    2985             : 
    2986         444 :     return worker;
    2987             : }
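
Taken together, tuplesort_initialize_shared() and worker_get_identifier() implement a small rendezvous: the leader zeroes a counter in shared memory before any worker starts, and each worker then claims the next ordinal under the lock, which is what makes the numbers distinct and gapless.  A standalone sketch of the same pattern, with pthreads standing in for shared memory and spinlocks (SharedSortState, get_identifier and worker_main are illustrative names):

#include <pthread.h>
#include <stdio.h>

typedef struct SharedSortState
{
    pthread_mutex_t mutex;
    int         currentWorker;
} SharedSortState;

static SharedSortState shared;

static int
get_identifier(SharedSortState *s)
{
    int         worker;

    pthread_mutex_lock(&s->mutex);
    worker = s->currentWorker++;    /* claim the next ordinal */
    pthread_mutex_unlock(&s->mutex);
    return worker;
}

static void *
worker_main(void *arg)
{
    (void) arg;
    printf("assigned worker number %d\n", get_identifier(&shared));
    return NULL;
}

int
main(void)
{
    pthread_t   threads[4];

    /* The leader must initialize shared state before launching workers. */
    pthread_mutex_init(&shared.mutex, NULL);
    shared.currentWorker = 0;

    for (int i = 0; i < 4; i++)
        pthread_create(&threads[i], NULL, worker_main, NULL);
    for (int i = 0; i < 4; i++)
        pthread_join(threads[i], NULL);
    return 0;
}
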
    2988             : 
    2989             : /*
    2990             :  * worker_freeze_result_tape - freeze worker's result tape for leader
    2991             :  *
    2992             :  * This is called by workers just after the result tape has been determined,
    2993             :  * instead of calling LogicalTapeFreeze() directly.  They do so because
    2994             :  * workers require a few additional steps over the similar serial
    2995             :  * TSS_SORTEDONTAPE external sort case, and those steps also happen here.
    2996             :  * The extra steps are freeing now-unneeded resources, and reporting to the
    2997             :  * leader that the worker's run is available as input to its merge.
    2998             :  *
    2999             :  * There should only be one final output run for each worker, which consists
    3000             :  * of all tuples that were originally input into that worker.
    3001             :  */
    3002             : static void
    3003         444 : worker_freeze_result_tape(Tuplesortstate *state)
    3004             : {
    3005         444 :     Sharedsort *shared = state->shared;
    3006             :     TapeShare   output;
    3007             : 
    3008             :     Assert(WORKER(state));
    3009             :     Assert(state->result_tape != NULL);
    3010             :     Assert(state->memtupcount == 0);
    3011             : 
    3012             :     /*
    3013             :      * Free most remaining memory, in case caller is sensitive to our holding
    3014             :      * on to it.  memtuples may not be a tiny merge heap at this point.
    3015             :      */
    3016         444 :     pfree(state->memtuples);
    3017             :     /* Be tidy */
    3018         444 :     state->memtuples = NULL;
    3019         444 :     state->memtupsize = 0;
    3020             : 
    3021             :     /*
    3022             :      * A parallel worker must store its result tape's metadata in shared
    3023             :      * memory, where the leader can find it.
    3024             :      */
    3025         444 :     LogicalTapeFreeze(state->result_tape, &output);
    3026             : 
    3027             :     /* Store properties of output tape, and update finished worker count */
    3028         444 :     SpinLockAcquire(&shared->mutex);
    3029         444 :     shared->tapes[state->worker] = output;
    3030         444 :     shared->workersFinished++;
    3031         444 :     SpinLockRelease(&shared->mutex);
    3032         444 : }
    3033             : 
    3034             : /*
    3035             :  * worker_nomergeruns - dump memtuples in worker, without merging
    3036             :  *
    3037             :  * This is called as an alternative to mergeruns() in a worker when no
    3038             :  * merging is required.
    3039             :  */
    3040             : static void
    3041         444 : worker_nomergeruns(Tuplesortstate *state)
    3042             : {
    3043             :     Assert(WORKER(state));
    3044             :     Assert(state->result_tape == NULL);
    3045             :     Assert(state->nOutputRuns == 1);
    3046             : 
    3047         444 :     state->result_tape = state->destTape;
    3048         444 :     worker_freeze_result_tape(state);
    3049         444 : }
    3050             : 
    3051             : /*
    3052             :  * leader_takeover_tapes - create tapeset for leader from worker tapes
    3053             :  *
    3054             :  * So far, leader Tuplesortstate has performed no actual sorting.  By now, all
    3055             :  * sorting has occurred in workers, all of which must have already returned
    3056             :  * from tuplesort_performsort().
    3057             :  *
    3058             :  * When this returns, leader process is left in a state that is virtually
    3059             :  * indistinguishable from it having generated runs as a serial external sort
    3060             :  * might have.
    3061             :  */
    3062             : static void
    3063         156 : leader_takeover_tapes(Tuplesortstate *state)
    3064             : {
    3065         156 :     Sharedsort *shared = state->shared;
    3066         156 :     int         nParticipants = state->nParticipants;
    3067             :     int         workersFinished;
    3068             :     int         j;
    3069             : 
    3070             :     Assert(LEADER(state));
    3071             :     Assert(nParticipants >= 1);
    3072             : 
    3073         156 :     SpinLockAcquire(&shared->mutex);
    3074         156 :     workersFinished = shared->workersFinished;
    3075         156 :     SpinLockRelease(&shared->mutex);
    3076             : 
    3077         156 :     if (nParticipants != workersFinished)
    3078           0 :         elog(ERROR, "cannot take over tapes before all workers finish");
    3079             : 
    3080             :     /*
    3081             :      * Create the tapeset from worker tapes, including a leader-owned tape at
    3082             :      * the end.  Parallel workers are far more expensive than logical tapes,
    3083             :      * so the number of tapes allocated here should never be excessive.
    3084             :      */
    3085         156 :     inittapestate(state, nParticipants);
    3086         156 :     state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
    3087             : 
    3088             :     /*
    3089             :      * Set currentRun to reflect the number of runs we will merge (it's not
    3090             :      * used for anything; this is just pro forma)
    3091             :      */
    3092         156 :     state->currentRun = nParticipants;
    3093             : 
    3094             :     /*
    3095             :      * Initialize the state to look the same as after building the initial
    3096             :      * runs.
    3097             :      *
    3098             :      * There will always be exactly 1 run per worker, and exactly one input
    3099             :      * tape per run, because workers always output exactly 1 run, even when
    3100             :      * there were no input tuples for workers to sort.
    3101             :      */
    3102         156 :     state->inputTapes = NULL;
    3103         156 :     state->nInputTapes = 0;
    3104         156 :     state->nInputRuns = 0;
    3105             : 
    3106         156 :     state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
    3107         156 :     state->nOutputTapes = nParticipants;
    3108         156 :     state->nOutputRuns = nParticipants;
    3109             : 
    3110         472 :     for (j = 0; j < nParticipants; j++)
    3111             :     {
    3112         316 :         state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
    3113             :     }
    3114             : 
    3115         156 :     state->status = TSS_BUILDRUNS;
    3116         156 : }
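
The other half of the protocol is the completion handshake: each worker publishes its frozen tape and bumps workersFinished under the lock (worker_freeze_result_tape), and the leader refuses to import any tape until the finished count matches the participant count.  A standalone sketch, again with pthreads in place of shared memory and spinlocks, and a long integer standing in for each worker's TapeShare:

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

#define NWORKERS 4

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static int  workersFinished;
static long results[NWORKERS];      /* stands in for shared->tapes[] */

static void *
worker_main(void *arg)
{
    int         worker = (int) (intptr_t) arg;

    /* ... sort ... then publish the result and bump the finished count */
    pthread_mutex_lock(&mutex);
    results[worker] = worker * 100L;    /* placeholder "frozen tape" */
    workersFinished++;
    pthread_mutex_unlock(&mutex);
    return NULL;
}

int
main(void)
{
    pthread_t   threads[NWORKERS];
    int         finished;

    for (intptr_t i = 0; i < NWORKERS; i++)
        pthread_create(&threads[i], NULL, worker_main, (void *) i);
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(threads[i], NULL);

    /* The leader's sanity check before taking the results over. */
    pthread_mutex_lock(&mutex);
    finished = workersFinished;
    pthread_mutex_unlock(&mutex);
    if (finished != NWORKERS)
    {
        fprintf(stderr, "cannot take over before all workers finish\n");
        return 1;
    }
    for (int i = 0; i < NWORKERS; i++)
        printf("run %d: %ld\n", i, results[i]);
    return 0;
}
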
    3117             : 
    3118             : /*
    3119             :  * Convenience routine to free a tuple previously loaded into sort memory
    3120             :  */
    3121             : static void
    3122     3759982 : free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
    3123             : {
    3124     3759982 :     if (stup->tuple)
    3125             :     {
    3126     3599704 :         FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
    3127     3599704 :         pfree(stup->tuple);
    3128     3599704 :         stup->tuple = NULL;
    3129             :     }
    3130     3759982 : }
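
FREEMEM() credits the freed bytes back to the sort's memory budget, and GetMemoryChunkSpace() reports the tuple's actual chunk size, header overhead included, so the credit matches what was charged at allocation time.  A standalone sketch of that accounting discipline, with an explicit size header standing in for the memory-context chunk header (Chunk, acct_alloc, acct_free and availMem are illustrative names):

#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

static long availMem = 4096;        /* remaining budget, in bytes */

typedef struct Chunk
{
    size_t      size;               /* remembered so the free side can credit it */
    char        data[];
} Chunk;

static void *
acct_alloc(size_t len)
{
    Chunk      *c = malloc(offsetof(Chunk, data) + len);

    c->size = len;
    availMem -= (long) (offsetof(Chunk, data) + len);       /* like USEMEM() */
    return c->data;
}

static void
acct_free(void *p)
{
    Chunk      *c = (Chunk *) ((char *) p - offsetof(Chunk, data));

    availMem += (long) (offsetof(Chunk, data) + c->size);   /* like FREEMEM() */
    free(c);
}

int
main(void)
{
    void       *tuple = acct_alloc(100);

    printf("after alloc: %ld bytes left\n", availMem);
    acct_free(tuple);
    printf("after free:  %ld bytes left\n", availMem);
    return 0;
}
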
    3131             : 
    3132             : int
    3133           0 : ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
    3134             : {
    3135           0 :     if (x < y)
    3136           0 :         return -1;
    3137           0 :     else if (x > y)
    3138           0 :         return 1;
    3139             :     else
    3140           0 :         return 0;
    3141             : }
    3142             : 
    3143             : int
    3144     1139832 : ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
    3145             : {
    3146     1139832 :     int64       xx = DatumGetInt64(x);
    3147     1139832 :     int64       yy = DatumGetInt64(y);
    3148             : 
    3149     1139832 :     if (xx < yy)
    3150      426530 :         return -1;
    3151      713302 :     else if (xx > yy)
    3152      358468 :         return 1;
    3153             :     else
    3154      354834 :         return 0;
    3155             : }
    3156             : 
    3157             : int
    3158   180533632 : ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
    3159             : {
    3160   180533632 :     int32       xx = DatumGetInt32(x);
    3161   180533632 :     int32       yy = DatumGetInt32(y);
    3162             : 
    3163   180533632 :     if (xx < yy)
    3164    44679588 :         return -1;
    3165   135854044 :     else if (xx > yy)
    3166    41681734 :         return 1;
    3167             :     else
    3168    94172310 :         return 0;
    3169             : }
    3170             : }

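All three comparators above return the three-way result by explicit comparison rather than by subtraction, because x - y can overflow (consider INT32_MIN - 1) and then report the wrong sign.  A standalone version of the same pattern, usable with qsort() (the ssup argument is dropped since plain qsort() has no equivalent; int32_three_way_cmp is an illustrative name):

#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>

static int
int32_three_way_cmp(const void *a, const void *b)
{
    int32_t     xx = *(const int32_t *) a;
    int32_t     yy = *(const int32_t *) b;

    /* Branching avoids the signed overflow that "return xx - yy" risks. */
    if (xx < yy)
        return -1;
    else if (xx > yy)
        return 1;
    else
        return 0;
}

int
main(void)
{
    int32_t     keys[] = {42, INT32_MIN, 0, INT32_MAX, -7};

    qsort(keys, 5, sizeof(int32_t), int32_three_way_cmp);
    for (int i = 0; i < 5; i++)
        printf("%" PRId32 "\n", keys[i]);
    return 0;
}
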
Generated by: LCOV version 1.16