LCOV - code coverage report
Current view: top level - src/backend/utils/sort - tuplesort.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 728 812 89.7 %
Date: 2026-02-02 14:17:46 Functions: 54 55 98.2 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * tuplesort.c
       4             :  *    Generalized tuple sorting routines.
       5             :  *
       6             :  * This module provides a generalized facility for tuple sorting, which can be
       7             :  * applied to different kinds of sortable objects.  Implementation of
       8             :  * the particular sorting variants is given in tuplesortvariants.c.
       9             :  * This module works efficiently for both small and large amounts
      10             :  * of data.  Small amounts are sorted in-memory using qsort().  Large
      11             :  * amounts are sorted using temporary files and a standard external sort
      12             :  * algorithm.
      13             :  *
      14             :  * See Knuth, volume 3, for more than you want to know about external
      15             :  * sorting algorithms.  The algorithm we use is a balanced k-way merge.
      16             :  * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
      17             :  * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
      18             :  * merge is better.  Knuth is assuming that tape drives are expensive
      19             :  * beasts, and in particular that there will always be many more runs than
      20             :  * tape drives.  The polyphase merge algorithm was good at keeping all the
      21             :  * tape drives busy, but in our implementation a "tape drive" doesn't cost
      22             :  * much more than a few Kb of memory buffers, so we can afford to have
      23             :  * lots of them.  In particular, if we can have as many tape drives as
      24             :  * sorted runs, we can eliminate any repeated I/O at all.
      25             :  *
      26             :  * Historically, we divided the input into sorted runs using replacement
      27             :  * selection, in the form of a priority tree implemented as a heap
      28             :  * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
      29             :  * for run generation.
      30             :  *
      31             :  * The approximate amount of memory allowed for any one sort operation
      32             :  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
      33             :  * we absorb tuples and simply store them in an unsorted array as long as
      34             :  * we haven't exceeded workMem.  If we reach the end of the input without
      35             :  * exceeding workMem, we sort the array using qsort() and subsequently return
      36             :  * tuples just by scanning the tuple array sequentially.  If we do exceed
      37             :  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
      38             :  * When tuples are dumped in batch after quicksorting, we begin a new run
      39             :  * with a new output tape.  If we reach the max number of tapes, we write
      40             :  * subsequent runs on the existing tapes in a round-robin fashion.  We will
      41             :  * need multiple merge passes to finish the merge in that case.  After the
      42             :  * end of the input is reached, we dump out remaining tuples in memory into
      43             :  * a final run, then merge the runs.
      44             :  *
      45             :  * When merging runs, we use a heap containing just the frontmost tuple from
      46             :  * each source run; we repeatedly output the smallest tuple and replace it
      47             :  * with the next tuple from its source tape (if any).  When the heap empties,
      48             :  * the merge is complete.  The basic merge algorithm thus needs very little
      49             :  * memory --- only M tuples for an M-way merge, and M is constrained to a
      50             :  * small number.  However, we can still make good use of our full workMem
      51             :  * allocation by pre-reading additional blocks from each source tape.  Without
      52             :  * prereading, our access pattern to the temporary file would be very erratic;
      53             :  * on average we'd read one block from each of M source tapes during the same
      54             :  * time that we're writing M blocks to the output tape, so there is no
      55             :  * sequentiality of access at all, defeating the read-ahead methods used by
      56             :  * most Unix kernels.  Worse, the output tape gets written into a very random
      57             :  * sequence of blocks of the temp file, ensuring that things will be even
      58             :  * worse when it comes time to read that tape.  A straightforward merge pass
      59             :  * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
      60             :  * by prereading from each source tape sequentially, loading about workMem/M
      61             :  * bytes from each tape in turn, and making the sequential blocks immediately
      62             :  * available for reuse.  This approach helps to localize both read and write
      63             :  * accesses.  The pre-reading is handled by logtape.c, we just tell it how
      64             :  * much memory to use for the buffers.
      65             :  *
      66             :  * In the current code we determine the number of input tapes M on the basis
      67             :  * of workMem: we want workMem/M to be large enough that we read a fair
      68             :  * amount of data each time we read from a tape, so as to maintain the
      69             :  * locality of access described above.  Nonetheless, with large workMem we
      70             :  * can have many tapes.  The logical "tapes" are implemented by logtape.c,
      71             :  * which avoids space wastage by recycling disk space as soon as each block
      72             :  * is read from its "tape".
      73             :  *
      74             :  * When the caller requests random access to the sort result, we form
      75             :  * the final sorted run on a logical tape which is then "frozen", so
      76             :  * that we can access it randomly.  When the caller does not need random
      77             :  * access, we return from tuplesort_performsort() as soon as we are down
      78             :  * to one run per logical tape.  The final merge is then performed
      79             :  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
      80             :  * saves one cycle of writing all the data out to disk and reading it in.
      81             :  *
      82             :  * This module supports parallel sorting.  Parallel sorts involve coordination
      83             :  * among one or more worker processes, and a leader process, each with its own
      84             :  * tuplesort state.  The leader process (or, more accurately, the
      85             :  * Tuplesortstate associated with a leader process) creates a full tapeset
      86             :  * consisting of worker tapes with one run to merge; a run for every
      87             :  * worker process.  This is then merged.  Worker processes are guaranteed to
      88             :  * produce exactly one output run from their partial input.
      89             :  *
      90             :  *
      91             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      92             :  * Portions Copyright (c) 1994, Regents of the University of California
      93             :  *
      94             :  * IDENTIFICATION
      95             :  *    src/backend/utils/sort/tuplesort.c
      96             :  *
      97             :  *-------------------------------------------------------------------------
      98             :  */
      99             : 
     100             : #include "postgres.h"
     101             : 
     102             : #include <limits.h>
     103             : 
     104             : #include "commands/tablespace.h"
     105             : #include "miscadmin.h"
     106             : #include "pg_trace.h"
     107             : #include "storage/shmem.h"
     108             : #include "utils/guc.h"
     109             : #include "utils/memutils.h"
     110             : #include "utils/pg_rusage.h"
     111             : #include "utils/tuplesort.h"
     112             : 
     113             : /*
     114             :  * Initial size of memtuples array.  This must be more than
     115             :  * ALLOCSET_SEPARATE_THRESHOLD; see comments in grow_memtuples().  Clamp at
     116             :  * 1024 elements to avoid excessive reallocs.
     117             :  */
     118             : #define INITIAL_MEMTUPSIZE Max(1024, \
     119             :     ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
     120             : 
     121             : /* GUC variables */
     122             : bool        trace_sort = false;
     123             : 
     124             : #ifdef DEBUG_BOUNDED_SORT
     125             : bool        optimize_bounded_sort = true;
     126             : #endif
     127             : 
     128             : 
     129             : /*
     130             :  * During merge, we use a pre-allocated set of fixed-size slots to hold
     131             :  * tuples.  To avoid palloc/pfree overhead.
     132             :  *
     133             :  * Merge doesn't require a lot of memory, so we can afford to waste some,
     134             :  * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
     135             :  * palloc() overhead is not significant anymore.
     136             :  *
     137             :  * 'nextfree' is valid when this chunk is in the free list.  When in use, the
     138             :  * slot holds a tuple.
     139             :  */
     140             : #define SLAB_SLOT_SIZE 1024
     141             : 
     142             : typedef union SlabSlot
     143             : {
     144             :     union SlabSlot *nextfree;
     145             :     char        buffer[SLAB_SLOT_SIZE];
     146             : } SlabSlot;
     147             : 
     148             : /*
     149             :  * Possible states of a Tuplesort object.  These denote the states that
     150             :  * persist between calls of Tuplesort routines.
     151             :  */
     152             : typedef enum
     153             : {
     154             :     TSS_INITIAL,                /* Loading tuples; still within memory limit */
     155             :     TSS_BOUNDED,                /* Loading tuples into bounded-size heap */
     156             :     TSS_BUILDRUNS,              /* Loading tuples; writing to tape */
     157             :     TSS_SORTEDINMEM,            /* Sort completed entirely in memory */
     158             :     TSS_SORTEDONTAPE,           /* Sort completed, final run is on tape */
     159             :     TSS_FINALMERGE,             /* Performing final merge on-the-fly */
     160             : } TupSortStatus;
     161             : 
     162             : /*
     163             :  * Parameters for calculation of number of tapes to use --- see inittapes()
     164             :  * and tuplesort_merge_order().
     165             :  *
     166             :  * In this calculation we assume that each tape will cost us about 1 blocks
     167             :  * worth of buffer space.  This ignores the overhead of all the other data
     168             :  * structures needed for each tape, but it's probably close enough.
     169             :  *
     170             :  * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
     171             :  * input tape, for pre-reading (see discussion at top of file).  This is *in
     172             :  * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
     173             :  */
     174             : #define MINORDER        6       /* minimum merge order */
     175             : #define MAXORDER        500     /* maximum merge order */
     176             : #define TAPE_BUFFER_OVERHEAD        BLCKSZ
     177             : #define MERGE_BUFFER_SIZE           (BLCKSZ * 32)
     178             : 
     179             : 
     180             : /*
     181             :  * Private state of a Tuplesort operation.
     182             :  */
     183             : struct Tuplesortstate
     184             : {
     185             :     TuplesortPublic base;
     186             :     TupSortStatus status;       /* enumerated value as shown above */
     187             :     bool        bounded;        /* did caller specify a maximum number of
     188             :                                  * tuples to return? */
     189             :     bool        boundUsed;      /* true if we made use of a bounded heap */
     190             :     int         bound;          /* if bounded, the maximum number of tuples */
     191             :     int64       tupleMem;       /* memory consumed by individual tuples.
     192             :                                  * storing this separately from what we track
     193             :                                  * in availMem allows us to subtract the
     194             :                                  * memory consumed by all tuples when dumping
     195             :                                  * tuples to tape */
     196             :     int64       availMem;       /* remaining memory available, in bytes */
     197             :     int64       allowedMem;     /* total memory allowed, in bytes */
     198             :     int         maxTapes;       /* max number of input tapes to merge in each
     199             :                                  * pass */
     200             :     int64       maxSpace;       /* maximum amount of space occupied among sort
     201             :                                  * of groups, either in-memory or on-disk */
     202             :     bool        isMaxSpaceDisk; /* true when maxSpace tracks on-disk space,
     203             :                                  * false means in-memory */
     204             :     TupSortStatus maxSpaceStatus;   /* sort status when maxSpace was reached */
     205             :     LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */
     206             : 
     207             :     /*
     208             :      * This array holds the tuples now in sort memory.  If we are in state
     209             :      * INITIAL, the tuples are in no particular order; if we are in state
     210             :      * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
     211             :      * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
     212             :      * H.  In state SORTEDONTAPE, the array is not used.
     213             :      */
     214             :     SortTuple  *memtuples;      /* array of SortTuple structs */
     215             :     int         memtupcount;    /* number of tuples currently present */
     216             :     int         memtupsize;     /* allocated length of memtuples array */
     217             :     bool        growmemtuples;  /* memtuples' growth still underway? */
     218             : 
     219             :     /*
     220             :      * Memory for tuples is sometimes allocated using a simple slab allocator,
     221             :      * rather than with palloc().  Currently, we switch to slab allocation
     222             :      * when we start merging.  Merging only needs to keep a small, fixed
     223             :      * number of tuples in memory at any time, so we can avoid the
     224             :      * palloc/pfree overhead by recycling a fixed number of fixed-size slots
     225             :      * to hold the tuples.
     226             :      *
     227             :      * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
     228             :      * slots.  The allocation is sized to have one slot per tape, plus one
     229             :      * additional slot.  We need that many slots to hold all the tuples kept
     230             :      * in the heap during merge, plus the one we have last returned from the
     231             :      * sort, with tuplesort_gettuple.
     232             :      *
     233             :      * Initially, all the slots are kept in a linked list of free slots.  When
     234             :      * a tuple is read from a tape, it is put to the next available slot, if
     235             :      * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
     236             :      * instead.
     237             :      *
     238             :      * When we're done processing a tuple, we return the slot back to the free
     239             :      * list, or pfree() if it was palloc'd.  We know that a tuple was
     240             :      * allocated from the slab, if its pointer value is between
     241             :      * slabMemoryBegin and -End.
     242             :      *
     243             :      * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
     244             :      * tracking memory usage is not used.
     245             :      */
     246             :     bool        slabAllocatorUsed;
     247             : 
     248             :     char       *slabMemoryBegin;    /* beginning of slab memory arena */
     249             :     char       *slabMemoryEnd;  /* end of slab memory arena */
     250             :     SlabSlot   *slabFreeHead;   /* head of free list */
     251             : 
     252             :     /* Memory used for input and output tape buffers. */
     253             :     size_t      tape_buffer_mem;
     254             : 
     255             :     /*
     256             :      * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
     257             :      * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
     258             :      * modes), we remember the tuple in 'lastReturnedTuple', so that we can
     259             :      * recycle the memory on next gettuple call.
     260             :      */
     261             :     void       *lastReturnedTuple;
     262             : 
     263             :     /*
     264             :      * While building initial runs, this is the current output run number.
     265             :      * Afterwards, it is the number of initial runs we made.
     266             :      */
     267             :     int         currentRun;
     268             : 
     269             :     /*
     270             :      * Logical tapes, for merging.
     271             :      *
     272             :      * The initial runs are written in the output tapes.  In each merge pass,
     273             :      * the output tapes of the previous pass become the input tapes, and new
     274             :      * output tapes are created as needed.  When nInputTapes equals
     275             :      * nInputRuns, there is only one merge pass left.
     276             :      */
     277             :     LogicalTape **inputTapes;
     278             :     int         nInputTapes;
     279             :     int         nInputRuns;
     280             : 
     281             :     LogicalTape **outputTapes;
     282             :     int         nOutputTapes;
     283             :     int         nOutputRuns;
     284             : 
     285             :     LogicalTape *destTape;      /* current output tape */
     286             : 
     287             :     /*
     288             :      * These variables are used after completion of sorting to keep track of
     289             :      * the next tuple to return.  (In the tape case, the tape's current read
     290             :      * position is also critical state.)
     291             :      */
     292             :     LogicalTape *result_tape;   /* actual tape of finished output */
     293             :     int         current;        /* array index (only used if SORTEDINMEM) */
     294             :     bool        eof_reached;    /* reached EOF (needed for cursors) */
     295             : 
     296             :     /* markpos_xxx holds marked position for mark and restore */
     297             :     int64       markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
     298             :     int         markpos_offset; /* saved "current", or offset in tape block */
     299             :     bool        markpos_eof;    /* saved "eof_reached" */
     300             : 
     301             :     /*
     302             :      * These variables are used during parallel sorting.
     303             :      *
     304             :      * worker is our worker identifier.  Follows the general convention that
     305             :      * -1 value relates to a leader tuplesort, and values >= 0 worker
     306             :      * tuplesorts. (-1 can also be a serial tuplesort.)
     307             :      *
     308             :      * shared is mutable shared memory state, which is used to coordinate
     309             :      * parallel sorts.
     310             :      *
     311             :      * nParticipants is the number of worker Tuplesortstates known by the
     312             :      * leader to have actually been launched, which implies that they must
     313             :      * finish a run that the leader needs to merge.  Typically includes a
     314             :      * worker state held by the leader process itself.  Set in the leader
     315             :      * Tuplesortstate only.
     316             :      */
     317             :     int         worker;
     318             :     Sharedsort *shared;
     319             :     int         nParticipants;
     320             : 
     321             :     /*
     322             :      * Additional state for managing "abbreviated key" sortsupport routines
     323             :      * (which currently may be used by all cases except the hash index case).
     324             :      * Tracks the intervals at which the optimization's effectiveness is
     325             :      * tested.
     326             :      */
     327             :     int64       abbrevNext;     /* Tuple # at which to next check
     328             :                                  * applicability */
     329             : 
     330             :     /*
     331             :      * Resource snapshot for time of sort start.
     332             :      */
     333             :     PGRUsage    ru_start;
     334             : };
     335             : 
     336             : /*
     337             :  * Private mutable state of tuplesort-parallel-operation.  This is allocated
     338             :  * in shared memory.
     339             :  */
     340             : struct Sharedsort
     341             : {
     342             :     /* mutex protects all fields prior to tapes */
     343             :     slock_t     mutex;
     344             : 
     345             :     /*
     346             :      * currentWorker generates ordinal identifier numbers for parallel sort
     347             :      * workers.  These start from 0, and are always gapless.
     348             :      *
     349             :      * Workers increment workersFinished to indicate having finished.  If this
     350             :      * is equal to state.nParticipants within the leader, leader is ready to
     351             :      * merge worker runs.
     352             :      */
     353             :     int         currentWorker;
     354             :     int         workersFinished;
     355             : 
     356             :     /* Temporary file space */
     357             :     SharedFileSet fileset;
     358             : 
     359             :     /* Size of tapes flexible array */
     360             :     int         nTapes;
     361             : 
     362             :     /*
     363             :      * Tapes array used by workers to report back information needed by the
     364             :      * leader to concatenate all worker tapes into one for merging
     365             :      */
     366             :     TapeShare   tapes[FLEXIBLE_ARRAY_MEMBER];
     367             : };
     368             : 
     369             : /*
     370             :  * Is the given tuple allocated from the slab memory arena?
     371             :  */
     372             : #define IS_SLAB_SLOT(state, tuple) \
     373             :     ((char *) (tuple) >= (state)->slabMemoryBegin && \
     374             :      (char *) (tuple) < (state)->slabMemoryEnd)
     375             : 
     376             : /*
     377             :  * Return the given tuple to the slab memory free list, or free it
     378             :  * if it was palloc'd.
     379             :  */
     380             : #define RELEASE_SLAB_SLOT(state, tuple) \
     381             :     do { \
     382             :         SlabSlot *buf = (SlabSlot *) tuple; \
     383             :         \
     384             :         if (IS_SLAB_SLOT((state), buf)) \
     385             :         { \
     386             :             buf->nextfree = (state)->slabFreeHead; \
     387             :             (state)->slabFreeHead = buf; \
     388             :         } else \
     389             :             pfree(buf); \
     390             :     } while(0)
     391             : 
     392             : #define REMOVEABBREV(state,stup,count)  ((*(state)->base.removeabbrev) (state, stup, count))
     393             : #define COMPARETUP(state,a,b)   ((*(state)->base.comparetup) (a, b, state))
     394             : #define WRITETUP(state,tape,stup)   ((*(state)->base.writetup) (state, tape, stup))
     395             : #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
     396             : #define FREESTATE(state)    ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
     397             : #define LACKMEM(state)      ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
     398             : #define USEMEM(state,amt)   ((state)->availMem -= (amt))
     399             : #define FREEMEM(state,amt)  ((state)->availMem += (amt))
     400             : #define SERIAL(state)       ((state)->shared == NULL)
     401             : #define WORKER(state)       ((state)->shared && (state)->worker != -1)
     402             : #define LEADER(state)       ((state)->shared && (state)->worker == -1)
     403             : 
     404             : /*
     405             :  * NOTES about on-tape representation of tuples:
     406             :  *
     407             :  * We require the first "unsigned int" of a stored tuple to be the total size
     408             :  * on-tape of the tuple, including itself (so it is never zero; an all-zero
     409             :  * unsigned int is used to delimit runs).  The remainder of the stored tuple
     410             :  * may or may not match the in-memory representation of the tuple ---
     411             :  * any conversion needed is the job of the writetup and readtup routines.
     412             :  *
     413             :  * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
     414             :  * representation of the tuple must be followed by another "unsigned int" that
     415             :  * is a copy of the length --- so the total tape space used is actually
     416             :  * sizeof(unsigned int) more than the stored length value.  This allows
     417             :  * read-backwards.  When the random access flag was not specified, the
     418             :  * write/read routines may omit the extra length word.
     419             :  *
     420             :  * writetup is expected to write both length words as well as the tuple
     421             :  * data.  When readtup is called, the tape is positioned just after the
     422             :  * front length word; readtup must read the tuple data and advance past
     423             :  * the back length word (if present).
     424             :  *
     425             :  * The write/read routines can make use of the tuple description data
     426             :  * stored in the Tuplesortstate record, if needed.  They are also expected
     427             :  * to adjust state->availMem by the amount of memory space (not tape space!)
     428             :  * released or consumed.  There is no error return from either writetup
     429             :  * or readtup; they should ereport() on failure.
     430             :  *
     431             :  *
     432             :  * NOTES about memory consumption calculations:
     433             :  *
     434             :  * We count space allocated for tuples against the workMem limit, plus
     435             :  * the space used by the variable-size memtuples array.  Fixed-size space
     436             :  * is not counted; it's small enough to not be interesting.
     437             :  *
     438             :  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
     439             :  * rather than the originally-requested size.  This is important since
     440             :  * palloc can add substantial overhead.  It's not a complete answer since
     441             :  * we won't count any wasted space in palloc allocation blocks, but it's
     442             :  * a lot better than what we were doing before 7.3.  As of 9.6, a
     443             :  * separate memory context is used for caller passed tuples.  Resetting
     444             :  * it at certain key increments significantly ameliorates fragmentation.
     445             :  * readtup routines use the slab allocator (they cannot use
     446             :  * the reset context because it gets deleted at the point that merging
     447             :  * begins).
     448             :  */
     449             : 
     450             : 
     451             : static void tuplesort_begin_batch(Tuplesortstate *state);
     452             : static bool consider_abort_common(Tuplesortstate *state);
     453             : static void inittapes(Tuplesortstate *state, bool mergeruns);
     454             : static void inittapestate(Tuplesortstate *state, int maxTapes);
     455             : static void selectnewtape(Tuplesortstate *state);
     456             : static void init_slab_allocator(Tuplesortstate *state, int numSlots);
     457             : static void mergeruns(Tuplesortstate *state);
     458             : static void mergeonerun(Tuplesortstate *state);
     459             : static void beginmerge(Tuplesortstate *state);
     460             : static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
     461             : static void dumptuples(Tuplesortstate *state, bool alltuples);
     462             : static void make_bounded_heap(Tuplesortstate *state);
     463             : static void sort_bounded_heap(Tuplesortstate *state);
     464             : static void tuplesort_sort_memtuples(Tuplesortstate *state);
     465             : static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
     466             : static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
     467             : static void tuplesort_heap_delete_top(Tuplesortstate *state);
     468             : static void reversedirection(Tuplesortstate *state);
     469             : static unsigned int getlen(LogicalTape *tape, bool eofOK);
     470             : static void markrunend(LogicalTape *tape);
     471             : static int  worker_get_identifier(Tuplesortstate *state);
     472             : static void worker_freeze_result_tape(Tuplesortstate *state);
     473             : static void worker_nomergeruns(Tuplesortstate *state);
     474             : static void leader_takeover_tapes(Tuplesortstate *state);
     475             : static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
     476             : static void tuplesort_free(Tuplesortstate *state);
     477             : static void tuplesort_updatemax(Tuplesortstate *state);
     478             : 
     479             : /*
     480             :  * Specialized comparators that we can inline into specialized sorts.  The goal
     481             :  * is to try to sort two tuples without having to follow the pointers to the
     482             :  * comparator or the tuple.
     483             :  *
     484             :  * XXX: For now, there is no specialization for cases where datum1 is
     485             :  * authoritative and we don't even need to fall back to a callback at all (that
     486             :  * would be true for types like int4/int8/timestamp/date, but not true for
     487             :  * abbreviations of text or multi-key sorts.  There could be!  Is it worth it?
     488             :  */
     489             : 
     490             : /* Used if first key's comparator is ssup_datum_unsigned_cmp */
     491             : static pg_attribute_always_inline int
     492    46637480 : qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     493             : {
     494             :     int         compare;
     495             : 
     496    46637480 :     compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
     497    46637480 :                                           b->datum1, b->isnull1,
     498             :                                           &state->base.sortKeys[0]);
     499    46637480 :     if (compare != 0)
     500    42070550 :         return compare;
     501             : 
     502             :     /*
     503             :      * No need to waste effort calling the tiebreak function when there are no
     504             :      * other keys to sort on.
     505             :      */
     506     4566930 :     if (state->base.onlyKey != NULL)
     507           0 :         return 0;
     508             : 
     509     4566930 :     return state->base.comparetup_tiebreak(a, b, state);
     510             : }
     511             : 
     512             : /* Used if first key's comparator is ssup_datum_signed_cmp */
     513             : static pg_attribute_always_inline int
     514     9710082 : qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     515             : {
     516             :     int         compare;
     517             : 
     518     9710082 :     compare = ApplySignedSortComparator(a->datum1, a->isnull1,
     519     9710082 :                                         b->datum1, b->isnull1,
     520             :                                         &state->base.sortKeys[0]);
     521             : 
     522     9710082 :     if (compare != 0)
     523     9600384 :         return compare;
     524             : 
     525             :     /*
     526             :      * No need to waste effort calling the tiebreak function when there are no
     527             :      * other keys to sort on.
     528             :      */
     529      109698 :     if (state->base.onlyKey != NULL)
     530       95264 :         return 0;
     531             : 
     532       14434 :     return state->base.comparetup_tiebreak(a, b, state);
     533             : }
     534             : 
     535             : /* Used if first key's comparator is ssup_datum_int32_cmp */
     536             : static pg_attribute_always_inline int
     537    56096930 : qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     538             : {
     539             :     int         compare;
     540             : 
     541    56096930 :     compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
     542    56096930 :                                        b->datum1, b->isnull1,
     543             :                                        &state->base.sortKeys[0]);
     544             : 
     545    56096930 :     if (compare != 0)
     546    39971326 :         return compare;
     547             : 
     548             :     /*
     549             :      * No need to waste effort calling the tiebreak function when there are no
     550             :      * other keys to sort on.
     551             :      */
     552    16125604 :     if (state->base.onlyKey != NULL)
     553     2335442 :         return 0;
     554             : 
     555    13790162 :     return state->base.comparetup_tiebreak(a, b, state);
     556             : }
     557             : 
     558             : /*
     559             :  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
     560             :  * any variant of SortTuples, using the appropriate comparetup function.
     561             :  * qsort_ssup() is specialized for the case where the comparetup function
     562             :  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
     563             :  * and Datum sorts.  qsort_tuple_{unsigned,signed,int32} are specialized for
     564             :  * common comparison functions on pass-by-value leading datums.
     565             :  */
     566             : 
     567             : #define ST_SORT qsort_tuple_unsigned
     568             : #define ST_ELEMENT_TYPE SortTuple
     569             : #define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
     570             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     571             : #define ST_CHECK_FOR_INTERRUPTS
     572             : #define ST_SCOPE static
     573             : #define ST_DEFINE
     574             : #include "lib/sort_template.h"
     575             : 
     576             : #define ST_SORT qsort_tuple_signed
     577             : #define ST_ELEMENT_TYPE SortTuple
     578             : #define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
     579             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     580             : #define ST_CHECK_FOR_INTERRUPTS
     581             : #define ST_SCOPE static
     582             : #define ST_DEFINE
     583             : #include "lib/sort_template.h"
     584             : 
     585             : #define ST_SORT qsort_tuple_int32
     586             : #define ST_ELEMENT_TYPE SortTuple
     587             : #define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
     588             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     589             : #define ST_CHECK_FOR_INTERRUPTS
     590             : #define ST_SCOPE static
     591             : #define ST_DEFINE
     592             : #include "lib/sort_template.h"
     593             : 
     594             : #define ST_SORT qsort_tuple
     595             : #define ST_ELEMENT_TYPE SortTuple
     596             : #define ST_COMPARE_RUNTIME_POINTER
     597             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     598             : #define ST_CHECK_FOR_INTERRUPTS
     599             : #define ST_SCOPE static
     600             : #define ST_DECLARE
     601             : #define ST_DEFINE
     602             : #include "lib/sort_template.h"
     603             : 
     604             : #define ST_SORT qsort_ssup
     605             : #define ST_ELEMENT_TYPE SortTuple
     606             : #define ST_COMPARE(a, b, ssup) \
     607             :     ApplySortComparator((a)->datum1, (a)->isnull1, \
     608             :                         (b)->datum1, (b)->isnull1, (ssup))
     609             : #define ST_COMPARE_ARG_TYPE SortSupportData
     610             : #define ST_CHECK_FOR_INTERRUPTS
     611             : #define ST_SCOPE static
     612             : #define ST_DEFINE
     613             : #include "lib/sort_template.h"
     614             : 
     615             : /*
     616             :  *      tuplesort_begin_xxx
     617             :  *
     618             :  * Initialize for a tuple sort operation.
     619             :  *
     620             :  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
     621             :  * zero or more times, then call tuplesort_performsort when all the tuples
     622             :  * have been supplied.  After performsort, retrieve the tuples in sorted
     623             :  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
     624             :  * access was requested, rescan, markpos, and restorepos can also be called.)
     625             :  * Call tuplesort_end to terminate the operation and release memory/disk space.
     626             :  *
     627             :  * Each variant of tuplesort_begin has a workMem parameter specifying the
     628             :  * maximum number of kilobytes of RAM to use before spilling data to disk.
     629             :  * (The normal value of this parameter is work_mem, but some callers use
     630             :  * other values.)  Each variant also has a sortopt which is a bitmask of
     631             :  * sort options.  See TUPLESORT_* definitions in tuplesort.h
     632             :  */
     633             : 
     634             : Tuplesortstate *
     635      278896 : tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
     636             : {
     637             :     Tuplesortstate *state;
     638             :     MemoryContext maincontext;
     639             :     MemoryContext sortcontext;
     640             :     MemoryContext oldcontext;
     641             : 
     642             :     /* See leader_takeover_tapes() remarks on random access support */
     643      278896 :     if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
     644           0 :         elog(ERROR, "random access disallowed under parallel sort");
     645             : 
     646             :     /*
     647             :      * Memory context surviving tuplesort_reset.  This memory context holds
     648             :      * data which is useful to keep while sorting multiple similar batches.
     649             :      */
     650      278896 :     maincontext = AllocSetContextCreate(CurrentMemoryContext,
     651             :                                         "TupleSort main",
     652             :                                         ALLOCSET_DEFAULT_SIZES);
     653             : 
     654             :     /*
     655             :      * Create a working memory context for one sort operation.  The content of
     656             :      * this context is deleted by tuplesort_reset.
     657             :      */
     658      278896 :     sortcontext = AllocSetContextCreate(maincontext,
     659             :                                         "TupleSort sort",
     660             :                                         ALLOCSET_DEFAULT_SIZES);
     661             : 
     662             :     /*
     663             :      * Additionally a working memory context for tuples is setup in
     664             :      * tuplesort_begin_batch.
     665             :      */
     666             : 
     667             :     /*
     668             :      * Make the Tuplesortstate within the per-sortstate context.  This way, we
     669             :      * don't need a separate pfree() operation for it at shutdown.
     670             :      */
     671      278896 :     oldcontext = MemoryContextSwitchTo(maincontext);
     672             : 
     673      278896 :     state = palloc0_object(Tuplesortstate);
     674             : 
     675      278896 :     if (trace_sort)
     676           0 :         pg_rusage_init(&state->ru_start);
     677             : 
     678      278896 :     state->base.sortopt = sortopt;
     679      278896 :     state->base.tuples = true;
     680      278896 :     state->abbrevNext = 10;
     681             : 
     682             :     /*
     683             :      * workMem is forced to be at least 64KB, the current minimum valid value
     684             :      * for the work_mem GUC.  This is a defense against parallel sort callers
     685             :      * that divide out memory among many workers in a way that leaves each
     686             :      * with very little memory.
     687             :      */
     688      278896 :     state->allowedMem = Max(workMem, 64) * (int64) 1024;
     689      278896 :     state->base.sortcontext = sortcontext;
     690      278896 :     state->base.maincontext = maincontext;
     691             : 
     692      278896 :     state->memtupsize = INITIAL_MEMTUPSIZE;
     693      278896 :     state->memtuples = NULL;
     694             : 
     695             :     /*
     696             :      * After all of the other non-parallel-related state, we setup all of the
     697             :      * state needed for each batch.
     698             :      */
     699      278896 :     tuplesort_begin_batch(state);
     700             : 
     701             :     /*
     702             :      * Initialize parallel-related state based on coordination information
     703             :      * from caller
     704             :      */
     705      278896 :     if (!coordinate)
     706             :     {
     707             :         /* Serial sort */
     708      278068 :         state->shared = NULL;
     709      278068 :         state->worker = -1;
     710      278068 :         state->nParticipants = -1;
     711             :     }
     712         828 :     else if (coordinate->isWorker)
     713             :     {
     714             :         /* Parallel worker produces exactly one final run from all input */
     715         564 :         state->shared = coordinate->sharedsort;
     716         564 :         state->worker = worker_get_identifier(state);
     717         564 :         state->nParticipants = -1;
     718             :     }
     719             :     else
     720             :     {
     721             :         /* Parallel leader state only used for final merge */
     722         264 :         state->shared = coordinate->sharedsort;
     723         264 :         state->worker = -1;
     724         264 :         state->nParticipants = coordinate->nParticipants;
     725             :         Assert(state->nParticipants >= 1);
     726             :     }
     727             : 
     728      278896 :     MemoryContextSwitchTo(oldcontext);
     729             : 
     730      278896 :     return state;
     731             : }
     732             : 
     733             : /*
     734             :  *      tuplesort_begin_batch
     735             :  *
     736             :  * Setup, or reset, all state need for processing a new set of tuples with this
     737             :  * sort state. Called both from tuplesort_begin_common (the first time sorting
     738             :  * with this sort state) and tuplesort_reset (for subsequent usages).
     739             :  */
     740             : static void
     741      282232 : tuplesort_begin_batch(Tuplesortstate *state)
     742             : {
     743             :     MemoryContext oldcontext;
     744             : 
     745      282232 :     oldcontext = MemoryContextSwitchTo(state->base.maincontext);
     746             : 
     747             :     /*
     748             :      * Caller tuple (e.g. IndexTuple) memory context.
     749             :      *
     750             :      * A dedicated child context used exclusively for caller passed tuples
     751             :      * eases memory management.  Resetting at key points reduces
     752             :      * fragmentation. Note that the memtuples array of SortTuples is allocated
     753             :      * in the parent context, not this context, because there is no need to
     754             :      * free memtuples early.  For bounded sorts, tuples may be pfreed in any
     755             :      * order, so we use a regular aset.c context so that it can make use of
     756             :      * free'd memory.  When the sort is not bounded, we make use of a bump.c
     757             :      * context as this keeps allocations more compact with less wastage.
     758             :      * Allocations are also slightly more CPU efficient.
     759             :      */
     760      282232 :     if (TupleSortUseBumpTupleCxt(state->base.sortopt))
     761      280866 :         state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
     762             :                                                      "Caller tuples",
     763             :                                                      ALLOCSET_DEFAULT_SIZES);
     764             :     else
     765        1366 :         state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
     766             :                                                          "Caller tuples",
     767             :                                                          ALLOCSET_DEFAULT_SIZES);
     768             : 
     769             : 
     770      282232 :     state->status = TSS_INITIAL;
     771      282232 :     state->bounded = false;
     772      282232 :     state->boundUsed = false;
     773             : 
     774      282232 :     state->availMem = state->allowedMem;
     775             : 
     776      282232 :     state->tapeset = NULL;
     777             : 
     778      282232 :     state->memtupcount = 0;
     779             : 
     780      282232 :     state->growmemtuples = true;
     781      282232 :     state->slabAllocatorUsed = false;
     782      282232 :     if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
     783             :     {
     784          72 :         pfree(state->memtuples);
     785          72 :         state->memtuples = NULL;
     786          72 :         state->memtupsize = INITIAL_MEMTUPSIZE;
     787             :     }
     788      282232 :     if (state->memtuples == NULL)
     789             :     {
     790      278968 :         state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
     791      278968 :         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
     792             :     }
     793             : 
     794             :     /* workMem must be large enough for the minimal memtuples array */
     795      282232 :     if (LACKMEM(state))
     796           0 :         elog(ERROR, "insufficient memory allowed for sort");
     797             : 
     798      282232 :     state->currentRun = 0;
     799             : 
     800             :     /*
     801             :      * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
     802             :      * inittapes(), if needed.
     803             :      */
     804             : 
     805      282232 :     state->result_tape = NULL;   /* flag that result tape has not been formed */
     806             : 
     807      282232 :     MemoryContextSwitchTo(oldcontext);
     808      282232 : }
     809             : 
     810             : /*
     811             :  * tuplesort_set_bound
     812             :  *
     813             :  *  Advise tuplesort that at most the first N result tuples are required.
     814             :  *
     815             :  * Must be called before inserting any tuples.  (Actually, we could allow it
     816             :  * as long as the sort hasn't spilled to disk, but there seems no need for
     817             :  * delayed calls at the moment.)
     818             :  *
     819             :  * This is a hint only. The tuplesort may still return more tuples than
     820             :  * requested.  Parallel leader tuplesorts will always ignore the hint.
     821             :  */
     822             : void
     823        1232 : tuplesort_set_bound(Tuplesortstate *state, int64 bound)
     824             : {
     825             :     /* Assert we're called before loading any tuples */
     826             :     Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
     827             :     /* Assert we allow bounded sorts */
     828             :     Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
     829             :     /* Can't set the bound twice, either */
     830             :     Assert(!state->bounded);
     831             :     /* Also, this shouldn't be called in a parallel worker */
     832             :     Assert(!WORKER(state));
     833             : 
     834             :     /* Parallel leader allows but ignores hint */
     835        1232 :     if (LEADER(state))
     836           0 :         return;
     837             : 
     838             : #ifdef DEBUG_BOUNDED_SORT
     839             :     /* Honor GUC setting that disables the feature (for easy testing) */
     840             :     if (!optimize_bounded_sort)
     841             :         return;
     842             : #endif
     843             : 
     844             :     /* We want to be able to compute bound * 2, so limit the setting */
     845        1232 :     if (bound > (int64) (INT_MAX / 2))
     846           0 :         return;
     847             : 
     848        1232 :     state->bounded = true;
     849        1232 :     state->bound = (int) bound;
     850             : 
     851             :     /*
     852             :      * Bounded sorts are not an effective target for abbreviated key
     853             :      * optimization.  Disable by setting state to be consistent with no
     854             :      * abbreviation support.
     855             :      */
     856        1232 :     state->base.sortKeys->abbrev_converter = NULL;
     857        1232 :     if (state->base.sortKeys->abbrev_full_comparator)
     858          16 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
     859             : 
     860             :     /* Not strictly necessary, but be tidy */
     861        1232 :     state->base.sortKeys->abbrev_abort = NULL;
     862        1232 :     state->base.sortKeys->abbrev_full_comparator = NULL;
     863             : }
     864             : 
     865             : /*
     866             :  * tuplesort_used_bound
     867             :  *
     868             :  * Allow callers to find out if the sort state was able to use a bound.
     869             :  */
     870             : bool
     871         380 : tuplesort_used_bound(Tuplesortstate *state)
     872             : {
     873         380 :     return state->boundUsed;
     874             : }
     875             : 
     876             : /*
     877             :  * tuplesort_free
     878             :  *
     879             :  *  Internal routine for freeing resources of tuplesort.
     880             :  */
     881             : static void
     882      281970 : tuplesort_free(Tuplesortstate *state)
     883             : {
     884             :     /* context swap probably not needed, but let's be safe */
     885      281970 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
     886             :     int64       spaceUsed;
     887             : 
     888      281970 :     if (state->tapeset)
     889         898 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset);
     890             :     else
     891      281072 :         spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
     892             : 
     893             :     /*
     894             :      * Delete temporary "tape" files, if any.
     895             :      *
     896             :      * We don't bother to destroy the individual tapes here. They will go away
     897             :      * with the sortcontext.  (In TSS_FINALMERGE state, we have closed
     898             :      * finished tapes already.)
     899             :      */
     900      281970 :     if (state->tapeset)
     901         898 :         LogicalTapeSetClose(state->tapeset);
     902             : 
     903      281970 :     if (trace_sort)
     904             :     {
     905           0 :         if (state->tapeset)
     906           0 :             elog(LOG, "%s of worker %d ended, %" PRId64 " disk blocks used: %s",
     907             :                  SERIAL(state) ? "external sort" : "parallel external sort",
     908             :                  state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
     909             :         else
     910           0 :             elog(LOG, "%s of worker %d ended, %" PRId64 " KB used: %s",
     911             :                  SERIAL(state) ? "internal sort" : "unperformed parallel sort",
     912             :                  state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
     913             :     }
     914             : 
     915             :     TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
     916             : 
     917      281970 :     FREESTATE(state);
     918      281970 :     MemoryContextSwitchTo(oldcontext);
     919             : 
     920             :     /*
     921             :      * Free the per-sort memory context, thereby releasing all working memory.
     922             :      */
     923      281970 :     MemoryContextReset(state->base.sortcontext);
     924      281970 : }
     925             : 
     926             : /*
     927             :  * tuplesort_end
     928             :  *
     929             :  *  Release resources and clean up.
     930             :  *
     931             :  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
     932             :  * pointing to garbage.  Be careful not to attempt to use or free such
     933             :  * pointers afterwards!
     934             :  */
     935             : void
     936      278634 : tuplesort_end(Tuplesortstate *state)
     937             : {
     938      278634 :     tuplesort_free(state);
     939             : 
     940             :     /*
     941             :      * Free the main memory context, including the Tuplesortstate struct
     942             :      * itself.
     943             :      */
     944      278634 :     MemoryContextDelete(state->base.maincontext);
     945      278634 : }
     946             : 
     947             : /*
     948             :  * tuplesort_updatemax
     949             :  *
     950             :  *  Update maximum resource usage statistics.
     951             :  */
     952             : static void
     953        3732 : tuplesort_updatemax(Tuplesortstate *state)
     954             : {
     955             :     int64       spaceUsed;
     956             :     bool        isSpaceDisk;
     957             : 
     958             :     /*
     959             :      * Note: it might seem we should provide both memory and disk usage for a
     960             :      * disk-based sort.  However, the current code doesn't track memory space
     961             :      * accurately once we have begun to return tuples to the caller (since we
     962             :      * don't account for pfree's the caller is expected to do), so we cannot
     963             :      * rely on availMem in a disk sort.  This does not seem worth the overhead
     964             :      * to fix.  Is it worth creating an API for the memory context code to
     965             :      * tell us how much is actually used in sortcontext?
     966             :      */
     967        3732 :     if (state->tapeset)
     968             :     {
     969           6 :         isSpaceDisk = true;
     970           6 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
     971             :     }
     972             :     else
     973             :     {
     974        3726 :         isSpaceDisk = false;
     975        3726 :         spaceUsed = state->allowedMem - state->availMem;
     976             :     }
     977             : 
     978             :     /*
     979             :      * Sort evicts data to the disk when it wasn't able to fit that data into
     980             :      * main memory.  This is why we assume space used on the disk to be more
     981             :      * important for tracking resource usage than space used in memory. Note
     982             :      * that the amount of space occupied by some tupleset on the disk might be
     983             :      * less than amount of space occupied by the same tupleset in memory due
     984             :      * to more compact representation.
     985             :      */
     986        3732 :     if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
     987        3726 :         (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
     988             :     {
     989         506 :         state->maxSpace = spaceUsed;
     990         506 :         state->isMaxSpaceDisk = isSpaceDisk;
     991         506 :         state->maxSpaceStatus = state->status;
     992             :     }
     993        3732 : }
     994             : 
     995             : /*
     996             :  * tuplesort_reset
     997             :  *
     998             :  *  Reset the tuplesort.  Reset all the data in the tuplesort, but leave the
     999             :  *  meta-information in.  After tuplesort_reset, tuplesort is ready to start
    1000             :  *  a new sort.  This allows avoiding recreation of tuple sort states (and
    1001             :  *  save resources) when sorting multiple small batches.
    1002             :  */
    1003             : void
    1004        3336 : tuplesort_reset(Tuplesortstate *state)
    1005             : {
    1006        3336 :     tuplesort_updatemax(state);
    1007        3336 :     tuplesort_free(state);
    1008             : 
    1009             :     /*
    1010             :      * After we've freed up per-batch memory, re-setup all of the state common
    1011             :      * to both the first batch and any subsequent batch.
    1012             :      */
    1013        3336 :     tuplesort_begin_batch(state);
    1014             : 
    1015        3336 :     state->lastReturnedTuple = NULL;
    1016        3336 :     state->slabMemoryBegin = NULL;
    1017        3336 :     state->slabMemoryEnd = NULL;
    1018        3336 :     state->slabFreeHead = NULL;
    1019        3336 : }
    1020             : 
    1021             : /*
    1022             :  * Grow the memtuples[] array, if possible within our memory constraint.  We
    1023             :  * must not exceed INT_MAX tuples in memory or the caller-provided memory
    1024             :  * limit.  Return true if we were able to enlarge the array, false if not.
    1025             :  *
    1026             :  * Normally, at each increment we double the size of the array.  When doing
    1027             :  * that would exceed a limit, we attempt one last, smaller increase (and then
    1028             :  * clear the growmemtuples flag so we don't try any more).  That allows us to
    1029             :  * use memory as fully as permitted; sticking to the pure doubling rule could
    1030             :  * result in almost half going unused.  Because availMem moves around with
    1031             :  * tuple addition/removal, we need some rule to prevent making repeated small
    1032             :  * increases in memtupsize, which would just be useless thrashing.  The
    1033             :  * growmemtuples flag accomplishes that and also prevents useless
    1034             :  * recalculations in this function.
    1035             :  */
    1036             : static bool
    1037        8504 : grow_memtuples(Tuplesortstate *state)
    1038             : {
    1039             :     int         newmemtupsize;
    1040        8504 :     int         memtupsize = state->memtupsize;
    1041        8504 :     int64       memNowUsed = state->allowedMem - state->availMem;
    1042             : 
    1043             :     /* Forget it if we've already maxed out memtuples, per comment above */
    1044        8504 :     if (!state->growmemtuples)
    1045         140 :         return false;
    1046             : 
    1047             :     /* Select new value of memtupsize */
    1048        8364 :     if (memNowUsed <= state->availMem)
    1049             :     {
    1050             :         /*
    1051             :          * We've used no more than half of allowedMem; double our usage,
    1052             :          * clamping at INT_MAX tuples.
    1053             :          */
    1054        8222 :         if (memtupsize < INT_MAX / 2)
    1055        8222 :             newmemtupsize = memtupsize * 2;
    1056             :         else
    1057             :         {
    1058           0 :             newmemtupsize = INT_MAX;
    1059           0 :             state->growmemtuples = false;
    1060             :         }
    1061             :     }
    1062             :     else
    1063             :     {
    1064             :         /*
    1065             :          * This will be the last increment of memtupsize.  Abandon doubling
    1066             :          * strategy and instead increase as much as we safely can.
    1067             :          *
    1068             :          * To stay within allowedMem, we can't increase memtupsize by more
    1069             :          * than availMem / sizeof(SortTuple) elements.  In practice, we want
    1070             :          * to increase it by considerably less, because we need to leave some
    1071             :          * space for the tuples to which the new array slots will refer.  We
    1072             :          * assume the new tuples will be about the same size as the tuples
    1073             :          * we've already seen, and thus we can extrapolate from the space
    1074             :          * consumption so far to estimate an appropriate new size for the
    1075             :          * memtuples array.  The optimal value might be higher or lower than
    1076             :          * this estimate, but it's hard to know that in advance.  We again
    1077             :          * clamp at INT_MAX tuples.
    1078             :          *
    1079             :          * This calculation is safe against enlarging the array so much that
    1080             :          * LACKMEM becomes true, because the memory currently used includes
    1081             :          * the present array; thus, there would be enough allowedMem for the
    1082             :          * new array elements even if no other memory were currently used.
    1083             :          *
    1084             :          * We do the arithmetic in float8, because otherwise the product of
    1085             :          * memtupsize and allowedMem could overflow.  Any inaccuracy in the
    1086             :          * result should be insignificant; but even if we computed a
    1087             :          * completely insane result, the checks below will prevent anything
    1088             :          * really bad from happening.
    1089             :          */
    1090             :         double      grow_ratio;
    1091             : 
    1092         142 :         grow_ratio = (double) state->allowedMem / (double) memNowUsed;
    1093         142 :         if (memtupsize * grow_ratio < INT_MAX)
    1094         142 :             newmemtupsize = (int) (memtupsize * grow_ratio);
    1095             :         else
    1096           0 :             newmemtupsize = INT_MAX;
    1097             : 
    1098             :         /* We won't make any further enlargement attempts */
    1099         142 :         state->growmemtuples = false;
    1100             :     }
    1101             : 
    1102             :     /* Must enlarge array by at least one element, else report failure */
    1103        8364 :     if (newmemtupsize <= memtupsize)
    1104           0 :         goto noalloc;
    1105             : 
    1106             :     /*
    1107             :      * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
    1108             :      * to ensure our request won't be rejected.  Note that we can easily
    1109             :      * exhaust address space before facing this outcome.  (This is presently
    1110             :      * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
    1111             :      * don't rely on that at this distance.)
    1112             :      */
    1113        8364 :     if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
    1114             :     {
    1115           0 :         newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
    1116           0 :         state->growmemtuples = false;    /* can't grow any more */
    1117             :     }
    1118             : 
    1119             :     /*
    1120             :      * We need to be sure that we do not cause LACKMEM to become true, else
    1121             :      * the space management algorithm will go nuts.  The code above should
    1122             :      * never generate a dangerous request, but to be safe, check explicitly
    1123             :      * that the array growth fits within availMem.  (We could still cause
    1124             :      * LACKMEM if the memory chunk overhead associated with the memtuples
    1125             :      * array were to increase.  That shouldn't happen because we chose the
    1126             :      * initial array size large enough to ensure that palloc will be treating
    1127             :      * both old and new arrays as separate chunks.  But we'll check LACKMEM
    1128             :      * explicitly below just in case.)
    1129             :      */
    1130        8364 :     if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
    1131           0 :         goto noalloc;
    1132             : 
    1133             :     /* OK, do it */
    1134        8364 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1135        8364 :     state->memtupsize = newmemtupsize;
    1136        8364 :     state->memtuples = (SortTuple *)
    1137        8364 :         repalloc_huge(state->memtuples,
    1138        8364 :                       state->memtupsize * sizeof(SortTuple));
    1139        8364 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1140        8364 :     if (LACKMEM(state))
    1141           0 :         elog(ERROR, "unexpected out-of-memory situation in tuplesort");
    1142        8364 :     return true;
    1143             : 
    1144           0 : noalloc:
    1145             :     /* If for any reason we didn't realloc, shut off future attempts */
    1146           0 :     state->growmemtuples = false;
    1147           0 :     return false;
    1148             : }
    1149             : 
    1150             : /*
    1151             :  * Shared code for tuple and datum cases.
    1152             :  */
    1153             : void
    1154    31245408 : tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
    1155             :                           bool useAbbrev, Size tuplen)
    1156             : {
    1157    31245408 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1158             : 
    1159             :     Assert(!LEADER(state));
    1160             : 
    1161             :     /* account for the memory used for this tuple */
    1162    31245408 :     USEMEM(state, tuplen);
    1163    31245408 :     state->tupleMem += tuplen;
    1164             : 
    1165    31245408 :     if (!useAbbrev)
    1166             :     {
    1167             :         /*
    1168             :          * Leave ordinary Datum representation, or NULL value.  If there is a
    1169             :          * converter it won't expect NULL values, and cost model is not
    1170             :          * required to account for NULL, so in that case we avoid calling
    1171             :          * converter and just set datum1 to zeroed representation (to be
    1172             :          * consistent, and to support cheap inequality tests for NULL
    1173             :          * abbreviated keys).
    1174             :          */
    1175             :     }
    1176     4445810 :     else if (!consider_abort_common(state))
    1177             :     {
    1178             :         /* Store abbreviated key representation */
    1179     4445714 :         tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
    1180             :                                                                state->base.sortKeys);
    1181             :     }
    1182             :     else
    1183             :     {
    1184             :         /*
    1185             :          * Set state to be consistent with never trying abbreviation.
    1186             :          *
    1187             :          * Alter datum1 representation in already-copied tuples, so as to
    1188             :          * ensure a consistent representation (current tuple was just
    1189             :          * handled).  It does not matter if some dumped tuples are already
    1190             :          * sorted on tape, since serialized tuples lack abbreviated keys
    1191             :          * (TSS_BUILDRUNS state prevents control reaching here in any case).
    1192             :          */
    1193          96 :         REMOVEABBREV(state, state->memtuples, state->memtupcount);
    1194             :     }
    1195             : 
    1196    31245408 :     switch (state->status)
    1197             :     {
    1198    26382942 :         case TSS_INITIAL:
    1199             : 
    1200             :             /*
    1201             :              * Save the tuple into the unsorted array.  First, grow the array
    1202             :              * as needed.  Note that we try to grow the array when there is
    1203             :              * still one free slot remaining --- if we fail, there'll still be
    1204             :              * room to store the incoming tuple, and then we'll switch to
    1205             :              * tape-based operation.
    1206             :              */
    1207    26382942 :             if (state->memtupcount >= state->memtupsize - 1)
    1208             :             {
    1209        8504 :                 (void) grow_memtuples(state);
    1210             :                 Assert(state->memtupcount < state->memtupsize);
    1211             :             }
    1212    26382942 :             state->memtuples[state->memtupcount++] = *tuple;
    1213             : 
    1214             :             /*
    1215             :              * Check if it's time to switch over to a bounded heapsort. We do
    1216             :              * so if the input tuple count exceeds twice the desired tuple
    1217             :              * count (this is a heuristic for where heapsort becomes cheaper
    1218             :              * than a quicksort), or if we've just filled workMem and have
    1219             :              * enough tuples to meet the bound.
    1220             :              *
    1221             :              * Note that once we enter TSS_BOUNDED state we will always try to
    1222             :              * complete the sort that way.  In the worst case, if later input
    1223             :              * tuples are larger than earlier ones, this might cause us to
    1224             :              * exceed workMem significantly.
    1225             :              */
    1226    26382942 :             if (state->bounded &&
    1227       47316 :                 (state->memtupcount > state->bound * 2 ||
    1228       46908 :                  (state->memtupcount > state->bound && LACKMEM(state))))
    1229             :             {
    1230         408 :                 if (trace_sort)
    1231           0 :                     elog(LOG, "switching to bounded heapsort at %d tuples: %s",
    1232             :                          state->memtupcount,
    1233             :                          pg_rusage_show(&state->ru_start));
    1234         408 :                 make_bounded_heap(state);
    1235         408 :                 MemoryContextSwitchTo(oldcontext);
    1236         408 :                 return;
    1237             :             }
    1238             : 
    1239             :             /*
    1240             :              * Done if we still fit in available memory and have array slots.
    1241             :              */
    1242    26382534 :             if (state->memtupcount < state->memtupsize && !LACKMEM(state))
    1243             :             {
    1244    26382394 :                 MemoryContextSwitchTo(oldcontext);
    1245    26382394 :                 return;
    1246             :             }
    1247             : 
    1248             :             /*
    1249             :              * Nope; time to switch to tape-based operation.
    1250             :              */
    1251         140 :             inittapes(state, true);
    1252             : 
    1253             :             /*
    1254             :              * Dump all tuples.
    1255             :              */
    1256         140 :             dumptuples(state, false);
    1257         140 :             break;
    1258             : 
    1259     3750180 :         case TSS_BOUNDED:
    1260             : 
    1261             :             /*
    1262             :              * We don't want to grow the array here, so check whether the new
    1263             :              * tuple can be discarded before putting it in.  This should be a
    1264             :              * good speed optimization, too, since when there are many more
    1265             :              * input tuples than the bound, most input tuples can be discarded
    1266             :              * with just this one comparison.  Note that because we currently
    1267             :              * have the sort direction reversed, we must check for <= not >=.
    1268             :              */
    1269     3750180 :             if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
    1270             :             {
    1271             :                 /* new tuple <= top of the heap, so we can discard it */
    1272     3247194 :                 free_sort_tuple(state, tuple);
    1273     3247194 :                 CHECK_FOR_INTERRUPTS();
    1274             :             }
    1275             :             else
    1276             :             {
    1277             :                 /* discard top of heap, replacing it with the new tuple */
    1278      502986 :                 free_sort_tuple(state, &state->memtuples[0]);
    1279      502986 :                 tuplesort_heap_replace_top(state, tuple);
    1280             :             }
    1281     3750180 :             break;
    1282             : 
    1283     1112286 :         case TSS_BUILDRUNS:
    1284             : 
    1285             :             /*
    1286             :              * Save the tuple into the unsorted array (there must be space)
    1287             :              */
    1288     1112286 :             state->memtuples[state->memtupcount++] = *tuple;
    1289             : 
    1290             :             /*
    1291             :              * If we are over the memory limit, dump all tuples.
    1292             :              */
    1293     1112286 :             dumptuples(state, false);
    1294     1112286 :             break;
    1295             : 
    1296           0 :         default:
    1297           0 :             elog(ERROR, "invalid tuplesort state");
    1298             :             break;
    1299             :     }
    1300     4862606 :     MemoryContextSwitchTo(oldcontext);
    1301             : }
    1302             : 
    1303             : static bool
    1304     4445810 : consider_abort_common(Tuplesortstate *state)
    1305             : {
    1306             :     Assert(state->base.sortKeys[0].abbrev_converter != NULL);
    1307             :     Assert(state->base.sortKeys[0].abbrev_abort != NULL);
    1308             :     Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
    1309             : 
    1310             :     /*
    1311             :      * Check effectiveness of abbreviation optimization.  Consider aborting
    1312             :      * when still within memory limit.
    1313             :      */
    1314     4445810 :     if (state->status == TSS_INITIAL &&
    1315     3973380 :         state->memtupcount >= state->abbrevNext)
    1316             :     {
    1317        5144 :         state->abbrevNext *= 2;
    1318             : 
    1319             :         /*
    1320             :          * Check opclass-supplied abbreviation abort routine.  It may indicate
    1321             :          * that abbreviation should not proceed.
    1322             :          */
    1323        5144 :         if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
    1324             :                                                 state->base.sortKeys))
    1325        5048 :             return false;
    1326             : 
    1327             :         /*
    1328             :          * Finally, restore authoritative comparator, and indicate that
    1329             :          * abbreviation is not in play by setting abbrev_converter to NULL
    1330             :          */
    1331          96 :         state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
    1332          96 :         state->base.sortKeys[0].abbrev_converter = NULL;
    1333             :         /* Not strictly necessary, but be tidy */
    1334          96 :         state->base.sortKeys[0].abbrev_abort = NULL;
    1335          96 :         state->base.sortKeys[0].abbrev_full_comparator = NULL;
    1336             : 
    1337             :         /* Give up - expect original pass-by-value representation */
    1338          96 :         return true;
    1339             :     }
    1340             : 
    1341     4440666 :     return false;
    1342             : }
    1343             : 
    1344             : /*
    1345             :  * All tuples have been provided; finish the sort.
    1346             :  */
    1347             : void
    1348      239304 : tuplesort_performsort(Tuplesortstate *state)
    1349             : {
    1350      239304 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1351             : 
    1352      239304 :     if (trace_sort)
    1353           0 :         elog(LOG, "performsort of worker %d starting: %s",
    1354             :              state->worker, pg_rusage_show(&state->ru_start));
    1355             : 
    1356      239304 :     switch (state->status)
    1357             :     {
    1358      238756 :         case TSS_INITIAL:
    1359             : 
    1360             :             /*
    1361             :              * We were able to accumulate all the tuples within the allowed
    1362             :              * amount of memory, or leader to take over worker tapes
    1363             :              */
    1364      238756 :             if (SERIAL(state))
    1365             :             {
    1366             :                 /* Just qsort 'em and we're done */
    1367      237998 :                 tuplesort_sort_memtuples(state);
    1368      237908 :                 state->status = TSS_SORTEDINMEM;
    1369             :             }
    1370         758 :             else if (WORKER(state))
    1371             :             {
    1372             :                 /*
    1373             :                  * Parallel workers must still dump out tuples to tape.  No
    1374             :                  * merge is required to produce single output run, though.
    1375             :                  */
    1376         564 :                 inittapes(state, false);
    1377         564 :                 dumptuples(state, true);
    1378         564 :                 worker_nomergeruns(state);
    1379         564 :                 state->status = TSS_SORTEDONTAPE;
    1380             :             }
    1381             :             else
    1382             :             {
    1383             :                 /*
    1384             :                  * Leader will take over worker tapes and merge worker runs.
    1385             :                  * Note that mergeruns sets the correct state->status.
    1386             :                  */
    1387         194 :                 leader_takeover_tapes(state);
    1388         194 :                 mergeruns(state);
    1389             :             }
    1390      238666 :             state->current = 0;
    1391      238666 :             state->eof_reached = false;
    1392      238666 :             state->markpos_block = 0L;
    1393      238666 :             state->markpos_offset = 0;
    1394      238666 :             state->markpos_eof = false;
    1395      238666 :             break;
    1396             : 
    1397         408 :         case TSS_BOUNDED:
    1398             : 
    1399             :             /*
    1400             :              * We were able to accumulate all the tuples required for output
    1401             :              * in memory, using a heap to eliminate excess tuples.  Now we
    1402             :              * have to transform the heap to a properly-sorted array. Note
    1403             :              * that sort_bounded_heap sets the correct state->status.
    1404             :              */
    1405         408 :             sort_bounded_heap(state);
    1406         408 :             state->current = 0;
    1407         408 :             state->eof_reached = false;
    1408         408 :             state->markpos_offset = 0;
    1409         408 :             state->markpos_eof = false;
    1410         408 :             break;
    1411             : 
    1412         140 :         case TSS_BUILDRUNS:
    1413             : 
    1414             :             /*
    1415             :              * Finish tape-based sort.  First, flush all tuples remaining in
    1416             :              * memory out to tape; then merge until we have a single remaining
    1417             :              * run (or, if !randomAccess and !WORKER(), one run per tape).
    1418             :              * Note that mergeruns sets the correct state->status.
    1419             :              */
    1420         140 :             dumptuples(state, true);
    1421         140 :             mergeruns(state);
    1422         140 :             state->eof_reached = false;
    1423         140 :             state->markpos_block = 0L;
    1424         140 :             state->markpos_offset = 0;
    1425         140 :             state->markpos_eof = false;
    1426         140 :             break;
    1427             : 
    1428           0 :         default:
    1429           0 :             elog(ERROR, "invalid tuplesort state");
    1430             :             break;
    1431             :     }
    1432             : 
    1433      239214 :     if (trace_sort)
    1434             :     {
    1435           0 :         if (state->status == TSS_FINALMERGE)
    1436           0 :             elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
    1437             :                  state->worker, state->nInputTapes,
    1438             :                  pg_rusage_show(&state->ru_start));
    1439             :         else
    1440           0 :             elog(LOG, "performsort of worker %d done: %s",
    1441             :                  state->worker, pg_rusage_show(&state->ru_start));
    1442             :     }
    1443             : 
    1444      239214 :     MemoryContextSwitchTo(oldcontext);
    1445      239214 : }
    1446             : 
    1447             : /*
    1448             :  * Internal routine to fetch the next tuple in either forward or back
    1449             :  * direction into *stup.  Returns false if no more tuples.
    1450             :  * Returned tuple belongs to tuplesort memory context, and must not be freed
    1451             :  * by caller.  Note that fetched tuple is stored in memory that may be
    1452             :  * recycled by any future fetch.
    1453             :  */
    1454             : bool
    1455    28869564 : tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
    1456             :                           SortTuple *stup)
    1457             : {
    1458             :     unsigned int tuplen;
    1459             :     size_t      nmoved;
    1460             : 
    1461             :     Assert(!WORKER(state));
    1462             : 
    1463    28869564 :     switch (state->status)
    1464             :     {
    1465    23884570 :         case TSS_SORTEDINMEM:
    1466             :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1467             :             Assert(!state->slabAllocatorUsed);
    1468    23884570 :             if (forward)
    1469             :             {
    1470    23884504 :                 if (state->current < state->memtupcount)
    1471             :                 {
    1472    23647506 :                     *stup = state->memtuples[state->current++];
    1473    23647506 :                     return true;
    1474             :                 }
    1475      236998 :                 state->eof_reached = true;
    1476             : 
    1477             :                 /*
    1478             :                  * Complain if caller tries to retrieve more tuples than
    1479             :                  * originally asked for in a bounded sort.  This is because
    1480             :                  * returning EOF here might be the wrong thing.
    1481             :                  */
    1482      236998 :                 if (state->bounded && state->current >= state->bound)
    1483           0 :                     elog(ERROR, "retrieved too many tuples in a bounded sort");
    1484             : 
    1485      236998 :                 return false;
    1486             :             }
    1487             :             else
    1488             :             {
    1489          66 :                 if (state->current <= 0)
    1490           0 :                     return false;
    1491             : 
    1492             :                 /*
    1493             :                  * if all tuples are fetched already then we return last
    1494             :                  * tuple, else - tuple before last returned.
    1495             :                  */
    1496          66 :                 if (state->eof_reached)
    1497          12 :                     state->eof_reached = false;
    1498             :                 else
    1499             :                 {
    1500          54 :                     state->current--;    /* last returned tuple */
    1501          54 :                     if (state->current <= 0)
    1502           6 :                         return false;
    1503             :                 }
    1504          60 :                 *stup = state->memtuples[state->current - 1];
    1505          60 :                 return true;
    1506             :             }
    1507             :             break;
    1508             : 
    1509      303000 :         case TSS_SORTEDONTAPE:
    1510             :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1511             :             Assert(state->slabAllocatorUsed);
    1512             : 
    1513             :             /*
    1514             :              * The slot that held the tuple that we returned in previous
    1515             :              * gettuple call can now be reused.
    1516             :              */
    1517      303000 :             if (state->lastReturnedTuple)
    1518             :             {
    1519      152850 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1520      152850 :                 state->lastReturnedTuple = NULL;
    1521             :             }
    1522             : 
    1523      303000 :             if (forward)
    1524             :             {
    1525      302970 :                 if (state->eof_reached)
    1526           0 :                     return false;
    1527             : 
    1528      302970 :                 if ((tuplen = getlen(state->result_tape, true)) != 0)
    1529             :                 {
    1530      302940 :                     READTUP(state, stup, state->result_tape, tuplen);
    1531             : 
    1532             :                     /*
    1533             :                      * Remember the tuple we return, so that we can recycle
    1534             :                      * its memory on next call.  (This can be NULL, in the
    1535             :                      * !state->tuples case).
    1536             :                      */
    1537      302940 :                     state->lastReturnedTuple = stup->tuple;
    1538             : 
    1539      302940 :                     return true;
    1540             :                 }
    1541             :                 else
    1542             :                 {
    1543          30 :                     state->eof_reached = true;
    1544          30 :                     return false;
    1545             :                 }
    1546             :             }
    1547             : 
    1548             :             /*
    1549             :              * Backward.
    1550             :              *
    1551             :              * if all tuples are fetched already then we return last tuple,
    1552             :              * else - tuple before last returned.
    1553             :              */
    1554          30 :             if (state->eof_reached)
    1555             :             {
    1556             :                 /*
    1557             :                  * Seek position is pointing just past the zero tuplen at the
    1558             :                  * end of file; back up to fetch last tuple's ending length
    1559             :                  * word.  If seek fails we must have a completely empty file.
    1560             :                  */
    1561          12 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1562             :                                               2 * sizeof(unsigned int));
    1563          12 :                 if (nmoved == 0)
    1564           0 :                     return false;
    1565          12 :                 else if (nmoved != 2 * sizeof(unsigned int))
    1566           0 :                     elog(ERROR, "unexpected tape position");
    1567          12 :                 state->eof_reached = false;
    1568             :             }
    1569             :             else
    1570             :             {
    1571             :                 /*
    1572             :                  * Back up and fetch previously-returned tuple's ending length
    1573             :                  * word.  If seek fails, assume we are at start of file.
    1574             :                  */
    1575          18 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1576             :                                               sizeof(unsigned int));
    1577          18 :                 if (nmoved == 0)
    1578           0 :                     return false;
    1579          18 :                 else if (nmoved != sizeof(unsigned int))
    1580           0 :                     elog(ERROR, "unexpected tape position");
    1581          18 :                 tuplen = getlen(state->result_tape, false);
    1582             : 
    1583             :                 /*
    1584             :                  * Back up to get ending length word of tuple before it.
    1585             :                  */
    1586          18 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1587             :                                               tuplen + 2 * sizeof(unsigned int));
    1588          18 :                 if (nmoved == tuplen + sizeof(unsigned int))
    1589             :                 {
    1590             :                     /*
    1591             :                      * We backed up over the previous tuple, but there was no
    1592             :                      * ending length word before it.  That means that the prev
    1593             :                      * tuple is the first tuple in the file.  It is now the
    1594             :                      * next to read in forward direction (not obviously right,
    1595             :                      * but that is what in-memory case does).
    1596             :                      */
    1597           6 :                     return false;
    1598             :                 }
    1599          12 :                 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
    1600           0 :                     elog(ERROR, "bogus tuple length in backward scan");
    1601             :             }
    1602             : 
    1603          24 :             tuplen = getlen(state->result_tape, false);
    1604             : 
    1605             :             /*
    1606             :              * Now we have the length of the prior tuple, back up and read it.
    1607             :              * Note: READTUP expects we are positioned after the initial
    1608             :              * length word of the tuple, so back up to that point.
    1609             :              */
    1610          24 :             nmoved = LogicalTapeBackspace(state->result_tape,
    1611             :                                           tuplen);
    1612          24 :             if (nmoved != tuplen)
    1613           0 :                 elog(ERROR, "bogus tuple length in backward scan");
    1614          24 :             READTUP(state, stup, state->result_tape, tuplen);
    1615             : 
    1616             :             /*
    1617             :              * Remember the tuple we return, so that we can recycle its memory
    1618             :              * on next call. (This can be NULL, in the Datum case).
    1619             :              */
    1620          24 :             state->lastReturnedTuple = stup->tuple;
    1621             : 
    1622          24 :             return true;
    1623             : 
    1624     4681994 :         case TSS_FINALMERGE:
    1625             :             Assert(forward);
    1626             :             /* We are managing memory ourselves, with the slab allocator. */
    1627             :             Assert(state->slabAllocatorUsed);
    1628             : 
    1629             :             /*
    1630             :              * The slab slot holding the tuple that we returned in previous
    1631             :              * gettuple call can now be reused.
    1632             :              */
    1633     4681994 :             if (state->lastReturnedTuple)
    1634             :             {
    1635     4561652 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1636     4561652 :                 state->lastReturnedTuple = NULL;
    1637             :             }
    1638             : 
    1639             :             /*
    1640             :              * This code should match the inner loop of mergeonerun().
    1641             :              */
    1642     4681994 :             if (state->memtupcount > 0)
    1643             :             {
    1644     4681700 :                 int         srcTapeIndex = state->memtuples[0].srctape;
    1645     4681700 :                 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
    1646             :                 SortTuple   newtup;
    1647             : 
    1648     4681700 :                 *stup = state->memtuples[0];
    1649             : 
    1650             :                 /*
    1651             :                  * Remember the tuple we return, so that we can recycle its
    1652             :                  * memory on next call. (This can be NULL, in the Datum case).
    1653             :                  */
    1654     4681700 :                 state->lastReturnedTuple = stup->tuple;
    1655             : 
    1656             :                 /*
    1657             :                  * Pull next tuple from tape, and replace the returned tuple
    1658             :                  * at top of the heap with it.
    1659             :                  */
    1660     4681700 :                 if (!mergereadnext(state, srcTape, &newtup))
    1661             :                 {
    1662             :                     /*
    1663             :                      * If no more data, we've reached end of run on this tape.
    1664             :                      * Remove the top node from the heap.
    1665             :                      */
    1666         430 :                     tuplesort_heap_delete_top(state);
    1667         430 :                     state->nInputRuns--;
    1668             : 
    1669             :                     /*
    1670             :                      * Close the tape.  It'd go away at the end of the sort
    1671             :                      * anyway, but better to release the memory early.
    1672             :                      */
    1673         430 :                     LogicalTapeClose(srcTape);
    1674         430 :                     return true;
    1675             :                 }
    1676     4681270 :                 newtup.srctape = srcTapeIndex;
    1677     4681270 :                 tuplesort_heap_replace_top(state, &newtup);
    1678     4681270 :                 return true;
    1679             :             }
    1680         294 :             return false;
    1681             : 
    1682           0 :         default:
    1683           0 :             elog(ERROR, "invalid tuplesort state");
    1684             :             return false;       /* keep compiler quiet */
    1685             :     }
    1686             : }
    1687             : 
    1688             : 
    1689             : /*
    1690             :  * Advance over N tuples in either forward or back direction,
    1691             :  * without returning any data.  N==0 is a no-op.
    1692             :  * Returns true if successful, false if ran out of tuples.
    1693             :  */
    1694             : bool
    1695         392 : tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
    1696             : {
    1697             :     MemoryContext oldcontext;
    1698             : 
    1699             :     /*
    1700             :      * We don't actually support backwards skip yet, because no callers need
    1701             :      * it.  The API is designed to allow for that later, though.
    1702             :      */
    1703             :     Assert(forward);
    1704             :     Assert(ntuples >= 0);
    1705             :     Assert(!WORKER(state));
    1706             : 
    1707         392 :     switch (state->status)
    1708             :     {
    1709         368 :         case TSS_SORTEDINMEM:
    1710         368 :             if (state->memtupcount - state->current >= ntuples)
    1711             :             {
    1712         368 :                 state->current += ntuples;
    1713         368 :                 return true;
    1714             :             }
    1715           0 :             state->current = state->memtupcount;
    1716           0 :             state->eof_reached = true;
    1717             : 
    1718             :             /*
    1719             :              * Complain if caller tries to retrieve more tuples than
    1720             :              * originally asked for in a bounded sort.  This is because
    1721             :              * returning EOF here might be the wrong thing.
    1722             :              */
    1723           0 :             if (state->bounded && state->current >= state->bound)
    1724           0 :                 elog(ERROR, "retrieved too many tuples in a bounded sort");
    1725             : 
    1726           0 :             return false;
    1727             : 
    1728          24 :         case TSS_SORTEDONTAPE:
    1729             :         case TSS_FINALMERGE:
    1730             : 
    1731             :             /*
    1732             :              * We could probably optimize these cases better, but for now it's
    1733             :              * not worth the trouble.
    1734             :              */
    1735          24 :             oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1736      240132 :             while (ntuples-- > 0)
    1737             :             {
    1738             :                 SortTuple   stup;
    1739             : 
    1740      240108 :                 if (!tuplesort_gettuple_common(state, forward, &stup))
    1741             :                 {
    1742           0 :                     MemoryContextSwitchTo(oldcontext);
    1743           0 :                     return false;
    1744             :                 }
    1745      240108 :                 CHECK_FOR_INTERRUPTS();
    1746             :             }
    1747          24 :             MemoryContextSwitchTo(oldcontext);
    1748          24 :             return true;
    1749             : 
    1750           0 :         default:
    1751           0 :             elog(ERROR, "invalid tuplesort state");
    1752             :             return false;       /* keep compiler quiet */
    1753             :     }
    1754             : }
    1755             : 
    1756             : /*
    1757             :  * tuplesort_merge_order - report merge order we'll use for given memory
    1758             :  * (note: "merge order" just means the number of input tapes in the merge).
    1759             :  *
    1760             :  * This is exported for use by the planner.  allowedMem is in bytes.
    1761             :  */
    1762             : int
    1763       17674 : tuplesort_merge_order(int64 allowedMem)
    1764             : {
    1765             :     int         mOrder;
    1766             : 
    1767             :     /*----------
    1768             :      * In the merge phase, we need buffer space for each input and output tape.
    1769             :      * Each pass in the balanced merge algorithm reads from M input tapes, and
    1770             :      * writes to N output tapes.  Each tape consumes TAPE_BUFFER_OVERHEAD bytes
    1771             :      * of memory.  In addition to that, we want MERGE_BUFFER_SIZE workspace per
    1772             :      * input tape.
    1773             :      *
    1774             :      * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
    1775             :      *            N * TAPE_BUFFER_OVERHEAD
    1776             :      *
    1777             :      * Except for the last and next-to-last merge passes, where there can be
    1778             :      * fewer tapes left to process, M = N.  We choose M so that we have the
    1779             :      * desired amount of memory available for the input buffers
    1780             :      * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
    1781             :      * available for the tape buffers (allowedMem).
    1782             :      *
    1783             :      * Note: you might be thinking we need to account for the memtuples[]
    1784             :      * array in this calculation, but we effectively treat that as part of the
    1785             :      * MERGE_BUFFER_SIZE workspace.
    1786             :      *----------
    1787             :      */
    1788       17674 :     mOrder = allowedMem /
    1789             :         (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
    1790             : 
    1791             :     /*
    1792             :      * Even in minimum memory, use at least a MINORDER merge.  On the other
    1793             :      * hand, even when we have lots of memory, do not use more than a MAXORDER
    1794             :      * merge.  Tapes are pretty cheap, but they're not entirely free.  Each
    1795             :      * additional tape reduces the amount of memory available to build runs,
    1796             :      * which in turn can cause the same sort to need more runs, which makes
    1797             :      * merging slower even if it can still be done in a single pass.  Also,
    1798             :      * high order merges are quite slow due to CPU cache effects; it can be
    1799             :      * faster to pay the I/O cost of a multi-pass merge than to perform a
    1800             :      * single merge pass across many hundreds of tapes.
    1801             :      */
    1802       17674 :     mOrder = Max(mOrder, MINORDER);
    1803       17674 :     mOrder = Min(mOrder, MAXORDER);
    1804             : 
    1805       17674 :     return mOrder;
    1806             : }
    1807             : 
    1808             : /*
    1809             :  * Helper function to calculate how much memory to allocate for the read buffer
    1810             :  * of each input tape in a merge pass.
    1811             :  *
    1812             :  * 'avail_mem' is the amount of memory available for the buffers of all the
    1813             :  *      tapes, both input and output.
    1814             :  * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
    1815             :  * 'maxOutputTapes' is the max. number of output tapes we should produce.
    1816             :  */
    1817             : static int64
    1818         364 : merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
    1819             :                        int maxOutputTapes)
    1820             : {
    1821             :     int         nOutputRuns;
    1822             :     int         nOutputTapes;
    1823             : 
    1824             :     /*
    1825             :      * How many output tapes will we produce in this pass?
    1826             :      *
    1827             :      * This is nInputRuns / nInputTapes, rounded up.
    1828             :      */
    1829         364 :     nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
    1830             : 
    1831         364 :     nOutputTapes = Min(nOutputRuns, maxOutputTapes);
    1832             : 
    1833             :     /*
    1834             :      * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory.  All
    1835             :      * remaining memory is divided evenly between the input tapes.
    1836             :      *
    1837             :      * This also follows from the formula in tuplesort_merge_order, but here
    1838             :      * we derive the input buffer size from the amount of memory available,
    1839             :      * and M and N.
    1840             :      */
    1841         364 :     return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
    1842             : }
    1843             : 
    1844             : /*
    1845             :  * inittapes - initialize for tape sorting.
    1846             :  *
    1847             :  * This is called only if we have found we won't sort in memory.
    1848             :  */
    1849             : static void
    1850         704 : inittapes(Tuplesortstate *state, bool mergeruns)
    1851             : {
    1852             :     Assert(!LEADER(state));
    1853             : 
    1854         704 :     if (mergeruns)
    1855             :     {
    1856             :         /* Compute number of input tapes to use when merging */
    1857         140 :         state->maxTapes = tuplesort_merge_order(state->allowedMem);
    1858             :     }
    1859             :     else
    1860             :     {
    1861             :         /* Workers can sometimes produce single run, output without merge */
    1862             :         Assert(WORKER(state));
    1863         564 :         state->maxTapes = MINORDER;
    1864             :     }
    1865             : 
    1866         704 :     if (trace_sort)
    1867           0 :         elog(LOG, "worker %d switching to external sort with %d tapes: %s",
    1868             :              state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
    1869             : 
    1870             :     /* Create the tape set */
    1871         704 :     inittapestate(state, state->maxTapes);
    1872         704 :     state->tapeset =
    1873         704 :         LogicalTapeSetCreate(false,
    1874         704 :                              state->shared ? &state->shared->fileset : NULL,
    1875             :                              state->worker);
    1876             : 
    1877         704 :     state->currentRun = 0;
    1878             : 
    1879             :     /*
    1880             :      * Initialize logical tape arrays.
    1881             :      */
    1882         704 :     state->inputTapes = NULL;
    1883         704 :     state->nInputTapes = 0;
    1884         704 :     state->nInputRuns = 0;
    1885             : 
    1886         704 :     state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
    1887         704 :     state->nOutputTapes = 0;
    1888         704 :     state->nOutputRuns = 0;
    1889             : 
    1890         704 :     state->status = TSS_BUILDRUNS;
    1891             : 
    1892         704 :     selectnewtape(state);
    1893         704 : }
    1894             : 
    1895             : /*
    1896             :  * inittapestate - initialize generic tape management state
    1897             :  */
    1898             : static void
    1899         898 : inittapestate(Tuplesortstate *state, int maxTapes)
    1900             : {
    1901             :     int64       tapeSpace;
    1902             : 
    1903             :     /*
    1904             :      * Decrease availMem to reflect the space needed for tape buffers; but
    1905             :      * don't decrease it to the point that we have no room for tuples. (That
    1906             :      * case is only likely to occur if sorting pass-by-value Datums; in all
    1907             :      * other scenarios the memtuples[] array is unlikely to occupy more than
    1908             :      * half of allowedMem.  In the pass-by-value case it's not important to
    1909             :      * account for tuple space, so we don't care if LACKMEM becomes
    1910             :      * inaccurate.)
    1911             :      */
    1912         898 :     tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
    1913             : 
    1914         898 :     if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
    1915         776 :         USEMEM(state, tapeSpace);
    1916             : 
    1917             :     /*
    1918             :      * Make sure that the temp file(s) underlying the tape set are created in
    1919             :      * suitable temp tablespaces.  For parallel sorts, this should have been
    1920             :      * called already, but it doesn't matter if it is called a second time.
    1921             :      */
    1922         898 :     PrepareTempTablespaces();
    1923         898 : }
    1924             : 
    1925             : /*
    1926             :  * selectnewtape -- select next tape to output to.
    1927             :  *
    1928             :  * This is called after finishing a run when we know another run
    1929             :  * must be started.  This is used both when building the initial
    1930             :  * runs, and during merge passes.
    1931             :  */
    1932             : static void
    1933        1808 : selectnewtape(Tuplesortstate *state)
    1934             : {
    1935             :     /*
    1936             :      * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
    1937             :      * both zero.  On each call, we create a new output tape to hold the next
    1938             :      * run, until maxTapes is reached.  After that, we assign new runs to the
    1939             :      * existing tapes in a round robin fashion.
    1940             :      */
    1941        1808 :     if (state->nOutputTapes < state->maxTapes)
    1942             :     {
    1943             :         /* Create a new tape to hold the next run */
    1944             :         Assert(state->outputTapes[state->nOutputRuns] == NULL);
    1945             :         Assert(state->nOutputRuns == state->nOutputTapes);
    1946        1226 :         state->destTape = LogicalTapeCreate(state->tapeset);
    1947        1226 :         state->outputTapes[state->nOutputTapes] = state->destTape;
    1948        1226 :         state->nOutputTapes++;
    1949        1226 :         state->nOutputRuns++;
    1950             :     }
    1951             :     else
    1952             :     {
    1953             :         /*
    1954             :          * We have reached the max number of tapes.  Append to an existing
    1955             :          * tape.
    1956             :          */
    1957         582 :         state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
    1958         582 :         state->nOutputRuns++;
    1959             :     }
    1960        1808 : }
    1961             : 
    1962             : /*
    1963             :  * Initialize the slab allocation arena, for the given number of slots.
    1964             :  */
    1965             : static void
    1966         334 : init_slab_allocator(Tuplesortstate *state, int numSlots)
    1967             : {
    1968         334 :     if (numSlots > 0)
    1969             :     {
    1970             :         char       *p;
    1971             :         int         i;
    1972             : 
    1973         306 :         state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
    1974         306 :         state->slabMemoryEnd = state->slabMemoryBegin +
    1975         306 :             numSlots * SLAB_SLOT_SIZE;
    1976         306 :         state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
    1977         306 :         USEMEM(state, numSlots * SLAB_SLOT_SIZE);
    1978             : 
    1979         306 :         p = state->slabMemoryBegin;
    1980        1170 :         for (i = 0; i < numSlots - 1; i++)
    1981             :         {
    1982         864 :             ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
    1983         864 :             p += SLAB_SLOT_SIZE;
    1984             :         }
    1985         306 :         ((SlabSlot *) p)->nextfree = NULL;
    1986             :     }
    1987             :     else
    1988             :     {
    1989          28 :         state->slabMemoryBegin = state->slabMemoryEnd = NULL;
    1990          28 :         state->slabFreeHead = NULL;
    1991             :     }
    1992         334 :     state->slabAllocatorUsed = true;
    1993         334 : }
    1994             : 
    1995             : /*
    1996             :  * mergeruns -- merge all the completed initial runs.
    1997             :  *
    1998             :  * This implements the Balanced k-Way Merge Algorithm.  All input data has
    1999             :  * already been written to initial runs on tape (see dumptuples).
    2000             :  */
    2001             : static void
    2002         334 : mergeruns(Tuplesortstate *state)
    2003             : {
    2004             :     int         tapenum;
    2005             : 
    2006             :     Assert(state->status == TSS_BUILDRUNS);
    2007             :     Assert(state->memtupcount == 0);
    2008             : 
    2009         334 :     if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
    2010             :     {
    2011             :         /*
    2012             :          * If there are multiple runs to be merged, when we go to read back
    2013             :          * tuples from disk, abbreviated keys will not have been stored, and
    2014             :          * we don't care to regenerate them.  Disable abbreviation from this
    2015             :          * point on.
    2016             :          */
    2017          30 :         state->base.sortKeys->abbrev_converter = NULL;
    2018          30 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
    2019             : 
    2020             :         /* Not strictly necessary, but be tidy */
    2021          30 :         state->base.sortKeys->abbrev_abort = NULL;
    2022          30 :         state->base.sortKeys->abbrev_full_comparator = NULL;
    2023             :     }
    2024             : 
    2025             :     /*
    2026             :      * Reset tuple memory.  We've freed all the tuples that we previously
    2027             :      * allocated.  We will use the slab allocator from now on.
    2028             :      */
    2029         334 :     MemoryContextResetOnly(state->base.tuplecontext);
    2030             : 
    2031             :     /*
    2032             :      * We no longer need a large memtuples array.  (We will allocate a smaller
    2033             :      * one for the heap later.)
    2034             :      */
    2035         334 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    2036         334 :     pfree(state->memtuples);
    2037         334 :     state->memtuples = NULL;
    2038             : 
    2039             :     /*
    2040             :      * Initialize the slab allocator.  We need one slab slot per input tape,
    2041             :      * for the tuples in the heap, plus one to hold the tuple last returned
    2042             :      * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
    2043             :      * however, we don't need to do allocate anything.)
    2044             :      *
    2045             :      * In a multi-pass merge, we could shrink this allocation for the last
    2046             :      * merge pass, if it has fewer tapes than previous passes, but we don't
    2047             :      * bother.
    2048             :      *
    2049             :      * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
    2050             :      * to track memory usage of individual tuples.
    2051             :      */
    2052         334 :     if (state->base.tuples)
    2053         306 :         init_slab_allocator(state, state->nOutputTapes + 1);
    2054             :     else
    2055          28 :         init_slab_allocator(state, 0);
    2056             : 
    2057             :     /*
    2058             :      * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
    2059             :      * from each input tape.
    2060             :      *
    2061             :      * We could shrink this, too, between passes in a multi-pass merge, but we
    2062             :      * don't bother.  (The initial input tapes are still in outputTapes.  The
    2063             :      * number of input tapes will not increase between passes.)
    2064             :      */
    2065         334 :     state->memtupsize = state->nOutputTapes;
    2066         668 :     state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
    2067         334 :                                                         state->nOutputTapes * sizeof(SortTuple));
    2068         334 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    2069             : 
    2070             :     /*
    2071             :      * Use all the remaining memory we have available for tape buffers among
    2072             :      * all the input tapes.  At the beginning of each merge pass, we will
    2073             :      * divide this memory between the input and output tapes in the pass.
    2074             :      */
    2075         334 :     state->tape_buffer_mem = state->availMem;
    2076         334 :     USEMEM(state, state->tape_buffer_mem);
    2077         334 :     if (trace_sort)
    2078           0 :         elog(LOG, "worker %d using %zu KB of memory for tape buffers",
    2079             :              state->worker, state->tape_buffer_mem / 1024);
    2080             : 
    2081             :     for (;;)
    2082             :     {
    2083             :         /*
    2084             :          * On the first iteration, or if we have read all the runs from the
    2085             :          * input tapes in a multi-pass merge, it's time to start a new pass.
    2086             :          * Rewind all the output tapes, and make them inputs for the next
    2087             :          * pass.
    2088             :          */
    2089         472 :         if (state->nInputRuns == 0)
    2090             :         {
    2091             :             int64       input_buffer_size;
    2092             : 
    2093             :             /* Close the old, emptied, input tapes */
    2094         364 :             if (state->nInputTapes > 0)
    2095             :             {
    2096         210 :                 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2097         180 :                     LogicalTapeClose(state->inputTapes[tapenum]);
    2098          30 :                 pfree(state->inputTapes);
    2099             :             }
    2100             : 
    2101             :             /* Previous pass's outputs become next pass's inputs. */
    2102         364 :             state->inputTapes = state->outputTapes;
    2103         364 :             state->nInputTapes = state->nOutputTapes;
    2104         364 :             state->nInputRuns = state->nOutputRuns;
    2105             : 
    2106             :             /*
    2107             :              * Reset output tape variables.  The actual LogicalTapes will be
    2108             :              * created as needed, here we only allocate the array to hold
    2109             :              * them.
    2110             :              */
    2111         364 :             state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
    2112         364 :             state->nOutputTapes = 0;
    2113         364 :             state->nOutputRuns = 0;
    2114             : 
    2115             :             /*
    2116             :              * Redistribute the memory allocated for tape buffers, among the
    2117             :              * new input and output tapes.
    2118             :              */
    2119         364 :             input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
    2120             :                                                        state->nInputTapes,
    2121             :                                                        state->nInputRuns,
    2122             :                                                        state->maxTapes);
    2123             : 
    2124         364 :             if (trace_sort)
    2125           0 :                 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
    2126             :                      state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
    2127             :                      pg_rusage_show(&state->ru_start));
    2128             : 
    2129             :             /* Prepare the new input tapes for merge pass. */
    2130        1428 :             for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2131        1064 :                 LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
    2132             : 
    2133             :             /*
    2134             :              * If there's just one run left on each input tape, then only one
    2135             :              * merge pass remains.  If we don't have to produce a materialized
    2136             :              * sorted tape, we can stop at this point and do the final merge
    2137             :              * on-the-fly.
    2138             :              */
    2139         364 :             if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
    2140         342 :                 && state->nInputRuns <= state->nInputTapes
    2141         312 :                 && !WORKER(state))
    2142             :             {
    2143             :                 /* Tell logtape.c we won't be writing anymore */
    2144         312 :                 LogicalTapeSetForgetFreeSpace(state->tapeset);
    2145             :                 /* Initialize for the final merge pass */
    2146         312 :                 beginmerge(state);
    2147         312 :                 state->status = TSS_FINALMERGE;
    2148         312 :                 return;
    2149             :             }
    2150             :         }
    2151             : 
    2152             :         /* Select an output tape */
    2153         160 :         selectnewtape(state);
    2154             : 
    2155             :         /* Merge one run from each input tape. */
    2156         160 :         mergeonerun(state);
    2157             : 
    2158             :         /*
    2159             :          * If the input tapes are empty, and we output only one output run,
    2160             :          * we're done.  The current output tape contains the final result.
    2161             :          */
    2162         160 :         if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
    2163          22 :             break;
    2164             :     }
    2165             : 
    2166             :     /*
    2167             :      * Done.  The result is on a single run on a single tape.
    2168             :      */
    2169          22 :     state->result_tape = state->outputTapes[0];
    2170          22 :     if (!WORKER(state))
    2171          22 :         LogicalTapeFreeze(state->result_tape, NULL);
    2172             :     else
    2173           0 :         worker_freeze_result_tape(state);
    2174          22 :     state->status = TSS_SORTEDONTAPE;
    2175             : 
    2176             :     /* Close all the now-empty input tapes, to release their read buffers. */
    2177         114 :     for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2178          92 :         LogicalTapeClose(state->inputTapes[tapenum]);
    2179             : }
    2180             : 
    2181             : /*
    2182             :  * Merge one run from each input tape.
    2183             :  */
    2184             : static void
    2185         160 : mergeonerun(Tuplesortstate *state)
    2186             : {
    2187             :     int         srcTapeIndex;
    2188             :     LogicalTape *srcTape;
    2189             : 
    2190             :     /*
    2191             :      * Start the merge by loading one tuple from each active source tape into
    2192             :      * the heap.
    2193             :      */
    2194         160 :     beginmerge(state);
    2195             : 
    2196             :     Assert(state->slabAllocatorUsed);
    2197             : 
    2198             :     /*
    2199             :      * Execute merge by repeatedly extracting lowest tuple in heap, writing it
    2200             :      * out, and replacing it with next tuple from same tape (if there is
    2201             :      * another one).
    2202             :      */
    2203      875592 :     while (state->memtupcount > 0)
    2204             :     {
    2205             :         SortTuple   stup;
    2206             : 
    2207             :         /* write the tuple to destTape */
    2208      875432 :         srcTapeIndex = state->memtuples[0].srctape;
    2209      875432 :         srcTape = state->inputTapes[srcTapeIndex];
    2210      875432 :         WRITETUP(state, state->destTape, &state->memtuples[0]);
    2211             : 
    2212             :         /* recycle the slot of the tuple we just wrote out, for the next read */
    2213      875432 :         if (state->memtuples[0].tuple)
    2214      735348 :             RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
    2215             : 
    2216             :         /*
    2217             :          * pull next tuple from the tape, and replace the written-out tuple in
    2218             :          * the heap with it.
    2219             :          */
    2220      875432 :         if (mergereadnext(state, srcTape, &stup))
    2221             :         {
    2222      874578 :             stup.srctape = srcTapeIndex;
    2223      874578 :             tuplesort_heap_replace_top(state, &stup);
    2224             :         }
    2225             :         else
    2226             :         {
    2227         854 :             tuplesort_heap_delete_top(state);
    2228         854 :             state->nInputRuns--;
    2229             :         }
    2230             :     }
    2231             : 
    2232             :     /*
    2233             :      * When the heap empties, we're done.  Write an end-of-run marker on the
    2234             :      * output tape.
    2235             :      */
    2236         160 :     markrunend(state->destTape);
    2237         160 : }
    2238             : 
    2239             : /*
    2240             :  * beginmerge - initialize for a merge pass
    2241             :  *
    2242             :  * Fill the merge heap with the first tuple from each input tape.
    2243             :  */
    2244             : static void
    2245         472 : beginmerge(Tuplesortstate *state)
    2246             : {
    2247             :     int         activeTapes;
    2248             :     int         srcTapeIndex;
    2249             : 
    2250             :     /* Heap should be empty here */
    2251             :     Assert(state->memtupcount == 0);
    2252             : 
    2253         472 :     activeTapes = Min(state->nInputTapes, state->nInputRuns);
    2254             : 
    2255        2118 :     for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
    2256             :     {
    2257             :         SortTuple   tup;
    2258             : 
    2259        1646 :         if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
    2260             :         {
    2261        1332 :             tup.srctape = srcTapeIndex;
    2262        1332 :             tuplesort_heap_insert(state, &tup);
    2263             :         }
    2264             :     }
    2265         472 : }
    2266             : 
    2267             : /*
    2268             :  * mergereadnext - read next tuple from one merge input tape
    2269             :  *
    2270             :  * Returns false on EOF.
    2271             :  */
    2272             : static bool
    2273     5558778 : mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
    2274             : {
    2275             :     unsigned int tuplen;
    2276             : 
    2277             :     /* read next tuple, if any */
    2278     5558778 :     if ((tuplen = getlen(srcTape, true)) == 0)
    2279        1598 :         return false;
    2280     5557180 :     READTUP(state, stup, srcTape, tuplen);
    2281             : 
    2282     5557180 :     return true;
    2283             : }
    2284             : 
    2285             : /*
    2286             :  * dumptuples - remove tuples from memtuples and write initial run to tape
    2287             :  *
    2288             :  * When alltuples = true, dump everything currently in memory.  (This case is
    2289             :  * only used at end of input data.)
    2290             :  */
    2291             : static void
    2292     1113130 : dumptuples(Tuplesortstate *state, bool alltuples)
    2293             : {
    2294             :     int         memtupwrite;
    2295             :     int         i;
    2296             : 
    2297             :     /*
    2298             :      * Nothing to do if we still fit in available memory and have array slots,
    2299             :      * unless this is the final call during initial run generation.
    2300             :      */
    2301     1113130 :     if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
    2302     1112186 :         !alltuples)
    2303     1111482 :         return;
    2304             : 
    2305             :     /*
    2306             :      * Final call might require no sorting, in rare cases where we just so
    2307             :      * happen to have previously LACKMEM()'d at the point where exactly all
    2308             :      * remaining tuples are loaded into memory, just before input was
    2309             :      * exhausted.  In general, short final runs are quite possible, but avoid
    2310             :      * creating a completely empty run.  In a worker, though, we must produce
    2311             :      * at least one tape, even if it's empty.
    2312             :      */
    2313        1648 :     if (state->memtupcount == 0 && state->currentRun > 0)
    2314           0 :         return;
    2315             : 
    2316             :     Assert(state->status == TSS_BUILDRUNS);
    2317             : 
    2318             :     /*
    2319             :      * It seems unlikely that this limit will ever be exceeded, but take no
    2320             :      * chances
    2321             :      */
    2322        1648 :     if (state->currentRun == INT_MAX)
    2323           0 :         ereport(ERROR,
    2324             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    2325             :                  errmsg("cannot have more than %d runs for an external sort",
    2326             :                         INT_MAX)));
    2327             : 
    2328        1648 :     if (state->currentRun > 0)
    2329         944 :         selectnewtape(state);
    2330             : 
    2331        1648 :     state->currentRun++;
    2332             : 
    2333        1648 :     if (trace_sort)
    2334           0 :         elog(LOG, "worker %d starting quicksort of run %d: %s",
    2335             :              state->worker, state->currentRun,
    2336             :              pg_rusage_show(&state->ru_start));
    2337             : 
    2338             :     /*
    2339             :      * Sort all tuples accumulated within the allowed amount of memory for
    2340             :      * this run using quicksort
    2341             :      */
    2342        1648 :     tuplesort_sort_memtuples(state);
    2343             : 
    2344        1648 :     if (trace_sort)
    2345           0 :         elog(LOG, "worker %d finished quicksort of run %d: %s",
    2346             :              state->worker, state->currentRun,
    2347             :              pg_rusage_show(&state->ru_start));
    2348             : 
    2349        1648 :     memtupwrite = state->memtupcount;
    2350     5198600 :     for (i = 0; i < memtupwrite; i++)
    2351             :     {
    2352     5196952 :         SortTuple  *stup = &state->memtuples[i];
    2353             : 
    2354     5196952 :         WRITETUP(state, state->destTape, stup);
    2355             :     }
    2356             : 
    2357        1648 :     state->memtupcount = 0;
    2358             : 
    2359             :     /*
    2360             :      * Reset tuple memory.  We've freed all of the tuples that we previously
    2361             :      * allocated.  It's important to avoid fragmentation when there is a stark
    2362             :      * change in the sizes of incoming tuples.  In bounded sorts,
    2363             :      * fragmentation due to AllocSetFree's bucketing by size class might be
    2364             :      * particularly bad if this step wasn't taken.
    2365             :      */
    2366        1648 :     MemoryContextReset(state->base.tuplecontext);
    2367             : 
    2368             :     /*
    2369             :      * Now update the memory accounting to subtract the memory used by the
    2370             :      * tuple.
    2371             :      */
    2372        1648 :     FREEMEM(state, state->tupleMem);
    2373        1648 :     state->tupleMem = 0;
    2374             : 
    2375        1648 :     markrunend(state->destTape);
    2376             : 
    2377        1648 :     if (trace_sort)
    2378           0 :         elog(LOG, "worker %d finished writing run %d to tape %d: %s",
    2379             :              state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
    2380             :              pg_rusage_show(&state->ru_start));
    2381             : }
    2382             : 
    2383             : /*
    2384             :  * tuplesort_rescan     - rewind and replay the scan
    2385             :  */
    2386             : void
    2387          60 : tuplesort_rescan(Tuplesortstate *state)
    2388             : {
    2389          60 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2390             : 
    2391             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2392             : 
    2393          60 :     switch (state->status)
    2394             :     {
    2395          52 :         case TSS_SORTEDINMEM:
    2396          52 :             state->current = 0;
    2397          52 :             state->eof_reached = false;
    2398          52 :             state->markpos_offset = 0;
    2399          52 :             state->markpos_eof = false;
    2400          52 :             break;
    2401           8 :         case TSS_SORTEDONTAPE:
    2402           8 :             LogicalTapeRewindForRead(state->result_tape, 0);
    2403           8 :             state->eof_reached = false;
    2404           8 :             state->markpos_block = 0L;
    2405           8 :             state->markpos_offset = 0;
    2406           8 :             state->markpos_eof = false;
    2407           8 :             break;
    2408           0 :         default:
    2409           0 :             elog(ERROR, "invalid tuplesort state");
    2410             :             break;
    2411             :     }
    2412             : 
    2413          60 :     MemoryContextSwitchTo(oldcontext);
    2414          60 : }
    2415             : 
    2416             : /*
    2417             :  * tuplesort_markpos    - saves current position in the merged sort file
    2418             :  */
    2419             : void
    2420      584210 : tuplesort_markpos(Tuplesortstate *state)
    2421             : {
    2422      584210 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2423             : 
    2424             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2425             : 
    2426      584210 :     switch (state->status)
    2427             :     {
    2428      575402 :         case TSS_SORTEDINMEM:
    2429      575402 :             state->markpos_offset = state->current;
    2430      575402 :             state->markpos_eof = state->eof_reached;
    2431      575402 :             break;
    2432        8808 :         case TSS_SORTEDONTAPE:
    2433        8808 :             LogicalTapeTell(state->result_tape,
    2434             :                             &state->markpos_block,
    2435             :                             &state->markpos_offset);
    2436        8808 :             state->markpos_eof = state->eof_reached;
    2437        8808 :             break;
    2438           0 :         default:
    2439           0 :             elog(ERROR, "invalid tuplesort state");
    2440             :             break;
    2441             :     }
    2442             : 
    2443      584210 :     MemoryContextSwitchTo(oldcontext);
    2444      584210 : }
    2445             : 
    2446             : /*
    2447             :  * tuplesort_restorepos - restores current position in merged sort file to
    2448             :  *                        last saved position
    2449             :  */
    2450             : void
    2451       38722 : tuplesort_restorepos(Tuplesortstate *state)
    2452             : {
    2453       38722 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2454             : 
    2455             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2456             : 
    2457       38722 :     switch (state->status)
    2458             :     {
    2459       32530 :         case TSS_SORTEDINMEM:
    2460       32530 :             state->current = state->markpos_offset;
    2461       32530 :             state->eof_reached = state->markpos_eof;
    2462       32530 :             break;
    2463        6192 :         case TSS_SORTEDONTAPE:
    2464        6192 :             LogicalTapeSeek(state->result_tape,
    2465             :                             state->markpos_block,
    2466             :                             state->markpos_offset);
    2467        6192 :             state->eof_reached = state->markpos_eof;
    2468        6192 :             break;
    2469           0 :         default:
    2470           0 :             elog(ERROR, "invalid tuplesort state");
    2471             :             break;
    2472             :     }
    2473             : 
    2474       38722 :     MemoryContextSwitchTo(oldcontext);
    2475       38722 : }
    2476             : 
    2477             : /*
    2478             :  * tuplesort_get_stats - extract summary statistics
    2479             :  *
    2480             :  * This can be called after tuplesort_performsort() finishes to obtain
    2481             :  * printable summary information about how the sort was performed.
    2482             :  */
    2483             : void
    2484         396 : tuplesort_get_stats(Tuplesortstate *state,
    2485             :                     TuplesortInstrumentation *stats)
    2486             : {
    2487             :     /*
    2488             :      * Note: it might seem we should provide both memory and disk usage for a
    2489             :      * disk-based sort.  However, the current code doesn't track memory space
    2490             :      * accurately once we have begun to return tuples to the caller (since we
    2491             :      * don't account for pfree's the caller is expected to do), so we cannot
    2492             :      * rely on availMem in a disk sort.  This does not seem worth the overhead
    2493             :      * to fix.  Is it worth creating an API for the memory context code to
    2494             :      * tell us how much is actually used in sortcontext?
    2495             :      */
    2496         396 :     tuplesort_updatemax(state);
    2497             : 
    2498         396 :     if (state->isMaxSpaceDisk)
    2499           6 :         stats->spaceType = SORT_SPACE_TYPE_DISK;
    2500             :     else
    2501         390 :         stats->spaceType = SORT_SPACE_TYPE_MEMORY;
    2502         396 :     stats->spaceUsed = (state->maxSpace + 1023) / 1024;
    2503             : 
    2504         396 :     switch (state->maxSpaceStatus)
    2505             :     {
    2506         390 :         case TSS_SORTEDINMEM:
    2507         390 :             if (state->boundUsed)
    2508          42 :                 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
    2509             :             else
    2510         348 :                 stats->sortMethod = SORT_TYPE_QUICKSORT;
    2511         390 :             break;
    2512           0 :         case TSS_SORTEDONTAPE:
    2513           0 :             stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
    2514           0 :             break;
    2515           6 :         case TSS_FINALMERGE:
    2516           6 :             stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
    2517           6 :             break;
    2518           0 :         default:
    2519           0 :             stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
    2520           0 :             break;
    2521             :     }
    2522         396 : }
    2523             : 
    2524             : /*
    2525             :  * Convert TuplesortMethod to a string.
    2526             :  */
    2527             : const char *
    2528         294 : tuplesort_method_name(TuplesortMethod m)
    2529             : {
    2530         294 :     switch (m)
    2531             :     {
    2532           0 :         case SORT_TYPE_STILL_IN_PROGRESS:
    2533           0 :             return "still in progress";
    2534          42 :         case SORT_TYPE_TOP_N_HEAPSORT:
    2535          42 :             return "top-N heapsort";
    2536         246 :         case SORT_TYPE_QUICKSORT:
    2537         246 :             return "quicksort";
    2538           0 :         case SORT_TYPE_EXTERNAL_SORT:
    2539           0 :             return "external sort";
    2540           6 :         case SORT_TYPE_EXTERNAL_MERGE:
    2541           6 :             return "external merge";
    2542             :     }
    2543             : 
    2544           0 :     return "unknown";
    2545             : }
    2546             : 
    2547             : /*
    2548             :  * Convert TuplesortSpaceType to a string.
    2549             :  */
    2550             : const char *
    2551         258 : tuplesort_space_type_name(TuplesortSpaceType t)
    2552             : {
    2553             :     Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
    2554         258 :     return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
    2555             : }
    2556             : 
    2557             : 
    2558             : /*
    2559             :  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
    2560             :  */
    2561             : 
    2562             : /*
    2563             :  * Convert the existing unordered array of SortTuples to a bounded heap,
    2564             :  * discarding all but the smallest "state->bound" tuples.
    2565             :  *
    2566             :  * When working with a bounded heap, we want to keep the largest entry
    2567             :  * at the root (array entry zero), instead of the smallest as in the normal
    2568             :  * sort case.  This allows us to discard the largest entry cheaply.
    2569             :  * Therefore, we temporarily reverse the sort direction.
    2570             :  */
    2571             : static void
    2572         408 : make_bounded_heap(Tuplesortstate *state)
    2573             : {
    2574         408 :     int         tupcount = state->memtupcount;
    2575             :     int         i;
    2576             : 
    2577             :     Assert(state->status == TSS_INITIAL);
    2578             :     Assert(state->bounded);
    2579             :     Assert(tupcount >= state->bound);
    2580             :     Assert(SERIAL(state));
    2581             : 
    2582             :     /* Reverse sort direction so largest entry will be at root */
    2583         408 :     reversedirection(state);
    2584             : 
    2585         408 :     state->memtupcount = 0;      /* make the heap empty */
    2586       37980 :     for (i = 0; i < tupcount; i++)
    2587             :     {
    2588       37572 :         if (state->memtupcount < state->bound)
    2589             :         {
    2590             :             /* Insert next tuple into heap */
    2591             :             /* Must copy source tuple to avoid possible overwrite */
    2592       18582 :             SortTuple   stup = state->memtuples[i];
    2593             : 
    2594       18582 :             tuplesort_heap_insert(state, &stup);
    2595             :         }
    2596             :         else
    2597             :         {
    2598             :             /*
    2599             :              * The heap is full.  Replace the largest entry with the new
    2600             :              * tuple, or just discard it, if it's larger than anything already
    2601             :              * in the heap.
    2602             :              */
    2603       18990 :             if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
    2604             :             {
    2605        9800 :                 free_sort_tuple(state, &state->memtuples[i]);
    2606        9800 :                 CHECK_FOR_INTERRUPTS();
    2607             :             }
    2608             :             else
    2609        9190 :                 tuplesort_heap_replace_top(state, &state->memtuples[i]);
    2610             :         }
    2611             :     }
    2612             : 
    2613             :     Assert(state->memtupcount == state->bound);
    2614         408 :     state->status = TSS_BOUNDED;
    2615         408 : }
    2616             : 
    2617             : /*
    2618             :  * Convert the bounded heap to a properly-sorted array
    2619             :  */
    2620             : static void
    2621         408 : sort_bounded_heap(Tuplesortstate *state)
    2622             : {
    2623         408 :     int         tupcount = state->memtupcount;
    2624             : 
    2625             :     Assert(state->status == TSS_BOUNDED);
    2626             :     Assert(state->bounded);
    2627             :     Assert(tupcount == state->bound);
    2628             :     Assert(SERIAL(state));
    2629             : 
    2630             :     /*
    2631             :      * We can unheapify in place because each delete-top call will remove the
    2632             :      * largest entry, which we can promptly store in the newly freed slot at
    2633             :      * the end.  Once we're down to a single-entry heap, we're done.
    2634             :      */
    2635       18582 :     while (state->memtupcount > 1)
    2636             :     {
    2637       18174 :         SortTuple   stup = state->memtuples[0];
    2638             : 
    2639             :         /* this sifts-up the next-largest entry and decreases memtupcount */
    2640       18174 :         tuplesort_heap_delete_top(state);
    2641       18174 :         state->memtuples[state->memtupcount] = stup;
    2642             :     }
    2643         408 :     state->memtupcount = tupcount;
    2644             : 
    2645             :     /*
    2646             :      * Reverse sort direction back to the original state.  This is not
    2647             :      * actually necessary but seems like a good idea for tidiness.
    2648             :      */
    2649         408 :     reversedirection(state);
    2650             : 
    2651         408 :     state->status = TSS_SORTEDINMEM;
    2652         408 :     state->boundUsed = true;
    2653         408 : }
    2654             : 
    2655             : /*
    2656             :  * Sort all memtuples using specialized qsort() routines.
    2657             :  *
    2658             :  * Quicksort is used for small in-memory sorts, and external sort runs.
    2659             :  */
    2660             : static void
    2661      239646 : tuplesort_sort_memtuples(Tuplesortstate *state)
    2662             : {
    2663             :     Assert(!LEADER(state));
    2664             : 
    2665      239646 :     if (state->memtupcount > 1)
    2666             :     {
    2667             :         /*
    2668             :          * Do we have the leading column's value or abbreviation in datum1,
    2669             :          * and is there a specialization for its comparator?
    2670             :          */
    2671       68782 :         if (state->base.haveDatum1 && state->base.sortKeys)
    2672             :         {
    2673       68670 :             if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
    2674             :             {
    2675        3538 :                 qsort_tuple_unsigned(state->memtuples,
    2676        3538 :                                      state->memtupcount,
    2677             :                                      state);
    2678        3522 :                 return;
    2679             :             }
    2680       65132 :             else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
    2681             :             {
    2682        1386 :                 qsort_tuple_signed(state->memtuples,
    2683        1386 :                                    state->memtupcount,
    2684             :                                    state);
    2685        1386 :                 return;
    2686             :             }
    2687       63746 :             else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
    2688             :             {
    2689       39928 :                 qsort_tuple_int32(state->memtuples,
    2690       39928 :                                   state->memtupcount,
    2691             :                                   state);
    2692       39868 :                 return;
    2693             :             }
    2694             :         }
    2695             : 
    2696             :         /* Can we use the single-key sort function? */
    2697       23930 :         if (state->base.onlyKey != NULL)
    2698             :         {
    2699       10246 :             qsort_ssup(state->memtuples, state->memtupcount,
    2700       10246 :                        state->base.onlyKey);
    2701             :         }
    2702             :         else
    2703             :         {
    2704       13684 :             qsort_tuple(state->memtuples,
    2705       13684 :                         state->memtupcount,
    2706             :                         state->base.comparetup,
    2707             :                         state);
    2708             :         }
    2709             :     }
    2710             : }
    2711             : 
    2712             : /*
    2713             :  * Insert a new tuple into an empty or existing heap, maintaining the
    2714             :  * heap invariant.  Caller is responsible for ensuring there's room.
    2715             :  *
    2716             :  * Note: For some callers, tuple points to a memtuples[] entry above the
    2717             :  * end of the heap.  This is safe as long as it's not immediately adjacent
    2718             :  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
    2719             :  * is, it might get overwritten before being moved into the heap!
    2720             :  */
    2721             : static void
    2722       19914 : tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
    2723             : {
    2724             :     SortTuple  *memtuples;
    2725             :     int         j;
    2726             : 
    2727       19914 :     memtuples = state->memtuples;
    2728             :     Assert(state->memtupcount < state->memtupsize);
    2729             : 
    2730       19914 :     CHECK_FOR_INTERRUPTS();
    2731             : 
    2732             :     /*
    2733             :      * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
    2734             :      * using 1-based array indexes, not 0-based.
    2735             :      */
    2736       19914 :     j = state->memtupcount++;
    2737       57792 :     while (j > 0)
    2738             :     {
    2739       50952 :         int         i = (j - 1) >> 1;
    2740             : 
    2741       50952 :         if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
    2742       13074 :             break;
    2743       37878 :         memtuples[j] = memtuples[i];
    2744       37878 :         j = i;
    2745             :     }
    2746       19914 :     memtuples[j] = *tuple;
    2747       19914 : }
    2748             : 
    2749             : /*
    2750             :  * Remove the tuple at state->memtuples[0] from the heap.  Decrement
    2751             :  * memtupcount, and sift up to maintain the heap invariant.
    2752             :  *
    2753             :  * The caller has already free'd the tuple the top node points to,
    2754             :  * if necessary.
    2755             :  */
    2756             : static void
    2757       19458 : tuplesort_heap_delete_top(Tuplesortstate *state)
    2758             : {
    2759       19458 :     SortTuple  *memtuples = state->memtuples;
    2760             :     SortTuple  *tuple;
    2761             : 
    2762       19458 :     if (--state->memtupcount <= 0)
    2763         332 :         return;
    2764             : 
    2765             :     /*
    2766             :      * Remove the last tuple in the heap, and re-insert it, by replacing the
    2767             :      * current top node with it.
    2768             :      */
    2769       19126 :     tuple = &memtuples[state->memtupcount];
    2770       19126 :     tuplesort_heap_replace_top(state, tuple);
    2771             : }
    2772             : 
    2773             : /*
    2774             :  * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
    2775             :  * maintain the heap invariant.
    2776             :  *
    2777             :  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
    2778             :  * Heapsort, steps H3-H8).
    2779             :  */
    2780             : static void
    2781     6087150 : tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
    2782             : {
    2783     6087150 :     SortTuple  *memtuples = state->memtuples;
    2784             :     unsigned int i,
    2785             :                 n;
    2786             : 
    2787             :     Assert(state->memtupcount >= 1);
    2788             : 
    2789     6087150 :     CHECK_FOR_INTERRUPTS();
    2790             : 
    2791             :     /*
    2792             :      * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
    2793             :      * This prevents overflow in the "2 * i + 1" calculation, since at the top
    2794             :      * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
    2795             :      */
    2796     6087150 :     n = state->memtupcount;
    2797     6087150 :     i = 0;                      /* i is where the "hole" is */
    2798             :     for (;;)
    2799     1778286 :     {
    2800     7865436 :         unsigned int j = 2 * i + 1;
    2801             : 
    2802     7865436 :         if (j >= n)
    2803     1238184 :             break;
    2804     9076228 :         if (j + 1 < n &&
    2805     2448976 :             COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
    2806      967166 :             j++;
    2807     6627252 :         if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
    2808     4848966 :             break;
    2809     1778286 :         memtuples[i] = memtuples[j];
    2810     1778286 :         i = j;
    2811             :     }
    2812     6087150 :     memtuples[i] = *tuple;
    2813     6087150 : }
    2814             : 
    2815             : /*
    2816             :  * Function to reverse the sort direction from its current state
    2817             :  *
    2818             :  * It is not safe to call this when performing hash tuplesorts
    2819             :  */
    2820             : static void
    2821         816 : reversedirection(Tuplesortstate *state)
    2822             : {
    2823         816 :     SortSupport sortKey = state->base.sortKeys;
    2824             :     int         nkey;
    2825             : 
    2826        1992 :     for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
    2827             :     {
    2828        1176 :         sortKey->ssup_reverse = !sortKey->ssup_reverse;
    2829        1176 :         sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
    2830             :     }
    2831         816 : }
    2832             : 
    2833             : 
    2834             : /*
    2835             :  * Tape interface routines
    2836             :  */
    2837             : 
    2838             : static unsigned int
    2839     5861790 : getlen(LogicalTape *tape, bool eofOK)
    2840             : {
    2841             :     unsigned int len;
    2842             : 
    2843     5861790 :     if (LogicalTapeRead(tape,
    2844             :                         &len, sizeof(len)) != sizeof(len))
    2845           0 :         elog(ERROR, "unexpected end of tape");
    2846     5861790 :     if (len == 0 && !eofOK)
    2847           0 :         elog(ERROR, "unexpected end of data");
    2848     5861790 :     return len;
    2849             : }
    2850             : 
    2851             : static void
    2852        1808 : markrunend(LogicalTape *tape)
    2853             : {
    2854        1808 :     unsigned int len = 0;
    2855             : 
    2856        1808 :     LogicalTapeWrite(tape, &len, sizeof(len));
    2857        1808 : }
    2858             : 
    2859             : /*
    2860             :  * Get memory for tuple from within READTUP() routine.
    2861             :  *
    2862             :  * We use next free slot from the slab allocator, or palloc() if the tuple
    2863             :  * is too large for that.
    2864             :  */
    2865             : void *
    2866     5449916 : tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
    2867             : {
    2868             :     SlabSlot   *buf;
    2869             : 
    2870             :     /*
    2871             :      * We pre-allocate enough slots in the slab arena that we should never run
    2872             :      * out.
    2873             :      */
    2874             :     Assert(state->slabFreeHead);
    2875             : 
    2876     5449916 :     if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
    2877           6 :         return MemoryContextAlloc(state->base.sortcontext, tuplen);
    2878             :     else
    2879             :     {
    2880     5449910 :         buf = state->slabFreeHead;
    2881             :         /* Reuse this slot */
    2882     5449910 :         state->slabFreeHead = buf->nextfree;
    2883             : 
    2884     5449910 :         return buf;
    2885             :     }
    2886             : }
    2887             : 
    2888             : 
    2889             : /*
    2890             :  * Parallel sort routines
    2891             :  */
    2892             : 
    2893             : /*
    2894             :  * tuplesort_estimate_shared - estimate required shared memory allocation
    2895             :  *
    2896             :  * nWorkers is an estimate of the number of workers (it's the number that
    2897             :  * will be requested).
    2898             :  */
    2899             : Size
    2900         196 : tuplesort_estimate_shared(int nWorkers)
    2901             : {
    2902             :     Size        tapesSize;
    2903             : 
    2904             :     Assert(nWorkers > 0);
    2905             : 
    2906             :     /* Make sure that BufFile shared state is MAXALIGN'd */
    2907         196 :     tapesSize = mul_size(sizeof(TapeShare), nWorkers);
    2908         196 :     tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
    2909             : 
    2910         196 :     return tapesSize;
    2911             : }
    2912             : 
    2913             : /*
    2914             :  * tuplesort_initialize_shared - initialize shared tuplesort state
    2915             :  *
    2916             :  * Must be called from leader process before workers are launched, to
    2917             :  * establish state needed up-front for worker tuplesortstates.  nWorkers
    2918             :  * should match the argument passed to tuplesort_estimate_shared().
    2919             :  */
    2920             : void
    2921         266 : tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
    2922             : {
    2923             :     int         i;
    2924             : 
    2925             :     Assert(nWorkers > 0);
    2926             : 
    2927         266 :     SpinLockInit(&shared->mutex);
    2928         266 :     shared->currentWorker = 0;
    2929         266 :     shared->workersFinished = 0;
    2930         266 :     SharedFileSetInit(&shared->fileset, seg);
    2931         266 :     shared->nTapes = nWorkers;
    2932         838 :     for (i = 0; i < nWorkers; i++)
    2933             :     {
    2934         572 :         shared->tapes[i].firstblocknumber = 0L;
    2935             :     }
    2936         266 : }
    2937             : 
    2938             : /*
    2939             :  * tuplesort_attach_shared - attach to shared tuplesort state
    2940             :  *
    2941             :  * Must be called by all worker processes.
    2942             :  */
    2943             : void
    2944         300 : tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
    2945             : {
    2946             :     /* Attach to SharedFileSet */
    2947         300 :     SharedFileSetAttach(&shared->fileset, seg);
    2948         300 : }
    2949             : 
    2950             : /*
    2951             :  * worker_get_identifier - Assign and return ordinal identifier for worker
    2952             :  *
    2953             :  * The order in which these are assigned is not well defined, and should not
    2954             :  * matter; worker numbers across parallel sort participants need only be
    2955             :  * distinct and gapless.  logtape.c requires this.
    2956             :  *
    2957             :  * Note that the identifiers assigned from here have no relation to
    2958             :  * ParallelWorkerNumber number, to avoid making any assumption about
    2959             :  * caller's requirements.  However, we do follow the ParallelWorkerNumber
    2960             :  * convention of representing a non-worker with worker number -1.  This
    2961             :  * includes the leader, as well as serial Tuplesort processes.
    2962             :  */
    2963             : static int
    2964         564 : worker_get_identifier(Tuplesortstate *state)
    2965             : {
    2966         564 :     Sharedsort *shared = state->shared;
    2967             :     int         worker;
    2968             : 
    2969             :     Assert(WORKER(state));
    2970             : 
    2971         564 :     SpinLockAcquire(&shared->mutex);
    2972         564 :     worker = shared->currentWorker++;
    2973         564 :     SpinLockRelease(&shared->mutex);
    2974             : 
    2975         564 :     return worker;
    2976             : }
    2977             : 
    2978             : /*
    2979             :  * worker_freeze_result_tape - freeze worker's result tape for leader
    2980             :  *
    2981             :  * This is called by workers just after the result tape has been determined,
    2982             :  * instead of calling LogicalTapeFreeze() directly.  They do so because
    2983             :  * workers require a few additional steps over similar serial
    2984             :  * TSS_SORTEDONTAPE external sort cases, which also happen here.  The extra
    2985             :  * steps are around freeing now unneeded resources, and representing to
    2986             :  * leader that worker's input run is available for its merge.
    2987             :  *
    2988             :  * There should only be one final output run for each worker, which consists
    2989             :  * of all tuples that were originally input into worker.
    2990             :  */
    2991             : static void
    2992         564 : worker_freeze_result_tape(Tuplesortstate *state)
    2993             : {
    2994         564 :     Sharedsort *shared = state->shared;
    2995             :     TapeShare   output;
    2996             : 
    2997             :     Assert(WORKER(state));
    2998             :     Assert(state->result_tape != NULL);
    2999             :     Assert(state->memtupcount == 0);
    3000             : 
    3001             :     /*
    3002             :      * Free most remaining memory, in case caller is sensitive to our holding
    3003             :      * on to it.  memtuples may not be a tiny merge heap at this point.
    3004             :      */
    3005         564 :     pfree(state->memtuples);
    3006             :     /* Be tidy */
    3007         564 :     state->memtuples = NULL;
    3008         564 :     state->memtupsize = 0;
    3009             : 
    3010             :     /*
    3011             :      * Parallel worker requires result tape metadata, which is to be stored in
    3012             :      * shared memory for leader
    3013             :      */
    3014         564 :     LogicalTapeFreeze(state->result_tape, &output);
    3015             : 
    3016             :     /* Store properties of output tape, and update finished worker count */
    3017         564 :     SpinLockAcquire(&shared->mutex);
    3018         564 :     shared->tapes[state->worker] = output;
    3019         564 :     shared->workersFinished++;
    3020         564 :     SpinLockRelease(&shared->mutex);
    3021         564 : }
    3022             : 
    3023             : /*
    3024             :  * worker_nomergeruns - dump memtuples in worker, without merging
    3025             :  *
    3026             :  * This called as an alternative to mergeruns() with a worker when no
    3027             :  * merging is required.
    3028             :  */
    3029             : static void
    3030         564 : worker_nomergeruns(Tuplesortstate *state)
    3031             : {
    3032             :     Assert(WORKER(state));
    3033             :     Assert(state->result_tape == NULL);
    3034             :     Assert(state->nOutputRuns == 1);
    3035             : 
    3036         564 :     state->result_tape = state->destTape;
    3037         564 :     worker_freeze_result_tape(state);
    3038         564 : }
    3039             : 
    3040             : /*
    3041             :  * leader_takeover_tapes - create tapeset for leader from worker tapes
    3042             :  *
    3043             :  * So far, leader Tuplesortstate has performed no actual sorting.  By now, all
    3044             :  * sorting has occurred in workers, all of which must have already returned
    3045             :  * from tuplesort_performsort().
    3046             :  *
    3047             :  * When this returns, leader process is left in a state that is virtually
    3048             :  * indistinguishable from it having generated runs as a serial external sort
    3049             :  * might have.
    3050             :  */
    3051             : static void
    3052         194 : leader_takeover_tapes(Tuplesortstate *state)
    3053             : {
    3054         194 :     Sharedsort *shared = state->shared;
    3055         194 :     int         nParticipants = state->nParticipants;
    3056             :     int         workersFinished;
    3057             :     int         j;
    3058             : 
    3059             :     Assert(LEADER(state));
    3060             :     Assert(nParticipants >= 1);
    3061             : 
    3062         194 :     SpinLockAcquire(&shared->mutex);
    3063         194 :     workersFinished = shared->workersFinished;
    3064         194 :     SpinLockRelease(&shared->mutex);
    3065             : 
    3066         194 :     if (nParticipants != workersFinished)
    3067           0 :         elog(ERROR, "cannot take over tapes before all workers finish");
    3068             : 
    3069             :     /*
    3070             :      * Create the tapeset from worker tapes, including a leader-owned tape at
    3071             :      * the end.  Parallel workers are far more expensive than logical tapes,
    3072             :      * so the number of tapes allocated here should never be excessive.
    3073             :      */
    3074         194 :     inittapestate(state, nParticipants);
    3075         194 :     state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
    3076             : 
    3077             :     /*
    3078             :      * Set currentRun to reflect the number of runs we will merge (it's not
    3079             :      * used for anything, this is just pro forma)
    3080             :      */
    3081         194 :     state->currentRun = nParticipants;
    3082             : 
    3083             :     /*
    3084             :      * Initialize the state to look the same as after building the initial
    3085             :      * runs.
    3086             :      *
    3087             :      * There will always be exactly 1 run per worker, and exactly one input
    3088             :      * tape per run, because workers always output exactly 1 run, even when
    3089             :      * there were no input tuples for workers to sort.
    3090             :      */
    3091         194 :     state->inputTapes = NULL;
    3092         194 :     state->nInputTapes = 0;
    3093         194 :     state->nInputRuns = 0;
    3094             : 
    3095         194 :     state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
    3096         194 :     state->nOutputTapes = nParticipants;
    3097         194 :     state->nOutputRuns = nParticipants;
    3098             : 
    3099         618 :     for (j = 0; j < nParticipants; j++)
    3100             :     {
    3101         424 :         state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
    3102             :     }
    3103             : 
    3104         194 :     state->status = TSS_BUILDRUNS;
    3105         194 : }
    3106             : 
    3107             : /*
    3108             :  * Convenience routine to free a tuple previously loaded into sort memory
    3109             :  */
    3110             : static void
    3111     3759980 : free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
    3112             : {
    3113     3759980 :     if (stup->tuple)
    3114             :     {
    3115     3599702 :         FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
    3116     3599702 :         pfree(stup->tuple);
    3117     3599702 :         stup->tuple = NULL;
    3118             :     }
    3119     3759980 : }
    3120             : 
    3121             : int
    3122           0 : ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
    3123             : {
    3124           0 :     if (x < y)
    3125           0 :         return -1;
    3126           0 :     else if (x > y)
    3127           0 :         return 1;
    3128             :     else
    3129           0 :         return 0;
    3130             : }
    3131             : 
    3132             : int
    3133     1143736 : ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
    3134             : {
    3135     1143736 :     int64       xx = DatumGetInt64(x);
    3136     1143736 :     int64       yy = DatumGetInt64(y);
    3137             : 
    3138     1143736 :     if (xx < yy)
    3139      425186 :         return -1;
    3140      718550 :     else if (xx > yy)
    3141      358896 :         return 1;
    3142             :     else
    3143      359654 :         return 0;
    3144             : }
    3145             : 
    3146             : int
    3147   191136420 : ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
    3148             : {
    3149   191136420 :     int32       xx = DatumGetInt32(x);
    3150   191136420 :     int32       yy = DatumGetInt32(y);
    3151             : 
    3152   191136420 :     if (xx < yy)
    3153    46327704 :         return -1;
    3154   144808716 :     else if (xx > yy)
    3155    43622344 :         return 1;
    3156             :     else
    3157   101186372 :         return 0;
    3158             : }

Generated by: LCOV version 1.16