LCOV - code coverage report
Current view: top level - src/backend/utils/sort - tuplesort.c (source / functions)
Test:     PostgreSQL 19devel
Date:     2025-12-12 12:18:28
Coverage: Lines: 727 / 812 (89.5 %)    Functions: 54 / 55 (98.2 %)

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * tuplesort.c
       4             :  *    Generalized tuple sorting routines.
       5             :  *
       6             :  * This module provides a generalized facility for tuple sorting, which can be
       7             :  * applied to different kinds of sortable objects.  Implementation of
       8             :  * the particular sorting variants is given in tuplesortvariants.c.
       9             :  * This module works efficiently for both small and large amounts
      10             :  * of data.  Small amounts are sorted in-memory using qsort().  Large
      11             :  * amounts are sorted using temporary files and a standard external sort
      12             :  * algorithm.
      13             :  *
      14             :  * See Knuth, volume 3, for more than you want to know about external
      15             :  * sorting algorithms.  The algorithm we use is a balanced k-way merge.
      16             :  * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
      17             :  * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
      18             :  * merge is better.  Knuth is assuming that tape drives are expensive
      19             :  * beasts, and in particular that there will always be many more runs than
      20             :  * tape drives.  The polyphase merge algorithm was good at keeping all the
      21             :  * tape drives busy, but in our implementation a "tape drive" doesn't cost
      22             :  * much more than a few Kb of memory buffers, so we can afford to have
      23             :  * lots of them.  In particular, if we can have as many tape drives as
      24             :  * sorted runs, we can eliminate repeated I/O entirely.
      25             :  *
      26             :  * Historically, we divided the input into sorted runs using replacement
      27             :  * selection, in the form of a priority tree implemented as a heap
      28             :  * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
      29             :  * for run generation.
      30             :  *
      31             :  * The approximate amount of memory allowed for any one sort operation
      32             :  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
      33             :  * we absorb tuples and simply store them in an unsorted array as long as
      34             :  * we haven't exceeded workMem.  If we reach the end of the input without
      35             :  * exceeding workMem, we sort the array using qsort() and subsequently return
      36             :  * tuples just by scanning the tuple array sequentially.  If we do exceed
      37             :  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
      38             :  * When tuples are dumped in batch after quicksorting, we begin a new run
      39             :  * with a new output tape.  If we reach the max number of tapes, we write
      40             :  * subsequent runs on the existing tapes in a round-robin fashion.  We will
      41             :  * need multiple merge passes to finish the merge in that case.  After the
      42             :  * end of the input is reached, we dump out remaining tuples in memory into
      43             :  * a final run, then merge the runs.
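
In code form, the spill decision just described comes down to a few lines; the following is a hedged sketch, simplified from what tuplesort_puttuple_common() and the dumptuples()/inittapes() routines later in this file actually do:

    /* Sketch: absorb one tuple, spilling to tape once workMem is exhausted */
    state->memtuples[state->memtupcount++] = *tuple;
    if (state->status == TSS_INITIAL && LACKMEM(state))
    {
        inittapes(state, true);    /* create tapes; enter TSS_BUILDRUNS */
        dumptuples(state, false);  /* quicksort memtuples and emit a run */
    }
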
      44             :  *
      45             :  * When merging runs, we use a heap containing just the frontmost tuple from
      46             :  * each source run; we repeatedly output the smallest tuple and replace it
      47             :  * with the next tuple from its source tape (if any).  When the heap empties,
      48             :  * the merge is complete.  The basic merge algorithm thus needs very little
      49             :  * memory --- only M tuples for an M-way merge, and M is constrained to a
      50             :  * small number.  However, we can still make good use of our full workMem
      51             :  * allocation by pre-reading additional blocks from each source tape.  Without
      52             :  * prereading, our access pattern to the temporary file would be very erratic;
      53             :  * on average we'd read one block from each of M source tapes during the same
      54             :  * time that we're writing M blocks to the output tape, so there is no
      55             :  * sequentiality of access at all, defeating the read-ahead methods used by
      56             :  * most Unix kernels.  Worse, the output tape gets written into a very random
      57             :  * sequence of blocks of the temp file, ensuring that things will be even
      58             :  * worse when it comes time to read that tape.  A straightforward merge pass
      59             :  * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
      60             :  * by prereading from each source tape sequentially, loading about workMem/M
      61             :  * bytes from each tape in turn, and making the sequential blocks immediately
      62             :  * available for reuse.  This approach helps to localize both read and write
      63             :  * accesses.  The pre-reading is handled by logtape.c; we just tell it how
      64             :  * much memory to use for the buffers.
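
One merge pass as described can be sketched as follows (a simplification of mergeonerun(), defined later in this file, using the WRITETUP macro and heap helpers that also appear below; accounting and bookkeeping omitted):

    /* Sketch: M-way merge driven by a heap of each run's frontmost tuple */
    while (state->memtupcount > 0)
    {
        SortTuple    stup;
        int          srcTapeIndex = state->memtuples[0].srctape;
        LogicalTape *srcTape = state->inputTapes[srcTapeIndex];

        /* emit the current minimum, then refill from the tape it came from */
        WRITETUP(state, state->destTape, &state->memtuples[0]);
        if (mergereadnext(state, srcTape, &stup))
        {
            stup.srctape = srcTapeIndex;
            tuplesort_heap_replace_top(state, &stup);
        }
        else
            tuplesort_heap_delete_top(state);   /* that run is exhausted */
    }
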
      65             :  *
      66             :  * In the current code we determine the number of input tapes M on the basis
      67             :  * of workMem: we want workMem/M to be large enough that we read a fair
      68             :  * amount of data each time we read from a tape, so as to maintain the
      69             :  * locality of access described above.  Nonetheless, with large workMem we
      70             :  * can have many tapes.  The logical "tapes" are implemented by logtape.c,
      71             :  * which avoids space wastage by recycling disk space as soon as each block
      72             :  * is read from its "tape".
      73             :  *
      74             :  * When the caller requests random access to the sort result, we form
      75             :  * the final sorted run on a logical tape which is then "frozen", so
      76             :  * that we can access it randomly.  When the caller does not need random
      77             :  * access, we return from tuplesort_performsort() as soon as we are down
      78             :  * to one run per logical tape.  The final merge is then performed
      79             :  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
      80             :  * saves one cycle of writing all the data out to disk and reading it in.
      81             :  *
      82             :  * This module supports parallel sorting.  Parallel sorts involve coordination
      83             :  * among one or more worker processes, and a leader process, each with its own
      84             :  * tuplesort state.  The leader process (or, more accurately, the
      85             :  * Tuplesortstate associated with a leader process) creates a full tapeset
       86             :  * consisting of worker tapes, each holding one run to merge: one run per
       87             :  * worker process.  This is then merged.  Worker processes are guaranteed to
      88             :  * produce exactly one output run from their partial input.
      89             :  *
      90             :  *
      91             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      92             :  * Portions Copyright (c) 1994, Regents of the University of California
      93             :  *
      94             :  * IDENTIFICATION
      95             :  *    src/backend/utils/sort/tuplesort.c
      96             :  *
      97             :  *-------------------------------------------------------------------------
      98             :  */
      99             : 
     100             : #include "postgres.h"
     101             : 
     102             : #include <limits.h>
     103             : 
     104             : #include "commands/tablespace.h"
     105             : #include "miscadmin.h"
     106             : #include "pg_trace.h"
     107             : #include "storage/shmem.h"
     108             : #include "utils/guc.h"
     109             : #include "utils/memutils.h"
     110             : #include "utils/pg_rusage.h"
     111             : #include "utils/tuplesort.h"
     112             : 
     113             : /*
     114             :  * Initial size of memtuples array.  This must be more than
      115             :  * ALLOCSET_SEPARATE_THRESHOLD; see comments in grow_memtuples().  Start with
      116             :  * at least 1024 elements to avoid excessive early reallocs.
     117             :  */
     118             : #define INITIAL_MEMTUPSIZE Max(1024, \
     119             :     ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
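
For example, on a typical 64-bit build where sizeof(SortTuple) is 24 bytes and ALLOCSET_SEPARATE_THRESHOLD is 8192, the second argument evaluates to 342, so the Max() yields the customary initial size of 1024 elements (roughly 24 kB).
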
     120             : 
     121             : /* GUC variables */
     122             : bool        trace_sort = false;
     123             : 
     124             : #ifdef DEBUG_BOUNDED_SORT
     125             : bool        optimize_bounded_sort = true;
     126             : #endif
     127             : 
     128             : 
     129             : /*
     130             :  * During merge, we use a pre-allocated set of fixed-size slots to hold
      131             :  * tuples, to avoid palloc/pfree overhead.
     132             :  *
     133             :  * Merge doesn't require a lot of memory, so we can afford to waste some,
     134             :  * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
     135             :  * palloc() overhead is not significant anymore.
     136             :  *
     137             :  * 'nextfree' is valid when this chunk is in the free list.  When in use, the
     138             :  * slot holds a tuple.
     139             :  */
     140             : #define SLAB_SLOT_SIZE 1024
     141             : 
     142             : typedef union SlabSlot
     143             : {
     144             :     union SlabSlot *nextfree;
     145             :     char        buffer[SLAB_SLOT_SIZE];
     146             : } SlabSlot;
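
Allocating from the slab then amounts to popping the free list, with a fallback allocation for oversized tuples; a hedged sketch in the spirit of tuplesort_readtup_alloc() (slab_alloc_sketch is an illustrative name, not a function in this file):

    static void *
    slab_alloc_sketch(Tuplesortstate *state, Size tuplen)
    {
        SlabSlot   *buf;

        /* oversized tuples (or an empty free list) fall back to palloc */
        if (tuplen > SLAB_SLOT_SIZE || state->slabFreeHead == NULL)
            return MemoryContextAlloc(state->base.sortcontext, tuplen);

        buf = state->slabFreeHead;
        state->slabFreeHead = buf->nextfree;    /* pop the free list */
        return buf;
    }
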
     147             : 
     148             : /*
     149             :  * Possible states of a Tuplesort object.  These denote the states that
     150             :  * persist between calls of Tuplesort routines.
     151             :  */
     152             : typedef enum
     153             : {
     154             :     TSS_INITIAL,                /* Loading tuples; still within memory limit */
     155             :     TSS_BOUNDED,                /* Loading tuples into bounded-size heap */
     156             :     TSS_BUILDRUNS,              /* Loading tuples; writing to tape */
     157             :     TSS_SORTEDINMEM,            /* Sort completed entirely in memory */
     158             :     TSS_SORTEDONTAPE,           /* Sort completed, final run is on tape */
     159             :     TSS_FINALMERGE,             /* Performing final merge on-the-fly */
     160             : } TupSortStatus;
     161             : 
     162             : /*
     163             :  * Parameters for calculation of number of tapes to use --- see inittapes()
     164             :  * and tuplesort_merge_order().
     165             :  *
      166             :  * In this calculation we assume that each tape will cost us about 1 block's
     167             :  * worth of buffer space.  This ignores the overhead of all the other data
     168             :  * structures needed for each tape, but it's probably close enough.
     169             :  *
     170             :  * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
     171             :  * input tape, for pre-reading (see discussion at top of file).  This is *in
     172             :  * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
     173             :  */
     174             : #define MINORDER        6       /* minimum merge order */
     175             : #define MAXORDER        500     /* maximum merge order */
     176             : #define TAPE_BUFFER_OVERHEAD        BLCKSZ
     177             : #define MERGE_BUFFER_SIZE           (BLCKSZ * 32)
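
Given these constants, the merge order is essentially allowedMem divided by the per-tape cost, clamped to [MINORDER, MAXORDER]; a hedged sketch consistent with the comment above (details of the real tuplesort_merge_order() may differ):

    static int
    merge_order_sketch(int64 allowedMem)
    {
        int     mOrder;

        /* one block of output buffer, then buffer + overhead per input tape */
        mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
            (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);

        return Min(MAXORDER, Max(MINORDER, mOrder));
    }
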
     178             : 
     179             : 
     180             : /*
     181             :  * Private state of a Tuplesort operation.
     182             :  */
     183             : struct Tuplesortstate
     184             : {
     185             :     TuplesortPublic base;
     186             :     TupSortStatus status;       /* enumerated value as shown above */
     187             :     bool        bounded;        /* did caller specify a maximum number of
     188             :                                  * tuples to return? */
     189             :     bool        boundUsed;      /* true if we made use of a bounded heap */
     190             :     int         bound;          /* if bounded, the maximum number of tuples */
     191             :     int64       tupleMem;       /* memory consumed by individual tuples.
     192             :                                  * storing this separately from what we track
     193             :                                  * in availMem allows us to subtract the
     194             :                                  * memory consumed by all tuples when dumping
     195             :                                  * tuples to tape */
     196             :     int64       availMem;       /* remaining memory available, in bytes */
     197             :     int64       allowedMem;     /* total memory allowed, in bytes */
     198             :     int         maxTapes;       /* max number of input tapes to merge in each
     199             :                                  * pass */
     200             :     int64       maxSpace;       /* maximum amount of space occupied among sort
     201             :                                  * of groups, either in-memory or on-disk */
      202             :     bool        isMaxSpaceDisk; /* true when maxSpace is a value for on-disk
      203             :                                  * space, false when it is a value for
      204             :                                  * in-memory space */
     205             :     TupSortStatus maxSpaceStatus;   /* sort status when maxSpace was reached */
     206             :     LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */
     207             : 
     208             :     /*
     209             :      * This array holds the tuples now in sort memory.  If we are in state
     210             :      * INITIAL, the tuples are in no particular order; if we are in state
     211             :      * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
     212             :      * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
     213             :      * H.  In state SORTEDONTAPE, the array is not used.
     214             :      */
     215             :     SortTuple  *memtuples;      /* array of SortTuple structs */
     216             :     int         memtupcount;    /* number of tuples currently present */
     217             :     int         memtupsize;     /* allocated length of memtuples array */
     218             :     bool        growmemtuples;  /* memtuples' growth still underway? */
     219             : 
     220             :     /*
     221             :      * Memory for tuples is sometimes allocated using a simple slab allocator,
     222             :      * rather than with palloc().  Currently, we switch to slab allocation
     223             :      * when we start merging.  Merging only needs to keep a small, fixed
     224             :      * number of tuples in memory at any time, so we can avoid the
     225             :      * palloc/pfree overhead by recycling a fixed number of fixed-size slots
     226             :      * to hold the tuples.
     227             :      *
     228             :      * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
     229             :      * slots.  The allocation is sized to have one slot per tape, plus one
     230             :      * additional slot.  We need that many slots to hold all the tuples kept
      231             :      * in the heap during merge, plus the one most recently returned from the
      232             :      * sort by tuplesort_gettuple.
     233             :      *
     234             :      * Initially, all the slots are kept in a linked list of free slots.  When
      235             :      * a tuple is read from a tape, it is put into the next available slot, if
     236             :      * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
     237             :      * instead.
     238             :      *
     239             :      * When we're done processing a tuple, we return the slot back to the free
     240             :      * list, or pfree() if it was palloc'd.  We know that a tuple was
      241             :      * allocated from the slab if its pointer value is between
      242             :      * slabMemoryBegin and slabMemoryEnd.
     243             :      *
     244             :      * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
     245             :      * tracking memory usage is not used.
     246             :      */
     247             :     bool        slabAllocatorUsed;
     248             : 
     249             :     char       *slabMemoryBegin;    /* beginning of slab memory arena */
     250             :     char       *slabMemoryEnd;  /* end of slab memory arena */
     251             :     SlabSlot   *slabFreeHead;   /* head of free list */
     252             : 
     253             :     /* Memory used for input and output tape buffers. */
     254             :     size_t      tape_buffer_mem;
     255             : 
     256             :     /*
     257             :      * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
     258             :      * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
     259             :      * modes), we remember the tuple in 'lastReturnedTuple', so that we can
     260             :      * recycle the memory on next gettuple call.
     261             :      */
     262             :     void       *lastReturnedTuple;
     263             : 
     264             :     /*
     265             :      * While building initial runs, this is the current output run number.
     266             :      * Afterwards, it is the number of initial runs we made.
     267             :      */
     268             :     int         currentRun;
     269             : 
     270             :     /*
     271             :      * Logical tapes, for merging.
     272             :      *
     273             :      * The initial runs are written in the output tapes.  In each merge pass,
     274             :      * the output tapes of the previous pass become the input tapes, and new
     275             :      * output tapes are created as needed.  When nInputTapes equals
     276             :      * nInputRuns, there is only one merge pass left.
     277             :      */
     278             :     LogicalTape **inputTapes;
     279             :     int         nInputTapes;
     280             :     int         nInputRuns;
     281             : 
     282             :     LogicalTape **outputTapes;
     283             :     int         nOutputTapes;
     284             :     int         nOutputRuns;
     285             : 
     286             :     LogicalTape *destTape;      /* current output tape */
     287             : 
     288             :     /*
     289             :      * These variables are used after completion of sorting to keep track of
     290             :      * the next tuple to return.  (In the tape case, the tape's current read
     291             :      * position is also critical state.)
     292             :      */
     293             :     LogicalTape *result_tape;   /* actual tape of finished output */
     294             :     int         current;        /* array index (only used if SORTEDINMEM) */
     295             :     bool        eof_reached;    /* reached EOF (needed for cursors) */
     296             : 
     297             :     /* markpos_xxx holds marked position for mark and restore */
     298             :     int64       markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
     299             :     int         markpos_offset; /* saved "current", or offset in tape block */
     300             :     bool        markpos_eof;    /* saved "eof_reached" */
     301             : 
     302             :     /*
     303             :      * These variables are used during parallel sorting.
     304             :      *
     305             :      * worker is our worker identifier.  Follows the general convention that
      306             :      * a value of -1 denotes a leader tuplesort, and values >= 0 denote
      307             :      * worker tuplesorts.  (-1 can also be a serial tuplesort.)
     308             :      *
     309             :      * shared is mutable shared memory state, which is used to coordinate
     310             :      * parallel sorts.
     311             :      *
     312             :      * nParticipants is the number of worker Tuplesortstates known by the
     313             :      * leader to have actually been launched, which implies that they must
     314             :      * finish a run that the leader needs to merge.  Typically includes a
     315             :      * worker state held by the leader process itself.  Set in the leader
     316             :      * Tuplesortstate only.
     317             :      */
     318             :     int         worker;
     319             :     Sharedsort *shared;
     320             :     int         nParticipants;
     321             : 
     322             :     /*
     323             :      * Additional state for managing "abbreviated key" sortsupport routines
     324             :      * (which currently may be used by all cases except the hash index case).
     325             :      * Tracks the intervals at which the optimization's effectiveness is
     326             :      * tested.
     327             :      */
     328             :     int64       abbrevNext;     /* Tuple # at which to next check
     329             :                                  * applicability */
     330             : 
     331             :     /*
     332             :      * Resource snapshot for time of sort start.
     333             :      */
     334             :     PGRUsage    ru_start;
     335             : };
     336             : 
     337             : /*
      338             :  * Private mutable state of a parallel tuplesort operation.  This is allocated
     339             :  * in shared memory.
     340             :  */
     341             : struct Sharedsort
     342             : {
     343             :     /* mutex protects all fields prior to tapes */
     344             :     slock_t     mutex;
     345             : 
     346             :     /*
     347             :      * currentWorker generates ordinal identifier numbers for parallel sort
     348             :      * workers.  These start from 0, and are always gapless.
     349             :      *
     350             :      * Workers increment workersFinished to indicate having finished.  If this
      351             :      * is equal to state.nParticipants within the leader, the leader is ready to
     352             :      * merge worker runs.
     353             :      */
     354             :     int         currentWorker;
     355             :     int         workersFinished;
     356             : 
     357             :     /* Temporary file space */
     358             :     SharedFileSet fileset;
     359             : 
     360             :     /* Size of tapes flexible array */
     361             :     int         nTapes;
     362             : 
     363             :     /*
     364             :      * Tapes array used by workers to report back information needed by the
     365             :      * leader to concatenate all worker tapes into one for merging
     366             :      */
     367             :     TapeShare   tapes[FLEXIBLE_ARRAY_MEMBER];
     368             : };
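
Identifier assignment under the mutex is then a one-line critical section; a hedged sketch of what worker_get_identifier() does:

    /* Sketch: claim the next gapless ordinal for this worker */
    SpinLockAcquire(&shared->mutex);
    ident = shared->currentWorker++;
    SpinLockRelease(&shared->mutex);
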
     369             : 
     370             : /*
     371             :  * Is the given tuple allocated from the slab memory arena?
     372             :  */
     373             : #define IS_SLAB_SLOT(state, tuple) \
     374             :     ((char *) (tuple) >= (state)->slabMemoryBegin && \
     375             :      (char *) (tuple) < (state)->slabMemoryEnd)
     376             : 
     377             : /*
     378             :  * Return the given tuple to the slab memory free list, or free it
     379             :  * if it was palloc'd.
     380             :  */
     381             : #define RELEASE_SLAB_SLOT(state, tuple) \
     382             :     do { \
     383             :         SlabSlot *buf = (SlabSlot *) tuple; \
     384             :         \
     385             :         if (IS_SLAB_SLOT((state), buf)) \
     386             :         { \
     387             :             buf->nextfree = (state)->slabFreeHead; \
     388             :             (state)->slabFreeHead = buf; \
     389             :         } else \
     390             :             pfree(buf); \
     391             :     } while(0)
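
A typical use is recycling lastReturnedTuple before fetching the next tuple from tape, roughly as in tuplesort_gettuple_common() (a sketch, not the verbatim code):

    if (state->lastReturnedTuple)
    {
        RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
        state->lastReturnedTuple = NULL;
    }
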
     392             : 
     393             : #define REMOVEABBREV(state,stup,count)  ((*(state)->base.removeabbrev) (state, stup, count))
     394             : #define COMPARETUP(state,a,b)   ((*(state)->base.comparetup) (a, b, state))
     395             : #define WRITETUP(state,tape,stup)   ((*(state)->base.writetup) (state, tape, stup))
     396             : #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
     397             : #define FREESTATE(state)    ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
     398             : #define LACKMEM(state)      ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
     399             : #define USEMEM(state,amt)   ((state)->availMem -= (amt))
     400             : #define FREEMEM(state,amt)  ((state)->availMem += (amt))
     401             : #define SERIAL(state)       ((state)->shared == NULL)
     402             : #define WORKER(state)       ((state)->shared && (state)->worker != -1)
     403             : #define LEADER(state)       ((state)->shared && (state)->worker == -1)
     404             : 
     405             : /*
     406             :  * NOTES about on-tape representation of tuples:
     407             :  *
     408             :  * We require the first "unsigned int" of a stored tuple to be the total size
     409             :  * on-tape of the tuple, including itself (so it is never zero; an all-zero
     410             :  * unsigned int is used to delimit runs).  The remainder of the stored tuple
     411             :  * may or may not match the in-memory representation of the tuple ---
     412             :  * any conversion needed is the job of the writetup and readtup routines.
     413             :  *
     414             :  * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
     415             :  * representation of the tuple must be followed by another "unsigned int" that
     416             :  * is a copy of the length --- so the total tape space used is actually
     417             :  * sizeof(unsigned int) more than the stored length value.  This allows
      418             :  * read-backwards.  When the random access flag is not specified, the
     419             :  * write/read routines may omit the extra length word.
     420             :  *
     421             :  * writetup is expected to write both length words as well as the tuple
     422             :  * data.  When readtup is called, the tape is positioned just after the
     423             :  * front length word; readtup must read the tuple data and advance past
     424             :  * the back length word (if present).
     425             :  *
     426             :  * The write/read routines can make use of the tuple description data
     427             :  * stored in the Tuplesortstate record, if needed.  They are also expected
     428             :  * to adjust state->availMem by the amount of memory space (not tape space!)
     429             :  * released or consumed.  There is no error return from either writetup
     430             :  * or readtup; they should ereport() on failure.
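
Concretely, a writetup routine obeying these rules might look like the following hedged sketch (tupledata and datalen are illustrative names; the real routines live in tuplesortvariants.c):

    unsigned int tuplen = datalen + sizeof(unsigned int); /* includes itself */

    LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));      /* front length word */
    LogicalTapeWrite(tape, tupledata, datalen);           /* tuple body */
    if (state->base.sortopt & TUPLESORT_RANDOMACCESS)
        LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));  /* back length word */
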
     431             :  *
     432             :  *
     433             :  * NOTES about memory consumption calculations:
     434             :  *
     435             :  * We count space allocated for tuples against the workMem limit, plus
     436             :  * the space used by the variable-size memtuples array.  Fixed-size space
     437             :  * is not counted; it's small enough to not be interesting.
     438             :  *
     439             :  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
     440             :  * rather than the originally-requested size.  This is important since
     441             :  * palloc can add substantial overhead.  It's not a complete answer since
     442             :  * we won't count any wasted space in palloc allocation blocks, but it's
     443             :  * a lot better than what we were doing before 7.3.  As of 9.6, a
     444             :  * separate memory context is used for caller passed tuples.  Resetting
     445             :  * it at certain key increments significantly ameliorates fragmentation.
     446             :  * readtup routines use the slab allocator (they cannot use
     447             :  * the reset context because it gets deleted at the point that merging
     448             :  * begins).
     449             :  */
     450             : 
     451             : 
     452             : static void tuplesort_begin_batch(Tuplesortstate *state);
     453             : static bool consider_abort_common(Tuplesortstate *state);
     454             : static void inittapes(Tuplesortstate *state, bool mergeruns);
     455             : static void inittapestate(Tuplesortstate *state, int maxTapes);
     456             : static void selectnewtape(Tuplesortstate *state);
     457             : static void init_slab_allocator(Tuplesortstate *state, int numSlots);
     458             : static void mergeruns(Tuplesortstate *state);
     459             : static void mergeonerun(Tuplesortstate *state);
     460             : static void beginmerge(Tuplesortstate *state);
     461             : static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
     462             : static void dumptuples(Tuplesortstate *state, bool alltuples);
     463             : static void make_bounded_heap(Tuplesortstate *state);
     464             : static void sort_bounded_heap(Tuplesortstate *state);
     465             : static void tuplesort_sort_memtuples(Tuplesortstate *state);
     466             : static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
     467             : static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
     468             : static void tuplesort_heap_delete_top(Tuplesortstate *state);
     469             : static void reversedirection(Tuplesortstate *state);
     470             : static unsigned int getlen(LogicalTape *tape, bool eofOK);
     471             : static void markrunend(LogicalTape *tape);
     472             : static int  worker_get_identifier(Tuplesortstate *state);
     473             : static void worker_freeze_result_tape(Tuplesortstate *state);
     474             : static void worker_nomergeruns(Tuplesortstate *state);
     475             : static void leader_takeover_tapes(Tuplesortstate *state);
     476             : static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
     477             : static void tuplesort_free(Tuplesortstate *state);
     478             : static void tuplesort_updatemax(Tuplesortstate *state);
     479             : 
     480             : /*
     481             :  * Specialized comparators that we can inline into specialized sorts.  The goal
     482             :  * is to try to sort two tuples without having to follow the pointers to the
     483             :  * comparator or the tuple.
     484             :  *
     485             :  * XXX: For now, there is no specialization for cases where datum1 is
     486             :  * authoritative and we don't even need to fall back to a callback at all (that
     487             :  * would be true for types like int4/int8/timestamp/date, but not true for
     488             :  * abbreviations of text or multi-key sorts.  There could be!  Is it worth it?
     489             :  */
     490             : 
     491             : /* Used if first key's comparator is ssup_datum_unsigned_cmp */
     492             : static pg_attribute_always_inline int
     493    46531200 : qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     494             : {
     495             :     int         compare;
     496             : 
     497    46531200 :     compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
     498    46531200 :                                           b->datum1, b->isnull1,
     499             :                                           &state->base.sortKeys[0]);
     500    46531200 :     if (compare != 0)
     501    41966136 :         return compare;
     502             : 
     503             :     /*
     504             :      * No need to waste effort calling the tiebreak function when there are no
     505             :      * other keys to sort on.
     506             :      */
     507     4565064 :     if (state->base.onlyKey != NULL)
     508           0 :         return 0;
     509             : 
     510     4565064 :     return state->base.comparetup_tiebreak(a, b, state);
     511             : }
     512             : 
     513             : /* Used if first key's comparator is ssup_datum_signed_cmp */
     514             : static pg_attribute_always_inline int
     515     9309352 : qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     516             : {
     517             :     int         compare;
     518             : 
     519     9309352 :     compare = ApplySignedSortComparator(a->datum1, a->isnull1,
     520     9309352 :                                         b->datum1, b->isnull1,
     521             :                                         &state->base.sortKeys[0]);
     522             : 
     523     9309352 :     if (compare != 0)
     524     9236350 :         return compare;
     525             : 
     526             :     /*
     527             :      * No need to waste effort calling the tiebreak function when there are no
     528             :      * other keys to sort on.
     529             :      */
     530       73002 :     if (state->base.onlyKey != NULL)
     531       59078 :         return 0;
     532             : 
     533       13924 :     return state->base.comparetup_tiebreak(a, b, state);
     534             : }
     535             : 
     536             : /* Used if first key's comparator is ssup_datum_int32_cmp */
     537             : static pg_attribute_always_inline int
     538    55928016 : qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     539             : {
     540             :     int         compare;
     541             : 
     542    55928016 :     compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
     543    55928016 :                                        b->datum1, b->isnull1,
     544             :                                        &state->base.sortKeys[0]);
     545             : 
     546    55928016 :     if (compare != 0)
     547    39909578 :         return compare;
     548             : 
     549             :     /*
     550             :      * No need to waste effort calling the tiebreak function when there are no
     551             :      * other keys to sort on.
     552             :      */
     553    16018438 :     if (state->base.onlyKey != NULL)
     554     2275436 :         return 0;
     555             : 
     556    13743002 :     return state->base.comparetup_tiebreak(a, b, state);
     557             : }
     558             : 
     559             : /*
     560             :  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
     561             :  * any variant of SortTuples, using the appropriate comparetup function.
     562             :  * qsort_ssup() is specialized for the case where the comparetup function
     563             :  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
     564             :  * and Datum sorts.  qsort_tuple_{unsigned,signed,int32} are specialized for
     565             :  * common comparison functions on pass-by-value leading datums.
     566             :  */
     567             : 
     568             : #define ST_SORT qsort_tuple_unsigned
     569             : #define ST_ELEMENT_TYPE SortTuple
     570             : #define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
     571             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     572             : #define ST_CHECK_FOR_INTERRUPTS
     573             : #define ST_SCOPE static
     574             : #define ST_DEFINE
     575             : #include "lib/sort_template.h"
     576             : 
     577             : #define ST_SORT qsort_tuple_signed
     578             : #define ST_ELEMENT_TYPE SortTuple
     579             : #define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
     580             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     581             : #define ST_CHECK_FOR_INTERRUPTS
     582             : #define ST_SCOPE static
     583             : #define ST_DEFINE
     584             : #include "lib/sort_template.h"
     585             : 
     586             : #define ST_SORT qsort_tuple_int32
     587             : #define ST_ELEMENT_TYPE SortTuple
     588             : #define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
     589             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     590             : #define ST_CHECK_FOR_INTERRUPTS
     591             : #define ST_SCOPE static
     592             : #define ST_DEFINE
     593             : #include "lib/sort_template.h"
     594             : 
     595             : #define ST_SORT qsort_tuple
     596             : #define ST_ELEMENT_TYPE SortTuple
     597             : #define ST_COMPARE_RUNTIME_POINTER
     598             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     599             : #define ST_CHECK_FOR_INTERRUPTS
     600             : #define ST_SCOPE static
     601             : #define ST_DECLARE
     602             : #define ST_DEFINE
     603             : #include "lib/sort_template.h"
     604             : 
     605             : #define ST_SORT qsort_ssup
     606             : #define ST_ELEMENT_TYPE SortTuple
     607             : #define ST_COMPARE(a, b, ssup) \
     608             :     ApplySortComparator((a)->datum1, (a)->isnull1, \
     609             :                         (b)->datum1, (b)->isnull1, (ssup))
     610             : #define ST_COMPARE_ARG_TYPE SortSupportData
     611             : #define ST_CHECK_FOR_INTERRUPTS
     612             : #define ST_SCOPE static
     613             : #define ST_DEFINE
     614             : #include "lib/sort_template.h"
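
Each inclusion above stamps out a complete sort routine named by ST_SORT; tuplesort_sort_memtuples() then dispatches to the matching specialization roughly along these lines (a hedged sketch of the dispatch, not the verbatim code):

    if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
        qsort_tuple_unsigned(state->memtuples, state->memtupcount, state);
    else if (state->base.onlyKey != NULL)
        qsort_ssup(state->memtuples, state->memtupcount, state->base.onlyKey);
    else
        qsort_tuple(state->memtuples, state->memtupcount,
                    state->base.comparetup, state);
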
     615             : 
     616             : /*
     617             :  *      tuplesort_begin_xxx
     618             :  *
     619             :  * Initialize for a tuple sort operation.
     620             :  *
     621             :  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
     622             :  * zero or more times, then call tuplesort_performsort when all the tuples
     623             :  * have been supplied.  After performsort, retrieve the tuples in sorted
     624             :  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
     625             :  * access was requested, rescan, markpos, and restorepos can also be called.)
     626             :  * Call tuplesort_end to terminate the operation and release memory/disk space.
     627             :  *
     628             :  * Each variant of tuplesort_begin has a workMem parameter specifying the
     629             :  * maximum number of kilobytes of RAM to use before spilling data to disk.
     630             :  * (The normal value of this parameter is work_mem, but some callers use
     631             :  * other values.)  Each variant also has a sortopt which is a bitmask of
      632             :  * sort options.  See TUPLESORT_* definitions in tuplesort.h.
     633             :  */
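
As an illustration of that lifecycle, a caller sorting bare int4 datums might use the datum variant roughly as below (a hedged sketch: the begin/put/get datum functions are declared in tuplesort.h, Int4LessOperator comes from the catalogs, and the exact tuplesort_getdatum signature has varied across releases):

    Tuplesortstate *ts;
    Datum       val;
    bool        isnull;
    Datum       abbrev;

    ts = tuplesort_begin_datum(INT4OID, Int4LessOperator, InvalidOid,
                               false, work_mem, NULL, TUPLESORT_NONE);
    tuplesort_putdatum(ts, Int32GetDatum(42), false);
    tuplesort_putdatum(ts, Int32GetDatum(7), false);
    tuplesort_performsort(ts);
    while (tuplesort_getdatum(ts, true, false, &val, &isnull, &abbrev))
    {
        /* consumes DatumGetInt32(val) in ascending order: 7, then 42 */
    }
    tuplesort_end(ts);
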
     634             : 
     635             : Tuplesortstate *
     636      275056 : tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
     637             : {
     638             :     Tuplesortstate *state;
     639             :     MemoryContext maincontext;
     640             :     MemoryContext sortcontext;
     641             :     MemoryContext oldcontext;
     642             : 
     643             :     /* See leader_takeover_tapes() remarks on random access support */
     644      275056 :     if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
     645           0 :         elog(ERROR, "random access disallowed under parallel sort");
     646             : 
     647             :     /*
     648             :      * Memory context surviving tuplesort_reset.  This memory context holds
     649             :      * data which is useful to keep while sorting multiple similar batches.
     650             :      */
     651      275056 :     maincontext = AllocSetContextCreate(CurrentMemoryContext,
     652             :                                         "TupleSort main",
     653             :                                         ALLOCSET_DEFAULT_SIZES);
     654             : 
     655             :     /*
     656             :      * Create a working memory context for one sort operation.  The content of
     657             :      * this context is deleted by tuplesort_reset.
     658             :      */
     659      275056 :     sortcontext = AllocSetContextCreate(maincontext,
     660             :                                         "TupleSort sort",
     661             :                                         ALLOCSET_DEFAULT_SIZES);
     662             : 
     663             :     /*
     664             :      * Additionally a working memory context for tuples is setup in
     665             :      * tuplesort_begin_batch.
     666             :      */
     667             : 
     668             :     /*
     669             :      * Make the Tuplesortstate within the per-sortstate context.  This way, we
     670             :      * don't need a separate pfree() operation for it at shutdown.
     671             :      */
     672      275056 :     oldcontext = MemoryContextSwitchTo(maincontext);
     673             : 
     674      275056 :     state = palloc0_object(Tuplesortstate);
     675             : 
     676      275056 :     if (trace_sort)
     677           0 :         pg_rusage_init(&state->ru_start);
     678             : 
     679      275056 :     state->base.sortopt = sortopt;
     680      275056 :     state->base.tuples = true;
     681      275056 :     state->abbrevNext = 10;
     682             : 
     683             :     /*
     684             :      * workMem is forced to be at least 64KB, the current minimum valid value
     685             :      * for the work_mem GUC.  This is a defense against parallel sort callers
     686             :      * that divide out memory among many workers in a way that leaves each
     687             :      * with very little memory.
     688             :      */
     689      275056 :     state->allowedMem = Max(workMem, 64) * (int64) 1024;
     690      275056 :     state->base.sortcontext = sortcontext;
     691      275056 :     state->base.maincontext = maincontext;
     692             : 
     693      275056 :     state->memtupsize = INITIAL_MEMTUPSIZE;
     694      275056 :     state->memtuples = NULL;
     695             : 
     696             :     /*
      697             :      * After all of the other non-parallel-related state, we set up all of the
     698             :      * state needed for each batch.
     699             :      */
     700      275056 :     tuplesort_begin_batch(state);
     701             : 
     702             :     /*
     703             :      * Initialize parallel-related state based on coordination information
     704             :      * from caller
     705             :      */
     706      275056 :     if (!coordinate)
     707             :     {
     708             :         /* Serial sort */
     709      274356 :         state->shared = NULL;
     710      274356 :         state->worker = -1;
     711      274356 :         state->nParticipants = -1;
     712             :     }
     713         700 :     else if (coordinate->isWorker)
     714             :     {
     715             :         /* Parallel worker produces exactly one final run from all input */
     716         468 :         state->shared = coordinate->sharedsort;
     717         468 :         state->worker = worker_get_identifier(state);
     718         468 :         state->nParticipants = -1;
     719             :     }
     720             :     else
     721             :     {
     722             :         /* Parallel leader state only used for final merge */
     723         232 :         state->shared = coordinate->sharedsort;
     724         232 :         state->worker = -1;
     725         232 :         state->nParticipants = coordinate->nParticipants;
     726             :         Assert(state->nParticipants >= 1);
     727             :     }
     728             : 
     729      275056 :     MemoryContextSwitchTo(oldcontext);
     730             : 
     731      275056 :     return state;
     732             : }
     733             : 
     734             : /*
     735             :  *      tuplesort_begin_batch
     736             :  *
      737             :  * Set up, or reset, all state needed for processing a new set of tuples with
      738             :  * this sort state.  Called both from tuplesort_begin_common (the first time sorting
     739             :  * with this sort state) and tuplesort_reset (for subsequent usages).
     740             :  */
     741             : static void
     742      278386 : tuplesort_begin_batch(Tuplesortstate *state)
     743             : {
     744             :     MemoryContext oldcontext;
     745             : 
     746      278386 :     oldcontext = MemoryContextSwitchTo(state->base.maincontext);
     747             : 
     748             :     /*
     749             :      * Caller tuple (e.g. IndexTuple) memory context.
     750             :      *
     751             :      * A dedicated child context used exclusively for caller passed tuples
     752             :      * eases memory management.  Resetting at key points reduces
     753             :      * fragmentation. Note that the memtuples array of SortTuples is allocated
     754             :      * in the parent context, not this context, because there is no need to
     755             :      * free memtuples early.  For bounded sorts, tuples may be pfreed in any
     756             :      * order, so we use a regular aset.c context so that it can make use of
     757             :      * free'd memory.  When the sort is not bounded, we make use of a bump.c
     758             :      * context as this keeps allocations more compact with less wastage.
     759             :      * Allocations are also slightly more CPU efficient.
     760             :      */
     761      278386 :     if (TupleSortUseBumpTupleCxt(state->base.sortopt))
     762      277020 :         state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
     763             :                                                      "Caller tuples",
     764             :                                                      ALLOCSET_DEFAULT_SIZES);
     765             :     else
     766        1366 :         state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
     767             :                                                          "Caller tuples",
     768             :                                                          ALLOCSET_DEFAULT_SIZES);
     769             : 
     770             : 
     771      278386 :     state->status = TSS_INITIAL;
     772      278386 :     state->bounded = false;
     773      278386 :     state->boundUsed = false;
     774             : 
     775      278386 :     state->availMem = state->allowedMem;
     776             : 
     777      278386 :     state->tapeset = NULL;
     778             : 
     779      278386 :     state->memtupcount = 0;
     780             : 
     781      278386 :     state->growmemtuples = true;
     782      278386 :     state->slabAllocatorUsed = false;
     783      278386 :     if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
     784             :     {
     785          72 :         pfree(state->memtuples);
     786          72 :         state->memtuples = NULL;
     787          72 :         state->memtupsize = INITIAL_MEMTUPSIZE;
     788             :     }
     789      278386 :     if (state->memtuples == NULL)
     790             :     {
     791      275128 :         state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
     792      275128 :         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
     793             :     }
     794             : 
     795             :     /* workMem must be large enough for the minimal memtuples array */
     796      278386 :     if (LACKMEM(state))
     797           0 :         elog(ERROR, "insufficient memory allowed for sort");
     798             : 
     799      278386 :     state->currentRun = 0;
     800             : 
     801             :     /*
     802             :      * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
     803             :      * inittapes(), if needed.
     804             :      */
     805             : 
     806      278386 :     state->result_tape = NULL;   /* flag that result tape has not been formed */
     807             : 
     808      278386 :     MemoryContextSwitchTo(oldcontext);
     809      278386 : }
     810             : 
     811             : /*
     812             :  * tuplesort_set_bound
     813             :  *
     814             :  *  Advise tuplesort that at most the first N result tuples are required.
     815             :  *
     816             :  * Must be called before inserting any tuples.  (Actually, we could allow it
     817             :  * as long as the sort hasn't spilled to disk, but there seems no need for
     818             :  * delayed calls at the moment.)
     819             :  *
     820             :  * This is a hint only. The tuplesort may still return more tuples than
     821             :  * requested.  Parallel leader tuplesorts will always ignore the hint.
     822             :  */
     823             : void
     824        1232 : tuplesort_set_bound(Tuplesortstate *state, int64 bound)
     825             : {
     826             :     /* Assert we're called before loading any tuples */
     827             :     Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
     828             :     /* Assert we allow bounded sorts */
     829             :     Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
     830             :     /* Can't set the bound twice, either */
     831             :     Assert(!state->bounded);
     832             :     /* Also, this shouldn't be called in a parallel worker */
     833             :     Assert(!WORKER(state));
     834             : 
     835             :     /* Parallel leader allows but ignores hint */
     836        1232 :     if (LEADER(state))
     837           0 :         return;
     838             : 
     839             : #ifdef DEBUG_BOUNDED_SORT
     840             :     /* Honor GUC setting that disables the feature (for easy testing) */
     841             :     if (!optimize_bounded_sort)
     842             :         return;
     843             : #endif
     844             : 
     845             :     /* We want to be able to compute bound * 2, so limit the setting */
     846        1232 :     if (bound > (int64) (INT_MAX / 2))
     847           0 :         return;
     848             : 
     849        1232 :     state->bounded = true;
     850        1232 :     state->bound = (int) bound;
     851             : 
     852             :     /*
     853             :      * Bounded sorts are not an effective target for abbreviated key
     854             :      * optimization.  Disable by setting state to be consistent with no
     855             :      * abbreviation support.
     856             :      */
     857        1232 :     state->base.sortKeys->abbrev_converter = NULL;
     858        1232 :     if (state->base.sortKeys->abbrev_full_comparator)
     859          16 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
     860             : 
     861             :     /* Not strictly necessary, but be tidy */
     862        1232 :     state->base.sortKeys->abbrev_abort = NULL;
     863        1232 :     state->base.sortKeys->abbrev_full_comparator = NULL;
     864             : }
     865             : 
     866             : /*
     867             :  * tuplesort_used_bound
     868             :  *
     869             :  * Allow callers to find out if the sort state was able to use a bound.
     870             :  */
     871             : bool
     872         380 : tuplesort_used_bound(Tuplesortstate *state)
     873             : {
     874         380 :     return state->boundUsed;
     875             : }
     876             : 
     877             : /*
     878             :  * tuplesort_free
     879             :  *
     880             :  *  Internal routine for freeing resources of tuplesort.
     881             :  */
     882             : static void
     883      278124 : tuplesort_free(Tuplesortstate *state)
     884             : {
     885             :     /* context swap probably not needed, but let's be safe */
     886      278124 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
     887             :     int64       spaceUsed;
     888             : 
     889      278124 :     if (state->tapeset)
     890         770 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset);
     891             :     else
     892      277354 :         spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
     893             : 
     894             :     /*
     895             :      * Delete temporary "tape" files, if any.
     896             :      *
     897             :      * We don't bother to destroy the individual tapes here. They will go away
     898             :      * with the sortcontext.  (In TSS_FINALMERGE state, we have closed
     899             :      * finished tapes already.)
     900             :      */
     901      278124 :     if (state->tapeset)
     902         770 :         LogicalTapeSetClose(state->tapeset);
     903             : 
     904      278124 :     if (trace_sort)
     905             :     {
     906           0 :         if (state->tapeset)
     907           0 :             elog(LOG, "%s of worker %d ended, %" PRId64 " disk blocks used: %s",
     908             :                  SERIAL(state) ? "external sort" : "parallel external sort",
     909             :                  state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
     910             :         else
     911           0 :             elog(LOG, "%s of worker %d ended, %" PRId64 " KB used: %s",
     912             :                  SERIAL(state) ? "internal sort" : "unperformed parallel sort",
     913             :                  state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
     914             :     }
     915             : 
     916             :     TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
     917             : 
     918      278124 :     FREESTATE(state);
     919      278124 :     MemoryContextSwitchTo(oldcontext);
     920             : 
     921             :     /*
     922             :      * Free the per-sort memory context, thereby releasing all working memory.
     923             :      */
     924      278124 :     MemoryContextReset(state->base.sortcontext);
     925      278124 : }
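                      : 
                      : /*
                      :  * Editor's note: in the in-memory branch above, "(bytes + 1023) / 1024"
                      :  * is integer division rounding up, so a partially used kilobyte is still
                      :  * reported as a whole one.  For example, 2049 bytes of working memory
                      :  * yield (2049 + 1023) / 1024 = 3 kB.
                      :  */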
     926             : 
     927             : /*
     928             :  * tuplesort_end
     929             :  *
     930             :  *  Release resources and clean up.
     931             :  *
     932             :  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
     933             :  * pointing to garbage.  Be careful not to attempt to use or free such
     934             :  * pointers afterwards!
     935             :  */
     936             : void
     937      274794 : tuplesort_end(Tuplesortstate *state)
     938             : {
     939      274794 :     tuplesort_free(state);
     940             : 
     941             :     /*
     942             :      * Free the main memory context, including the Tuplesortstate struct
     943             :      * itself.
     944             :      */
     945      274794 :     MemoryContextDelete(state->base.maincontext);
     946      274794 : }
     947             : 
     948             : /*
     949             :  * tuplesort_updatemax
     950             :  *
     951             :  *  Update maximum resource usage statistics.
     952             :  */
     953             : static void
     954        3726 : tuplesort_updatemax(Tuplesortstate *state)
     955             : {
     956             :     int64       spaceUsed;
     957             :     bool        isSpaceDisk;
     958             : 
     959             :     /*
     960             :      * Note: it might seem we should provide both memory and disk usage for a
     961             :      * disk-based sort.  However, the current code doesn't track memory space
     962             :      * accurately once we have begun to return tuples to the caller (since we
      963             :      * don't account for the pfrees the caller is expected to do), so we cannot
     964             :      * rely on availMem in a disk sort.  This does not seem worth the overhead
     965             :      * to fix.  Is it worth creating an API for the memory context code to
     966             :      * tell us how much is actually used in sortcontext?
     967             :      */
     968        3726 :     if (state->tapeset)
     969             :     {
     970           6 :         isSpaceDisk = true;
     971           6 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
     972             :     }
     973             :     else
     974             :     {
     975        3720 :         isSpaceDisk = false;
     976        3720 :         spaceUsed = state->allowedMem - state->availMem;
     977             :     }
     978             : 
     979             :     /*
      980             :      * A sort spills data to disk when it cannot fit that data in main
      981             :      * memory.  For that reason, we treat space used on disk as more
      982             :      * significant than space used in memory when tracking resource usage.
      983             :      * Note that the same set of tuples may occupy less space on disk than
      984             :      * in memory, thanks to its more compact on-disk representation.
     986             :      */
     987        3726 :     if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
     988        3720 :         (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
     989             :     {
     990         506 :         state->maxSpace = spaceUsed;
     991         506 :         state->isMaxSpaceDisk = isSpaceDisk;
     992         506 :         state->maxSpaceStatus = state->status;
     993             :     }
     994        3726 : }
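                      : 
                      : /*
                      :  * Editor's sketch: the update rule above in isolation (hypothetical
                      :  * helper, not part of this file).  A disk-based figure always replaces
                      :  * an in-memory maximum; otherwise the larger figure of the same kind
                      :  * wins.
                      :  */
                      : #include <stdbool.h>
                      : #include <stdint.h>
                      : 
                      : static void
                      : update_max_space(bool is_disk, int64_t used,
                      :                  bool *max_is_disk, int64_t *max_used)
                      : {
                      :     if ((is_disk && !*max_is_disk) ||
                      :         (is_disk == *max_is_disk && used > *max_used))
                      :     {
                      :         *max_used = used;
                      :         *max_is_disk = is_disk;
                      :     }
                      : }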
     995             : 
     996             : /*
     997             :  * tuplesort_reset
     998             :  *
      999             :  *  Reset the tuplesort.  All of the sort's data is discarded, but its
     1000             :  *  meta-information is retained.  After tuplesort_reset, the tuplesort is
     1001             :  *  ready to start a new sort.  This avoids recreating tuplesort states
     1002             :  *  (and so saves resources) when sorting multiple small batches.
    1003             :  */
    1004             : void
    1005        3330 : tuplesort_reset(Tuplesortstate *state)
    1006             : {
    1007        3330 :     tuplesort_updatemax(state);
    1008        3330 :     tuplesort_free(state);
    1009             : 
    1010             :     /*
     1011             :      * After we've freed up per-batch memory, re-establish all of the state
     1012             :      * common to both the first batch and any subsequent batch.
    1013             :      */
    1014        3330 :     tuplesort_begin_batch(state);
    1015             : 
    1016        3330 :     state->lastReturnedTuple = NULL;
    1017        3330 :     state->slabMemoryBegin = NULL;
    1018        3330 :     state->slabMemoryEnd = NULL;
    1019        3330 :     state->slabFreeHead = NULL;
    1020        3330 : }
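                      : 
                      : /*
                      :  * Editor's sketch: the batch-reuse pattern that tuplesort_reset enables.
                      :  * feed_batch() and drain_batch() are hypothetical stand-ins for the
                      :  * caller's puttuple and gettuple loops; everything else shown is the
                      :  * API from this file.
                      :  */
                      : extern void feed_batch(Tuplesortstate *state, int batchno);
                      : extern void drain_batch(Tuplesortstate *state, int batchno);
                      : 
                      : static void
                      : sort_many_batches(Tuplesortstate *state, int nbatches)
                      : {
                      :     for (int i = 0; i < nbatches; i++)
                      :     {
                      :         feed_batch(state, i);       /* load one small batch */
                      :         tuplesort_performsort(state);
                      :         drain_batch(state, i);      /* read back sorted tuples */
                      :         if (i + 1 < nbatches)
                      :             tuplesort_reset(state); /* keep meta-state, drop data */
                      :     }
                      :     tuplesort_end(state);           /* final cleanup frees everything */
                      : }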
    1021             : 
    1022             : /*
    1023             :  * Grow the memtuples[] array, if possible within our memory constraint.  We
    1024             :  * must not exceed INT_MAX tuples in memory or the caller-provided memory
    1025             :  * limit.  Return true if we were able to enlarge the array, false if not.
    1026             :  *
    1027             :  * Normally, at each increment we double the size of the array.  When doing
    1028             :  * that would exceed a limit, we attempt one last, smaller increase (and then
    1029             :  * clear the growmemtuples flag so we don't try any more).  That allows us to
    1030             :  * use memory as fully as permitted; sticking to the pure doubling rule could
    1031             :  * result in almost half going unused.  Because availMem moves around with
    1032             :  * tuple addition/removal, we need some rule to prevent making repeated small
    1033             :  * increases in memtupsize, which would just be useless thrashing.  The
    1034             :  * growmemtuples flag accomplishes that and also prevents useless
    1035             :  * recalculations in this function.
    1036             :  */
    1037             : static bool
    1038        8470 : grow_memtuples(Tuplesortstate *state)
    1039             : {
    1040             :     int         newmemtupsize;
    1041        8470 :     int         memtupsize = state->memtupsize;
    1042        8470 :     int64       memNowUsed = state->allowedMem - state->availMem;
    1043             : 
    1044             :     /* Forget it if we've already maxed out memtuples, per comment above */
    1045        8470 :     if (!state->growmemtuples)
    1046         140 :         return false;
    1047             : 
    1048             :     /* Select new value of memtupsize */
    1049        8330 :     if (memNowUsed <= state->availMem)
    1050             :     {
    1051             :         /*
    1052             :          * We've used no more than half of allowedMem; double our usage,
    1053             :          * clamping at INT_MAX tuples.
    1054             :          */
    1055        8184 :         if (memtupsize < INT_MAX / 2)
    1056        8184 :             newmemtupsize = memtupsize * 2;
    1057             :         else
    1058             :         {
    1059           0 :             newmemtupsize = INT_MAX;
    1060           0 :             state->growmemtuples = false;
    1061             :         }
    1062             :     }
    1063             :     else
    1064             :     {
    1065             :         /*
    1066             :          * This will be the last increment of memtupsize.  Abandon doubling
    1067             :          * strategy and instead increase as much as we safely can.
    1068             :          *
    1069             :          * To stay within allowedMem, we can't increase memtupsize by more
    1070             :          * than availMem / sizeof(SortTuple) elements.  In practice, we want
    1071             :          * to increase it by considerably less, because we need to leave some
    1072             :          * space for the tuples to which the new array slots will refer.  We
    1073             :          * assume the new tuples will be about the same size as the tuples
    1074             :          * we've already seen, and thus we can extrapolate from the space
    1075             :          * consumption so far to estimate an appropriate new size for the
    1076             :          * memtuples array.  The optimal value might be higher or lower than
    1077             :          * this estimate, but it's hard to know that in advance.  We again
    1078             :          * clamp at INT_MAX tuples.
    1079             :          *
    1080             :          * This calculation is safe against enlarging the array so much that
    1081             :          * LACKMEM becomes true, because the memory currently used includes
    1082             :          * the present array; thus, there would be enough allowedMem for the
    1083             :          * new array elements even if no other memory were currently used.
    1084             :          *
    1085             :          * We do the arithmetic in float8, because otherwise the product of
    1086             :          * memtupsize and allowedMem could overflow.  Any inaccuracy in the
    1087             :          * result should be insignificant; but even if we computed a
    1088             :          * completely insane result, the checks below will prevent anything
    1089             :          * really bad from happening.
    1090             :          */
    1091             :         double      grow_ratio;
    1092             : 
    1093         146 :         grow_ratio = (double) state->allowedMem / (double) memNowUsed;
    1094         146 :         if (memtupsize * grow_ratio < INT_MAX)
    1095         146 :             newmemtupsize = (int) (memtupsize * grow_ratio);
    1096             :         else
    1097           0 :             newmemtupsize = INT_MAX;
    1098             : 
    1099             :         /* We won't make any further enlargement attempts */
    1100         146 :         state->growmemtuples = false;
    1101             :     }
    1102             : 
    1103             :     /* Must enlarge array by at least one element, else report failure */
    1104        8330 :     if (newmemtupsize <= memtupsize)
    1105           0 :         goto noalloc;
    1106             : 
    1107             :     /*
    1108             :      * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
    1109             :      * to ensure our request won't be rejected.  Note that we can easily
    1110             :      * exhaust address space before facing this outcome.  (This is presently
    1111             :      * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
    1112             :      * don't rely on that at this distance.)
    1113             :      */
    1114        8330 :     if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
    1115             :     {
    1116           0 :         newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
    1117           0 :         state->growmemtuples = false;    /* can't grow any more */
    1118             :     }
    1119             : 
    1120             :     /*
    1121             :      * We need to be sure that we do not cause LACKMEM to become true, else
    1122             :      * the space management algorithm will go nuts.  The code above should
    1123             :      * never generate a dangerous request, but to be safe, check explicitly
    1124             :      * that the array growth fits within availMem.  (We could still cause
    1125             :      * LACKMEM if the memory chunk overhead associated with the memtuples
    1126             :      * array were to increase.  That shouldn't happen because we chose the
    1127             :      * initial array size large enough to ensure that palloc will be treating
    1128             :      * both old and new arrays as separate chunks.  But we'll check LACKMEM
    1129             :      * explicitly below just in case.)
    1130             :      */
    1131        8330 :     if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
    1132           0 :         goto noalloc;
    1133             : 
    1134             :     /* OK, do it */
    1135        8330 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1136        8330 :     state->memtupsize = newmemtupsize;
    1137        8330 :     state->memtuples = (SortTuple *)
    1138        8330 :         repalloc_huge(state->memtuples,
    1139        8330 :                       state->memtupsize * sizeof(SortTuple));
    1140        8330 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1141        8330 :     if (LACKMEM(state))
    1142           0 :         elog(ERROR, "unexpected out-of-memory situation in tuplesort");
    1143        8330 :     return true;
    1144             : 
    1145           0 : noalloc:
    1146             :     /* If for any reason we didn't realloc, shut off future attempts */
    1147           0 :     state->growmemtuples = false;
    1148           0 :     return false;
    1149             : }
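                      : 
                      : /*
                      :  * Editor's sketch: the growth policy above, reduced to its essentials
                      :  * (hypothetical helper, not part of this file).  While no more than
                      :  * half the memory budget is in use, the array doubles; otherwise one
                      :  * final, proportional enlargement is attempted and growth stops.
                      :  */
                      : #include <limits.h>
                      : #include <stdbool.h>
                      : #include <stdint.h>
                      : 
                      : static bool
                      : choose_new_array_size(int cursize, int64_t used, int64_t budget,
                      :                       bool *keep_growing, int *newsize)
                      : {
                      :     if (!*keep_growing)
                      :         return false;
                      : 
                      :     if (used <= budget - used)  /* at most half the budget used */
                      :         *newsize = (cursize < INT_MAX / 2) ? cursize * 2 : INT_MAX;
                      :     else
                      :     {
                      :         /* last attempt: scale by the remaining headroom and stop */
                      :         double  grow_ratio = (double) budget / (double) used;
                      : 
                      :         *newsize = (cursize * grow_ratio < INT_MAX) ?
                      :             (int) (cursize * grow_ratio) : INT_MAX;
                      :         *keep_growing = false;
                      :     }
                      :     return *newsize > cursize;  /* must grow by at least one slot */
                      : }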
    1150             : 
    1151             : /*
    1152             :  * Shared code for tuple and datum cases.
    1153             :  */
    1154             : void
    1155    31111270 : tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
    1156             :                           bool useAbbrev, Size tuplen)
    1157             : {
    1158    31111270 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1159             : 
    1160             :     Assert(!LEADER(state));
    1161             : 
    1162             :     /* account for the memory used for this tuple */
    1163    31111270 :     USEMEM(state, tuplen);
    1164    31111270 :     state->tupleMem += tuplen;
    1165             : 
    1166    31111270 :     if (!useAbbrev)
    1167             :     {
    1168             :         /*
     1169             :          * Leave the ordinary Datum representation, or the NULL value.  If
     1170             :          * there is a converter, it won't expect NULL values, and the cost
     1171             :          * model need not account for NULLs, so in that case we avoid
     1172             :          * calling the converter and just set datum1 to a zeroed
     1173             :          * representation (to be consistent, and to support cheap inequality
     1174             :          * tests for NULL abbreviated keys).
    1175             :          */
    1176             :     }
    1177     4441506 :     else if (!consider_abort_common(state))
    1178             :     {
    1179             :         /* Store abbreviated key representation */
    1180     4441410 :         tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
    1181             :                                                                state->base.sortKeys);
    1182             :     }
    1183             :     else
    1184             :     {
    1185             :         /*
    1186             :          * Set state to be consistent with never trying abbreviation.
    1187             :          *
    1188             :          * Alter datum1 representation in already-copied tuples, so as to
    1189             :          * ensure a consistent representation (current tuple was just
    1190             :          * handled).  It does not matter if some dumped tuples are already
    1191             :          * sorted on tape, since serialized tuples lack abbreviated keys
    1192             :          * (TSS_BUILDRUNS state prevents control reaching here in any case).
    1193             :          */
    1194          96 :         REMOVEABBREV(state, state->memtuples, state->memtupcount);
    1195             :     }
    1196             : 
    1197    31111270 :     switch (state->status)
    1198             :     {
    1199    26248778 :         case TSS_INITIAL:
    1200             : 
    1201             :             /*
    1202             :              * Save the tuple into the unsorted array.  First, grow the array
    1203             :              * as needed.  Note that we try to grow the array when there is
    1204             :              * still one free slot remaining --- if we fail, there'll still be
    1205             :              * room to store the incoming tuple, and then we'll switch to
    1206             :              * tape-based operation.
    1207             :              */
    1208    26248778 :             if (state->memtupcount >= state->memtupsize - 1)
    1209             :             {
    1210        8470 :                 (void) grow_memtuples(state);
    1211             :                 Assert(state->memtupcount < state->memtupsize);
    1212             :             }
    1213    26248778 :             state->memtuples[state->memtupcount++] = *tuple;
    1214             : 
    1215             :             /*
    1216             :              * Check if it's time to switch over to a bounded heapsort. We do
    1217             :              * so if the input tuple count exceeds twice the desired tuple
    1218             :              * count (this is a heuristic for where heapsort becomes cheaper
    1219             :              * than a quicksort), or if we've just filled workMem and have
    1220             :              * enough tuples to meet the bound.
    1221             :              *
    1222             :              * Note that once we enter TSS_BOUNDED state we will always try to
    1223             :              * complete the sort that way.  In the worst case, if later input
    1224             :              * tuples are larger than earlier ones, this might cause us to
    1225             :              * exceed workMem significantly.
    1226             :              */
    1227    26248778 :             if (state->bounded &&
    1228       47316 :                 (state->memtupcount > state->bound * 2 ||
    1229       46908 :                  (state->memtupcount > state->bound && LACKMEM(state))))
    1230             :             {
    1231         408 :                 if (trace_sort)
    1232           0 :                     elog(LOG, "switching to bounded heapsort at %d tuples: %s",
    1233             :                          state->memtupcount,
    1234             :                          pg_rusage_show(&state->ru_start));
    1235         408 :                 make_bounded_heap(state);
    1236         408 :                 MemoryContextSwitchTo(oldcontext);
    1237         408 :                 return;
    1238             :             }
    1239             : 
    1240             :             /*
    1241             :              * Done if we still fit in available memory and have array slots.
    1242             :              */
    1243    26248370 :             if (state->memtupcount < state->memtupsize && !LACKMEM(state))
    1244             :             {
    1245    26248230 :                 MemoryContextSwitchTo(oldcontext);
    1246    26248230 :                 return;
    1247             :             }
    1248             : 
    1249             :             /*
    1250             :              * Nope; time to switch to tape-based operation.
    1251             :              */
    1252         140 :             inittapes(state, true);
    1253             : 
    1254             :             /*
    1255             :              * Dump all tuples.
    1256             :              */
    1257         140 :             dumptuples(state, false);
    1258         140 :             break;
    1259             : 
    1260     3750206 :         case TSS_BOUNDED:
    1261             : 
    1262             :             /*
    1263             :              * We don't want to grow the array here, so check whether the new
    1264             :              * tuple can be discarded before putting it in.  This should be a
    1265             :              * good speed optimization, too, since when there are many more
    1266             :              * input tuples than the bound, most input tuples can be discarded
    1267             :              * with just this one comparison.  Note that because we currently
    1268             :              * have the sort direction reversed, we must check for <= not >=.
    1269             :              */
    1270     3750206 :             if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
    1271             :             {
    1272             :                 /* new tuple <= top of the heap, so we can discard it */
    1273     3247240 :                 free_sort_tuple(state, tuple);
    1274     3247240 :                 CHECK_FOR_INTERRUPTS();
    1275             :             }
    1276             :             else
    1277             :             {
    1278             :                 /* discard top of heap, replacing it with the new tuple */
    1279      502966 :                 free_sort_tuple(state, &state->memtuples[0]);
    1280      502966 :                 tuplesort_heap_replace_top(state, tuple);
    1281             :             }
    1282     3750206 :             break;
    1283             : 
    1284     1112286 :         case TSS_BUILDRUNS:
    1285             : 
    1286             :             /*
    1287             :              * Save the tuple into the unsorted array (there must be space)
    1288             :              */
    1289     1112286 :             state->memtuples[state->memtupcount++] = *tuple;
    1290             : 
    1291             :             /*
    1292             :              * If we are over the memory limit, dump all tuples.
    1293             :              */
    1294     1112286 :             dumptuples(state, false);
    1295     1112286 :             break;
    1296             : 
    1297           0 :         default:
    1298           0 :             elog(ERROR, "invalid tuplesort state");
    1299             :             break;
    1300             :     }
    1301     4862632 :     MemoryContextSwitchTo(oldcontext);
    1302             : }
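                      : 
                      : /*
                      :  * Editor's sketch: the TSS_BOUNDED branch above is the classic top-K
                      :  * pattern.  With a max-heap holding the K best tuples seen so far, an
                      :  * incoming item that does not beat the heap's root is discarded with a
                      :  * single comparison; otherwise it replaces the root.  A standalone
                      :  * integer version, keeping the K smallest items (hypothetical helpers;
                      :  * tuplesort itself keeps its heap in reversed sort order, which is why
                      :  * the code above tests <= rather than >=):
                      :  */
                      : static void
                      : sift_down(int *heap, int n, int i)
                      : {
                      :     for (;;)
                      :     {
                      :         int     largest = i;
                      :         int     l = 2 * i + 1;
                      :         int     r = 2 * i + 2;
                      : 
                      :         if (l < n && heap[l] > heap[largest])
                      :             largest = l;
                      :         if (r < n && heap[r] > heap[largest])
                      :             largest = r;
                      :         if (largest == i)
                      :             break;
                      :         {
                      :             int     tmp = heap[i];
                      : 
                      :             heap[i] = heap[largest];
                      :             heap[largest] = tmp;
                      :         }
                      :         i = largest;
                      :     }
                      : }
                      : 
                      : static void
                      : topk_offer(int *heap, int k, int item)
                      : {
                      :     if (item >= heap[0])
                      :         return;                 /* not among the K smallest: discard */
                      :     heap[0] = item;             /* replace the root... */
                      :     sift_down(heap, k, 0);      /* ...and restore the heap invariant */
                      : }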
    1303             : 
    1304             : static bool
    1305     4441506 : consider_abort_common(Tuplesortstate *state)
    1306             : {
    1307             :     Assert(state->base.sortKeys[0].abbrev_converter != NULL);
    1308             :     Assert(state->base.sortKeys[0].abbrev_abort != NULL);
    1309             :     Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
    1310             : 
    1311             :     /*
    1312             :      * Check effectiveness of abbreviation optimization.  Consider aborting
    1313             :      * when still within memory limit.
    1314             :      */
    1315     4441506 :     if (state->status == TSS_INITIAL &&
    1316     3969076 :         state->memtupcount >= state->abbrevNext)
    1317             :     {
    1318        5062 :         state->abbrevNext *= 2;
    1319             : 
    1320             :         /*
    1321             :          * Check opclass-supplied abbreviation abort routine.  It may indicate
    1322             :          * that abbreviation should not proceed.
    1323             :          */
    1324        5062 :         if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
    1325             :                                                 state->base.sortKeys))
    1326        4966 :             return false;
    1327             : 
    1328             :         /*
    1329             :          * Finally, restore authoritative comparator, and indicate that
    1330             :          * abbreviation is not in play by setting abbrev_converter to NULL
    1331             :          */
    1332          96 :         state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
    1333          96 :         state->base.sortKeys[0].abbrev_converter = NULL;
    1334             :         /* Not strictly necessary, but be tidy */
    1335          96 :         state->base.sortKeys[0].abbrev_abort = NULL;
    1336          96 :         state->base.sortKeys[0].abbrev_full_comparator = NULL;
    1337             : 
    1338             :         /* Give up - expect original pass-by-value representation */
    1339          96 :         return true;
    1340             :     }
    1341             : 
    1342     4436444 :     return false;
    1343             : }
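                      : 
                      : /*
                      :  * Editor's note: because abbrevNext doubles after every check, the
                      :  * abort heuristic above runs at tuple counts that grow geometrically,
                      :  * so it is invoked only O(log n) times over the whole input rather
                      :  * than once per tuple.  A minimal sketch of that cadence (hypothetical
                      :  * helper):
                      :  */
                      : #include <stdbool.h>
                      : #include <stdint.h>
                      : 
                      : static bool
                      : time_to_check(int64_t tuple_count, int64_t *next_check)
                      : {
                      :     if (tuple_count < *next_check)
                      :         return false;
                      :     *next_check *= 2;           /* checks at n, 2n, 4n, ... */
                      :     return true;
                      : }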
    1344             : 
    1345             : /*
    1346             :  * All tuples have been provided; finish the sort.
    1347             :  */
    1348             : void
    1349      236044 : tuplesort_performsort(Tuplesortstate *state)
    1350             : {
    1351      236044 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1352             : 
    1353      236044 :     if (trace_sort)
    1354           0 :         elog(LOG, "performsort of worker %d starting: %s",
    1355             :              state->worker, pg_rusage_show(&state->ru_start));
    1356             : 
    1357      236044 :     switch (state->status)
    1358             :     {
    1359      235496 :         case TSS_INITIAL:
    1360             : 
    1361             :             /*
     1362             :              * Either we accumulated all the tuples within the allowed amount
     1363             :              * of memory, or the leader is about to take over the worker tapes.
    1364             :              */
    1365      235496 :             if (SERIAL(state))
    1366             :             {
    1367             :                 /* Just qsort 'em and we're done */
    1368      234866 :                 tuplesort_sort_memtuples(state);
    1369      234776 :                 state->status = TSS_SORTEDINMEM;
    1370             :             }
    1371         630 :             else if (WORKER(state))
    1372             :             {
    1373             :                 /*
    1374             :                  * Parallel workers must still dump out tuples to tape.  No
    1375             :                  * merge is required to produce single output run, though.
    1376             :                  */
    1377         468 :                 inittapes(state, false);
    1378         468 :                 dumptuples(state, true);
    1379         468 :                 worker_nomergeruns(state);
    1380         468 :                 state->status = TSS_SORTEDONTAPE;
    1381             :             }
    1382             :             else
    1383             :             {
    1384             :                 /*
    1385             :                  * Leader will take over worker tapes and merge worker runs.
    1386             :                  * Note that mergeruns sets the correct state->status.
    1387             :                  */
    1388         162 :                 leader_takeover_tapes(state);
    1389         162 :                 mergeruns(state);
    1390             :             }
    1391      235406 :             state->current = 0;
    1392      235406 :             state->eof_reached = false;
    1393      235406 :             state->markpos_block = 0L;
    1394      235406 :             state->markpos_offset = 0;
    1395      235406 :             state->markpos_eof = false;
    1396      235406 :             break;
    1397             : 
    1398         408 :         case TSS_BOUNDED:
    1399             : 
    1400             :             /*
    1401             :              * We were able to accumulate all the tuples required for output
    1402             :              * in memory, using a heap to eliminate excess tuples.  Now we
    1403             :              * have to transform the heap to a properly-sorted array. Note
    1404             :              * that sort_bounded_heap sets the correct state->status.
    1405             :              */
    1406         408 :             sort_bounded_heap(state);
    1407         408 :             state->current = 0;
    1408         408 :             state->eof_reached = false;
    1409         408 :             state->markpos_offset = 0;
    1410         408 :             state->markpos_eof = false;
    1411         408 :             break;
    1412             : 
    1413         140 :         case TSS_BUILDRUNS:
    1414             : 
    1415             :             /*
    1416             :              * Finish tape-based sort.  First, flush all tuples remaining in
    1417             :              * memory out to tape; then merge until we have a single remaining
    1418             :              * run (or, if !randomAccess and !WORKER(), one run per tape).
    1419             :              * Note that mergeruns sets the correct state->status.
    1420             :              */
    1421         140 :             dumptuples(state, true);
    1422         140 :             mergeruns(state);
    1423         140 :             state->eof_reached = false;
    1424         140 :             state->markpos_block = 0L;
    1425         140 :             state->markpos_offset = 0;
    1426         140 :             state->markpos_eof = false;
    1427         140 :             break;
    1428             : 
    1429           0 :         default:
    1430           0 :             elog(ERROR, "invalid tuplesort state");
    1431             :             break;
    1432             :     }
    1433             : 
    1434      235954 :     if (trace_sort)
    1435             :     {
    1436           0 :         if (state->status == TSS_FINALMERGE)
    1437           0 :             elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
    1438             :                  state->worker, state->nInputTapes,
    1439             :                  pg_rusage_show(&state->ru_start));
    1440             :         else
    1441           0 :             elog(LOG, "performsort of worker %d done: %s",
    1442             :                  state->worker, pg_rusage_show(&state->ru_start));
    1443             :     }
    1444             : 
    1445      235954 :     MemoryContextSwitchTo(oldcontext);
    1446      235954 : }
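                      : 
                      : /*
                      :  * Editor's note: the status transitions performed above, summarized:
                      :  *
                      :  *   TSS_INITIAL   -> TSS_SORTEDINMEM    serial sort: qsort the array
                      :  *   TSS_INITIAL   -> TSS_SORTEDONTAPE   worker: dump one run, no merge
                      :  *   TSS_INITIAL   -> (set by mergeruns)  leader: merge the worker runs
                      :  *   TSS_BOUNDED   -> (set by sort_bounded_heap)  heap flattened to array
                      :  *   TSS_BUILDRUNS -> (set by mergeruns)  flush memory, then merge runs
                      :  */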
    1447             : 
    1448             : /*
    1449             :  * Internal routine to fetch the next tuple in either forward or back
    1450             :  * direction into *stup.  Returns false if no more tuples.
    1451             :  * Returned tuple belongs to tuplesort memory context, and must not be freed
    1452             :  * by caller.  Note that fetched tuple is stored in memory that may be
    1453             :  * recycled by any future fetch.
    1454             :  */
    1455             : bool
    1456    28605084 : tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
    1457             :                           SortTuple *stup)
    1458             : {
    1459             :     unsigned int tuplen;
    1460             :     size_t      nmoved;
    1461             : 
    1462             :     Assert(!WORKER(state));
    1463             : 
    1464    28605084 :     switch (state->status)
    1465             :     {
    1466    23657470 :         case TSS_SORTEDINMEM:
    1467             :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1468             :             Assert(!state->slabAllocatorUsed);
    1469    23657470 :             if (forward)
    1470             :             {
    1471    23657404 :                 if (state->current < state->memtupcount)
    1472             :                 {
    1473    23423576 :                     *stup = state->memtuples[state->current++];
    1474    23423576 :                     return true;
    1475             :                 }
    1476      233828 :                 state->eof_reached = true;
    1477             : 
    1478             :                 /*
    1479             :                  * Complain if caller tries to retrieve more tuples than
    1480             :                  * originally asked for in a bounded sort.  This is because
    1481             :                  * returning EOF here might be the wrong thing.
    1482             :                  */
    1483      233828 :                 if (state->bounded && state->current >= state->bound)
    1484           0 :                     elog(ERROR, "retrieved too many tuples in a bounded sort");
    1485             : 
    1486      233828 :                 return false;
    1487             :             }
    1488             :             else
    1489             :             {
    1490          66 :                 if (state->current <= 0)
    1491           0 :                     return false;
    1492             : 
    1493             :                 /*
     1494             :                  * If all tuples are already fetched, return the last tuple;
     1495             :                  * otherwise, return the tuple before the last one returned.
    1496             :                  */
    1497          66 :                 if (state->eof_reached)
    1498          12 :                     state->eof_reached = false;
    1499             :                 else
    1500             :                 {
    1501          54 :                     state->current--;    /* last returned tuple */
    1502          54 :                     if (state->current <= 0)
    1503           6 :                         return false;
    1504             :                 }
    1505          60 :                 *stup = state->memtuples[state->current - 1];
    1506          60 :                 return true;
    1507             :             }
    1508             :             break;
    1509             : 
    1510      303000 :         case TSS_SORTEDONTAPE:
    1511             :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1512             :             Assert(state->slabAllocatorUsed);
    1513             : 
    1514             :             /*
    1515             :              * The slot that held the tuple that we returned in previous
    1516             :              * gettuple call can now be reused.
    1517             :              */
    1518      303000 :             if (state->lastReturnedTuple)
    1519             :             {
    1520      152850 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1521      152850 :                 state->lastReturnedTuple = NULL;
    1522             :             }
    1523             : 
    1524      303000 :             if (forward)
    1525             :             {
    1526      302970 :                 if (state->eof_reached)
    1527           0 :                     return false;
    1528             : 
    1529      302970 :                 if ((tuplen = getlen(state->result_tape, true)) != 0)
    1530             :                 {
    1531      302940 :                     READTUP(state, stup, state->result_tape, tuplen);
    1532             : 
    1533             :                     /*
    1534             :                      * Remember the tuple we return, so that we can recycle
    1535             :                      * its memory on next call.  (This can be NULL, in the
    1536             :                      * !state->tuples case).
    1537             :                      */
    1538      302940 :                     state->lastReturnedTuple = stup->tuple;
    1539             : 
    1540      302940 :                     return true;
    1541             :                 }
    1542             :                 else
    1543             :                 {
    1544          30 :                     state->eof_reached = true;
    1545          30 :                     return false;
    1546             :                 }
    1547             :             }
    1548             : 
    1549             :             /*
    1550             :              * Backward.
    1551             :              *
     1552             :              * If all tuples are already fetched, return the last tuple;
     1553             :              * otherwise, return the tuple before the last one returned.
    1554             :              */
    1555          30 :             if (state->eof_reached)
    1556             :             {
    1557             :                 /*
    1558             :                  * Seek position is pointing just past the zero tuplen at the
    1559             :                  * end of file; back up to fetch last tuple's ending length
    1560             :                  * word.  If seek fails we must have a completely empty file.
    1561             :                  */
    1562          12 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1563             :                                               2 * sizeof(unsigned int));
    1564          12 :                 if (nmoved == 0)
    1565           0 :                     return false;
    1566          12 :                 else if (nmoved != 2 * sizeof(unsigned int))
    1567           0 :                     elog(ERROR, "unexpected tape position");
    1568          12 :                 state->eof_reached = false;
    1569             :             }
    1570             :             else
    1571             :             {
    1572             :                 /*
    1573             :                  * Back up and fetch previously-returned tuple's ending length
    1574             :                  * word.  If seek fails, assume we are at start of file.
    1575             :                  */
    1576          18 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1577             :                                               sizeof(unsigned int));
    1578          18 :                 if (nmoved == 0)
    1579           0 :                     return false;
    1580          18 :                 else if (nmoved != sizeof(unsigned int))
    1581           0 :                     elog(ERROR, "unexpected tape position");
    1582          18 :                 tuplen = getlen(state->result_tape, false);
    1583             : 
    1584             :                 /*
    1585             :                  * Back up to get ending length word of tuple before it.
    1586             :                  */
    1587          18 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1588             :                                               tuplen + 2 * sizeof(unsigned int));
    1589          18 :                 if (nmoved == tuplen + sizeof(unsigned int))
    1590             :                 {
    1591             :                     /*
    1592             :                      * We backed up over the previous tuple, but there was no
     1593             :                      * ending length word before it.  That means the previous
     1594             :                      * tuple is the first tuple in the file.  It is now the
     1595             :                      * next one to read in the forward direction (not obviously
     1596             :                      * right, but that is what the in-memory case does).
    1597             :                      */
    1598           6 :                     return false;
    1599             :                 }
    1600          12 :                 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
    1601           0 :                     elog(ERROR, "bogus tuple length in backward scan");
    1602             :             }
    1603             : 
    1604          24 :             tuplen = getlen(state->result_tape, false);
    1605             : 
    1606             :             /*
    1607             :              * Now we have the length of the prior tuple, back up and read it.
    1608             :              * Note: READTUP expects we are positioned after the initial
    1609             :              * length word of the tuple, so back up to that point.
    1610             :              */
    1611          24 :             nmoved = LogicalTapeBackspace(state->result_tape,
    1612             :                                           tuplen);
    1613          24 :             if (nmoved != tuplen)
    1614           0 :                 elog(ERROR, "bogus tuple length in backward scan");
    1615          24 :             READTUP(state, stup, state->result_tape, tuplen);
    1616             : 
    1617             :             /*
    1618             :              * Remember the tuple we return, so that we can recycle its memory
    1619             :              * on next call. (This can be NULL, in the Datum case).
    1620             :              */
    1621          24 :             state->lastReturnedTuple = stup->tuple;
    1622             : 
    1623          24 :             return true;
    1624             : 
    1625     4644614 :         case TSS_FINALMERGE:
    1626             :             Assert(forward);
    1627             :             /* We are managing memory ourselves, with the slab allocator. */
    1628             :             Assert(state->slabAllocatorUsed);
    1629             : 
    1630             :             /*
    1631             :              * The slab slot holding the tuple that we returned in previous
    1632             :              * gettuple call can now be reused.
    1633             :              */
    1634     4644614 :             if (state->lastReturnedTuple)
    1635             :             {
    1636     4524304 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1637     4524304 :                 state->lastReturnedTuple = NULL;
    1638             :             }
    1639             : 
    1640             :             /*
    1641             :              * This code should match the inner loop of mergeonerun().
    1642             :              */
    1643     4644614 :             if (state->memtupcount > 0)
    1644             :             {
    1645     4644352 :                 int         srcTapeIndex = state->memtuples[0].srctape;
    1646     4644352 :                 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
    1647             :                 SortTuple   newtup;
    1648             : 
    1649     4644352 :                 *stup = state->memtuples[0];
    1650             : 
    1651             :                 /*
    1652             :                  * Remember the tuple we return, so that we can recycle its
    1653             :                  * memory on next call. (This can be NULL, in the Datum case).
    1654             :                  */
    1655     4644352 :                 state->lastReturnedTuple = stup->tuple;
    1656             : 
    1657             :                 /*
    1658             :                  * Pull next tuple from tape, and replace the returned tuple
    1659             :                  * at top of the heap with it.
    1660             :                  */
    1661     4644352 :                 if (!mergereadnext(state, srcTape, &newtup))
    1662             :                 {
    1663             :                     /*
    1664             :                      * If no more data, we've reached end of run on this tape.
    1665             :                      * Remove the top node from the heap.
    1666             :                      */
    1667         388 :                     tuplesort_heap_delete_top(state);
    1668         388 :                     state->nInputRuns--;
    1669             : 
    1670             :                     /*
    1671             :                      * Close the tape.  It'd go away at the end of the sort
    1672             :                      * anyway, but better to release the memory early.
    1673             :                      */
    1674         388 :                     LogicalTapeClose(srcTape);
    1675         388 :                     return true;
    1676             :                 }
    1677     4643964 :                 newtup.srctape = srcTapeIndex;
    1678     4643964 :                 tuplesort_heap_replace_top(state, &newtup);
    1679     4643964 :                 return true;
    1680             :             }
    1681         262 :             return false;
    1682             : 
    1683           0 :         default:
    1684           0 :             elog(ERROR, "invalid tuplesort state");
    1685             :             return false;       /* keep compiler quiet */
    1686             :     }
    1687             : }
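                      : 
                      : /*
                      :  * Editor's note: the backward-scan backspace arithmetic above relies on
                      :  * each tuple being stored on a random-access tape as
                      :  *
                      :  *     [length word][tuple body][length word]
                      :  *
                      :  * with a zero length word terminating the file.  From a position just
                      :  * past a tuple, backing up sizeof(unsigned int) reaches that tuple's
                      :  * trailing length word, and backing up tuplen + 2 * sizeof(unsigned int)
                      :  * reaches the trailing length word of the tuple before it.  Coming up
                      :  * short by exactly one length word during the larger backspace means the
                      :  * previous tuple is the first one on the tape.
                      :  */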
    1688             : 
    1689             : 
    1690             : /*
    1691             :  * Advance over N tuples in either forward or back direction,
    1692             :  * without returning any data.  N==0 is a no-op.
    1693             :  * Returns true if successful, false if ran out of tuples.
    1694             :  */
    1695             : bool
    1696         392 : tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
    1697             : {
    1698             :     MemoryContext oldcontext;
    1699             : 
    1700             :     /*
    1701             :      * We don't actually support backwards skip yet, because no callers need
    1702             :      * it.  The API is designed to allow for that later, though.
    1703             :      */
    1704             :     Assert(forward);
    1705             :     Assert(ntuples >= 0);
    1706             :     Assert(!WORKER(state));
    1707             : 
    1708         392 :     switch (state->status)
    1709             :     {
    1710         368 :         case TSS_SORTEDINMEM:
    1711         368 :             if (state->memtupcount - state->current >= ntuples)
    1712             :             {
    1713         368 :                 state->current += ntuples;
    1714         368 :                 return true;
    1715             :             }
    1716           0 :             state->current = state->memtupcount;
    1717           0 :             state->eof_reached = true;
    1718             : 
    1719             :             /*
    1720             :              * Complain if caller tries to retrieve more tuples than
    1721             :              * originally asked for in a bounded sort.  This is because
    1722             :              * returning EOF here might be the wrong thing.
    1723             :              */
    1724           0 :             if (state->bounded && state->current >= state->bound)
    1725           0 :                 elog(ERROR, "retrieved too many tuples in a bounded sort");
    1726             : 
    1727           0 :             return false;
    1728             : 
    1729          24 :         case TSS_SORTEDONTAPE:
    1730             :         case TSS_FINALMERGE:
    1731             : 
    1732             :             /*
    1733             :              * We could probably optimize these cases better, but for now it's
    1734             :              * not worth the trouble.
    1735             :              */
    1736          24 :             oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1737      240132 :             while (ntuples-- > 0)
    1738             :             {
    1739             :                 SortTuple   stup;
    1740             : 
    1741      240108 :                 if (!tuplesort_gettuple_common(state, forward, &stup))
    1742             :                 {
    1743           0 :                     MemoryContextSwitchTo(oldcontext);
    1744           0 :                     return false;
    1745             :                 }
    1746      240108 :                 CHECK_FOR_INTERRUPTS();
    1747             :             }
    1748          24 :             MemoryContextSwitchTo(oldcontext);
    1749          24 :             return true;
    1750             : 
    1751           0 :         default:
    1752           0 :             elog(ERROR, "invalid tuplesort state");
    1753             :             return false;       /* keep compiler quiet */
    1754             :     }
    1755             : }
    1756             : 
    1757             : /*
    1758             :  * tuplesort_merge_order - report merge order we'll use for given memory
    1759             :  * (note: "merge order" just means the number of input tapes in the merge).
    1760             :  *
    1761             :  * This is exported for use by the planner.  allowedMem is in bytes.
    1762             :  */
    1763             : int
    1764       18648 : tuplesort_merge_order(int64 allowedMem)
    1765             : {
    1766             :     int         mOrder;
    1767             : 
    1768             :     /*----------
    1769             :      * In the merge phase, we need buffer space for each input and output tape.
    1770             :      * Each pass in the balanced merge algorithm reads from M input tapes, and
    1771             :      * writes to N output tapes.  Each tape consumes TAPE_BUFFER_OVERHEAD bytes
    1772             :      * of memory.  In addition to that, we want MERGE_BUFFER_SIZE workspace per
    1773             :      * input tape.
    1774             :      *
    1775             :      * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
    1776             :      *            N * TAPE_BUFFER_OVERHEAD
    1777             :      *
    1778             :      * Except for the last and next-to-last merge passes, where there can be
    1779             :      * fewer tapes left to process, M = N.  We choose M so that we have the
    1780             :      * desired amount of memory available for the input buffers
    1781             :      * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
    1782             :      * available for the tape buffers (allowedMem).
    1783             :      *
    1784             :      * Note: you might be thinking we need to account for the memtuples[]
    1785             :      * array in this calculation, but we effectively treat that as part of the
    1786             :      * MERGE_BUFFER_SIZE workspace.
    1787             :      *----------
    1788             :      */
    1789       18648 :     mOrder = allowedMem /
    1790             :         (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
    1791             : 
    1792             :     /*
    1793             :      * Even in minimum memory, use at least a MINORDER merge.  On the other
    1794             :      * hand, even when we have lots of memory, do not use more than a MAXORDER
    1795             :      * merge.  Tapes are pretty cheap, but they're not entirely free.  Each
    1796             :      * additional tape reduces the amount of memory available to build runs,
    1797             :      * which in turn can cause the same sort to need more runs, which makes
    1798             :      * merging slower even if it can still be done in a single pass.  Also,
    1799             :      * high order merges are quite slow due to CPU cache effects; it can be
    1800             :      * faster to pay the I/O cost of a multi-pass merge than to perform a
    1801             :      * single merge pass across many hundreds of tapes.
    1802             :      */
    1803       18648 :     mOrder = Max(mOrder, MINORDER);
    1804       18648 :     mOrder = Min(mOrder, MAXORDER);
    1805             : 
    1806       18648 :     return mOrder;
    1807             : }
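                      : 
                      : /*
                      :  * Editor's worked example (the constants here are illustrative, not the
                      :  * real values): with allowedMem = 4 MB, TAPE_BUFFER_OVERHEAD = 32 kB,
                      :  * and MERGE_BUFFER_SIZE = 256 kB,
                      :  *
                      :  *     mOrder = 4194304 / (2 * 32768 + 262144) = 12
                      :  *
                      :  * i.e. a 12-way merge, before the MINORDER/MAXORDER clamps apply.
                      :  */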
    1808             : 
    1809             : /*
    1810             :  * Helper function to calculate how much memory to allocate for the read buffer
    1811             :  * of each input tape in a merge pass.
    1812             :  *
    1813             :  * 'avail_mem' is the amount of memory available for the buffers of all the
    1814             :  *      tapes, both input and output.
    1815             :  * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
    1816             :  * 'maxOutputTapes' is the max. number of output tapes we should produce.
    1817             :  */
    1818             : static int64
    1819         332 : merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
    1820             :                        int maxOutputTapes)
    1821             : {
    1822             :     int         nOutputRuns;
    1823             :     int         nOutputTapes;
    1824             : 
    1825             :     /*
    1826             :      * How many output tapes will we produce in this pass?
    1827             :      *
    1828             :      * This is nInputRuns / nInputTapes, rounded up.
    1829             :      */
    1830         332 :     nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
    1831             : 
    1832         332 :     nOutputTapes = Min(nOutputRuns, maxOutputTapes);
    1833             : 
    1834             :     /*
    1835             :      * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory.  All
    1836             :      * remaining memory is divided evenly between the input tapes.
    1837             :      *
    1838             :      * This also follows from the formula in tuplesort_merge_order, but here
    1839             :      * we derive the input buffer size from the amount of memory available,
    1840             :      * and M and N.
    1841             :      */
    1842         332 :     return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
    1843             : }
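                      : 
                      : /*
                      :  * Editor's worked example: with avail_mem = 1 MB, nInputTapes = 4,
                      :  * nInputRuns = 10, maxOutputTapes = 8, and an illustrative
                      :  * TAPE_BUFFER_OVERHEAD of 32 kB:
                      :  *
                      :  *     nOutputRuns  = ceil(10 / 4) = 3
                      :  *     nOutputTapes = Min(3, 8)    = 3
                      :  *     per-input read buffer = (1048576 - 3 * 32768) / 4 = 237568 bytes
                      :  */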
    1844             : 
    1845             : /*
    1846             :  * inittapes - initialize for tape sorting.
    1847             :  *
    1848             :  * This is called only if we have found we won't sort in memory.
    1849             :  */
    1850             : static void
    1851         608 : inittapes(Tuplesortstate *state, bool mergeruns)
    1852             : {
    1853             :     Assert(!LEADER(state));
    1854             : 
    1855         608 :     if (mergeruns)
    1856             :     {
    1857             :         /* Compute number of input tapes to use when merging */
    1858         140 :         state->maxTapes = tuplesort_merge_order(state->allowedMem);
    1859             :     }
    1860             :     else
    1861             :     {
    1862             :         /* Workers can sometimes produce a single run, output without merge */
    1863             :         Assert(WORKER(state));
    1864         468 :         state->maxTapes = MINORDER;
    1865             :     }
    1866             : 
    1867         608 :     if (trace_sort)
    1868           0 :         elog(LOG, "worker %d switching to external sort with %d tapes: %s",
    1869             :              state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
    1870             : 
    1871             :     /* Create the tape set */
    1872         608 :     inittapestate(state, state->maxTapes);
    1873         608 :     state->tapeset =
    1874         608 :         LogicalTapeSetCreate(false,
    1875         608 :                              state->shared ? &state->shared->fileset : NULL,
    1876             :                              state->worker);
    1877             : 
    1878         608 :     state->currentRun = 0;
    1879             : 
    1880             :     /*
    1881             :      * Initialize logical tape arrays.
    1882             :      */
    1883         608 :     state->inputTapes = NULL;
    1884         608 :     state->nInputTapes = 0;
    1885         608 :     state->nInputRuns = 0;
    1886             : 
    1887         608 :     state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
    1888         608 :     state->nOutputTapes = 0;
    1889         608 :     state->nOutputRuns = 0;
    1890             : 
    1891         608 :     state->status = TSS_BUILDRUNS;
    1892             : 
    1893         608 :     selectnewtape(state);
    1894         608 : }
    1895             : 
    1896             : /*
    1897             :  * inittapestate - initialize generic tape management state
    1898             :  */
    1899             : static void
    1900         770 : inittapestate(Tuplesortstate *state, int maxTapes)
    1901             : {
    1902             :     int64       tapeSpace;
    1903             : 
    1904             :     /*
    1905             :      * Decrease availMem to reflect the space needed for tape buffers; but
    1906             :      * don't decrease it to the point that we have no room for tuples. (That
    1907             :      * case is only likely to occur if sorting pass-by-value Datums; in all
    1908             :      * other scenarios the memtuples[] array is unlikely to occupy more than
    1909             :      * half of allowedMem.  In the pass-by-value case it's not important to
    1910             :      * account for tuple space, so we don't care if LACKMEM becomes
    1911             :      * inaccurate.)
    1912             :      */
    1913         770 :     tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
    1914             : 
    1915         770 :     if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
    1916         648 :         USEMEM(state, tapeSpace);
    1917             : 
    1918             :     /*
    1919             :      * Make sure that the temp file(s) underlying the tape set are created in
    1920             :      * suitable temp tablespaces.  For parallel sorts, this should have been
    1921             :      * called already, but it doesn't matter if it is called a second time.
    1922             :      */
    1923         770 :     PrepareTempTablespaces();
    1924         770 : }
    1925             : 
    1926             : /*
    1927             :  * selectnewtape -- select next tape to output to.
    1928             :  *
    1929             :  * This is called after finishing a run when we know another run
    1930             :  * must be started.  This is used both when building the initial
    1931             :  * runs, and during merge passes.
    1932             :  */
    1933             : static void
    1934        1712 : selectnewtape(Tuplesortstate *state)
    1935             : {
    1936             :     /*
    1937             :      * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
    1938             :      * both zero.  On each call, we create a new output tape to hold the next
    1939             :      * run, until maxTapes is reached.  After that, we assign new runs to the
    1940             :      * existing tapes in a round-robin fashion.
    1941             :      */
    1942        1712 :     if (state->nOutputTapes < state->maxTapes)
    1943             :     {
    1944             :         /* Create a new tape to hold the next run */
    1945             :         Assert(state->outputTapes[state->nOutputRuns] == NULL);
    1946             :         Assert(state->nOutputRuns == state->nOutputTapes);
    1947        1130 :         state->destTape = LogicalTapeCreate(state->tapeset);
    1948        1130 :         state->outputTapes[state->nOutputTapes] = state->destTape;
    1949        1130 :         state->nOutputTapes++;
    1950        1130 :         state->nOutputRuns++;
    1951             :     }
    1952             :     else
    1953             :     {
    1954             :         /*
    1955             :          * We have reached the max number of tapes.  Append to an existing
    1956             :          * tape.
    1957             :          */
    1958         582 :         state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
    1959         582 :         state->nOutputRuns++;
    1960             :     }
    1961        1712 : }
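
Concretely, with maxTapes = 4 the first four runs each get a fresh tape,
and later runs wrap around: run 4 is appended to tape 0 (4 % 4), run 5 to
tape 1 (5 % 4), and so on, since destTape is chosen by
nOutputRuns % nOutputTapes once all tapes exist.
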
    1962             : 
    1963             : /*
    1964             :  * Initialize the slab allocation arena, for the given number of slots.
    1965             :  */
    1966             : static void
    1967         302 : init_slab_allocator(Tuplesortstate *state, int numSlots)
    1968             : {
    1969         302 :     if (numSlots > 0)
    1970             :     {
    1971             :         char       *p;
    1972             :         int         i;
    1973             : 
    1974         274 :         state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
    1975         274 :         state->slabMemoryEnd = state->slabMemoryBegin +
    1976         274 :             numSlots * SLAB_SLOT_SIZE;
    1977         274 :         state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
    1978         274 :         USEMEM(state, numSlots * SLAB_SLOT_SIZE);
    1979             : 
    1980         274 :         p = state->slabMemoryBegin;
    1981        1042 :         for (i = 0; i < numSlots - 1; i++)
    1982             :         {
    1983         768 :             ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
    1984         768 :             p += SLAB_SLOT_SIZE;
    1985             :         }
    1986         274 :         ((SlabSlot *) p)->nextfree = NULL;
    1987             :     }
    1988             :     else
    1989             :     {
    1990          28 :         state->slabMemoryBegin = state->slabMemoryEnd = NULL;
    1991          28 :         state->slabFreeHead = NULL;
    1992             :     }
    1993         302 :     state->slabAllocatorUsed = true;
    1994         302 : }
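
The loop above threads a singly linked free list through the arena itself:
each slot's nextfree points at the following slot, and the last slot ends
the list with NULL.  Allocation and release then reduce to list pops and
pushes; a minimal sketch of both operations (tuplesort_readtup_alloc below
is the real pop, RELEASE_SLAB_SLOT the real push):

    /* pop: take the first free slot, if any */
    SlabSlot   *slot = state->slabFreeHead;

    if (slot != NULL)
        state->slabFreeHead = slot->nextfree;

    /* push: return a slot to the head of the free list */
    slot->nextfree = state->slabFreeHead;
    state->slabFreeHead = slot;
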
    1995             : 
    1996             : /*
    1997             :  * mergeruns -- merge all the completed initial runs.
    1998             :  *
    1999             :  * This implements the Balanced k-Way Merge Algorithm.  All input data has
    2000             :  * already been written to initial runs on tape (see dumptuples).
    2001             :  */
    2002             : static void
    2003         302 : mergeruns(Tuplesortstate *state)
    2004             : {
    2005             :     int         tapenum;
    2006             : 
    2007             :     Assert(state->status == TSS_BUILDRUNS);
    2008             :     Assert(state->memtupcount == 0);
    2009             : 
    2010         302 :     if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
    2011             :     {
    2012             :         /*
    2013             :          * If there are multiple runs to be merged, when we go to read back
    2014             :          * tuples from disk, abbreviated keys will not have been stored, and
    2015             :          * we don't care to regenerate them.  Disable abbreviation from this
    2016             :          * point on.
    2017             :          */
    2018          30 :         state->base.sortKeys->abbrev_converter = NULL;
    2019          30 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
    2020             : 
    2021             :         /* Not strictly necessary, but be tidy */
    2022          30 :         state->base.sortKeys->abbrev_abort = NULL;
    2023          30 :         state->base.sortKeys->abbrev_full_comparator = NULL;
    2024             :     }
    2025             : 
    2026             :     /*
    2027             :      * Reset tuple memory.  We've freed all the tuples that we previously
    2028             :      * allocated.  We will use the slab allocator from now on.
    2029             :      */
    2030         302 :     MemoryContextResetOnly(state->base.tuplecontext);
    2031             : 
    2032             :     /*
    2033             :      * We no longer need a large memtuples array.  (We will allocate a smaller
    2034             :      * one for the heap later.)
    2035             :      */
    2036         302 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    2037         302 :     pfree(state->memtuples);
    2038         302 :     state->memtuples = NULL;
    2039             : 
    2040             :     /*
    2041             :      * Initialize the slab allocator.  We need one slab slot per input tape,
    2042             :      * for the tuples in the heap, plus one to hold the tuple last returned
    2043             :      * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
    2044             :      * however, we don't need to allocate anything.)
    2045             :      *
    2046             :      * In a multi-pass merge, we could shrink this allocation for the last
    2047             :      * merge pass, if it has fewer tapes than previous passes, but we don't
    2048             :      * bother.
    2049             :      *
    2050             :      * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
    2051             :      * to track memory usage of individual tuples.
    2052             :      */
    2053         302 :     if (state->base.tuples)
    2054         274 :         init_slab_allocator(state, state->nOutputTapes + 1);
    2055             :     else
    2056          28 :         init_slab_allocator(state, 0);
    2057             : 
    2058             :     /*
    2059             :      * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
    2060             :      * from each input tape.
    2061             :      *
    2062             :      * We could shrink this, too, between passes in a multi-pass merge, but we
    2063             :      * don't bother.  (The initial input tapes are still in outputTapes.  The
    2064             :      * number of input tapes will not increase between passes.)
    2065             :      */
    2066         302 :     state->memtupsize = state->nOutputTapes;
    2067         604 :     state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
    2068         302 :                                                         state->nOutputTapes * sizeof(SortTuple));
    2069         302 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    2070             : 
    2071             :     /*
    2072             :      * Use all the remaining memory we have available for tape buffers among
    2073             :      * all the input tapes.  At the beginning of each merge pass, we will
    2074             :      * divide this memory between the input and output tapes in the pass.
    2075             :      */
    2076         302 :     state->tape_buffer_mem = state->availMem;
    2077         302 :     USEMEM(state, state->tape_buffer_mem);
    2078         302 :     if (trace_sort)
    2079           0 :         elog(LOG, "worker %d using %zu KB of memory for tape buffers",
    2080             :              state->worker, state->tape_buffer_mem / 1024);
    2081             : 
    2082             :     for (;;)
    2083             :     {
    2084             :         /*
    2085             :          * On the first iteration, or if we have read all the runs from the
    2086             :          * input tapes in a multi-pass merge, it's time to start a new pass.
    2087             :          * Rewind all the output tapes, and make them inputs for the next
    2088             :          * pass.
    2089             :          */
    2090         440 :         if (state->nInputRuns == 0)
    2091             :         {
    2092             :             int64       input_buffer_size;
    2093             : 
    2094             :             /* Close the old, emptied, input tapes */
    2095         332 :             if (state->nInputTapes > 0)
    2096             :             {
    2097         210 :                 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2098         180 :                     LogicalTapeClose(state->inputTapes[tapenum]);
    2099          30 :                 pfree(state->inputTapes);
    2100             :             }
    2101             : 
    2102             :             /* Previous pass's outputs become next pass's inputs. */
    2103         332 :             state->inputTapes = state->outputTapes;
    2104         332 :             state->nInputTapes = state->nOutputTapes;
    2105         332 :             state->nInputRuns = state->nOutputRuns;
    2106             : 
    2107             :             /*
    2108             :              * Reset output tape variables.  The actual LogicalTapes will be
    2109             :              * created as needed, here we only allocate the array to hold
    2110             :              * them.
    2111             :              */
    2112         332 :             state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
    2113         332 :             state->nOutputTapes = 0;
    2114         332 :             state->nOutputRuns = 0;
    2115             : 
    2116             :             /*
    2117             :              * Redistribute the memory allocated for tape buffers, among the
    2118             :              * new input and output tapes.
    2119             :              */
    2120         332 :             input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
    2121             :                                                        state->nInputTapes,
    2122             :                                                        state->nInputRuns,
    2123             :                                                        state->maxTapes);
    2124             : 
    2125         332 :             if (trace_sort)
    2126           0 :                 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
    2127             :                      state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
    2128             :                      pg_rusage_show(&state->ru_start));
    2129             : 
    2130             :             /* Prepare the new input tapes for merge pass. */
    2131        1300 :             for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2132         968 :                 LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
    2133             : 
    2134             :             /*
    2135             :              * If there's just one run left on each input tape, then only one
    2136             :              * merge pass remains.  If we don't have to produce a materialized
    2137             :              * sorted tape, we can stop at this point and do the final merge
    2138             :              * on-the-fly.
    2139             :              */
    2140         332 :             if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
    2141         310 :                 && state->nInputRuns <= state->nInputTapes
    2142         280 :                 && !WORKER(state))
    2143             :             {
    2144             :                 /* Tell logtape.c we won't be writing anymore */
    2145         280 :                 LogicalTapeSetForgetFreeSpace(state->tapeset);
    2146             :                 /* Initialize for the final merge pass */
    2147         280 :                 beginmerge(state);
    2148         280 :                 state->status = TSS_FINALMERGE;
    2149         280 :                 return;
    2150             :             }
    2151             :         }
    2152             : 
    2153             :         /* Select an output tape */
    2154         160 :         selectnewtape(state);
    2155             : 
    2156             :         /* Merge one run from each input tape. */
    2157         160 :         mergeonerun(state);
    2158             : 
    2159             :         /*
         2160             :          * If the input tapes are empty and we produced only one output run,
    2161             :          * we're done.  The current output tape contains the final result.
    2162             :          */
    2163         160 :         if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
    2164          22 :             break;
    2165             :     }
    2166             : 
    2167             :     /*
    2168             :      * Done.  The result is on a single run on a single tape.
    2169             :      */
    2170          22 :     state->result_tape = state->outputTapes[0];
    2171          22 :     if (!WORKER(state))
    2172          22 :         LogicalTapeFreeze(state->result_tape, NULL);
    2173             :     else
    2174           0 :         worker_freeze_result_tape(state);
    2175          22 :     state->status = TSS_SORTEDONTAPE;
    2176             : 
    2177             :     /* Close all the now-empty input tapes, to release their read buffers. */
    2178         114 :     for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2179          92 :         LogicalTapeClose(state->inputTapes[tapenum]);
    2180             : }
    2181             : 
    2182             : /*
    2183             :  * Merge one run from each input tape.
    2184             :  */
    2185             : static void
    2186         160 : mergeonerun(Tuplesortstate *state)
    2187             : {
    2188             :     int         srcTapeIndex;
    2189             :     LogicalTape *srcTape;
    2190             : 
    2191             :     /*
    2192             :      * Start the merge by loading one tuple from each active source tape into
    2193             :      * the heap.
    2194             :      */
    2195         160 :     beginmerge(state);
    2196             : 
    2197             :     Assert(state->slabAllocatorUsed);
    2198             : 
    2199             :     /*
    2200             :      * Execute the merge by repeatedly extracting the lowest tuple in the
    2201             :      * heap, writing it out, and replacing it with the next tuple from the
    2202             :      * same tape (if there is another one).
    2203             :      */
    2204      875592 :     while (state->memtupcount > 0)
    2205             :     {
    2206             :         SortTuple   stup;
    2207             : 
    2208             :         /* write the tuple to destTape */
    2209      875432 :         srcTapeIndex = state->memtuples[0].srctape;
    2210      875432 :         srcTape = state->inputTapes[srcTapeIndex];
    2211      875432 :         WRITETUP(state, state->destTape, &state->memtuples[0]);
    2212             : 
    2213             :         /* recycle the slot of the tuple we just wrote out, for the next read */
    2214      875432 :         if (state->memtuples[0].tuple)
    2215      735348 :             RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
    2216             : 
    2217             :         /*
    2218             :          * pull next tuple from the tape, and replace the written-out tuple in
    2219             :          * the heap with it.
    2220             :          */
    2221      875432 :         if (mergereadnext(state, srcTape, &stup))
    2222             :         {
    2223      874578 :             stup.srctape = srcTapeIndex;
    2224      874578 :             tuplesort_heap_replace_top(state, &stup);
    2225             :         }
    2226             :         else
    2227             :         {
    2228         854 :             tuplesort_heap_delete_top(state);
    2229         854 :             state->nInputRuns--;
    2230             :         }
    2231             :     }
    2232             : 
    2233             :     /*
    2234             :      * When the heap empties, we're done.  Write an end-of-run marker on the
    2235             :      * output tape.
    2236             :      */
    2237         160 :     markrunend(state->destTape);
    2238         160 : }
    2239             : 
    2240             : /*
    2241             :  * beginmerge - initialize for a merge pass
    2242             :  *
    2243             :  * Fill the merge heap with the first tuple from each input tape.
    2244             :  */
    2245             : static void
    2246         440 : beginmerge(Tuplesortstate *state)
    2247             : {
    2248             :     int         activeTapes;
    2249             :     int         srcTapeIndex;
    2250             : 
    2251             :     /* Heap should be empty here */
    2252             :     Assert(state->memtupcount == 0);
    2253             : 
    2254         440 :     activeTapes = Min(state->nInputTapes, state->nInputRuns);
    2255             : 
    2256        1990 :     for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
    2257             :     {
    2258             :         SortTuple   tup;
    2259             : 
    2260        1550 :         if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
    2261             :         {
    2262        1290 :             tup.srctape = srcTapeIndex;
    2263        1290 :             tuplesort_heap_insert(state, &tup);
    2264             :         }
    2265             :     }
    2266         440 : }
    2267             : 
    2268             : /*
    2269             :  * mergereadnext - read next tuple from one merge input tape
    2270             :  *
    2271             :  * Returns false on EOF.
    2272             :  */
    2273             : static bool
    2274     5521334 : mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
    2275             : {
    2276             :     unsigned int tuplen;
    2277             : 
    2278             :     /* read next tuple, if any */
    2279     5521334 :     if ((tuplen = getlen(srcTape, true)) == 0)
    2280        1502 :         return false;
    2281     5519832 :     READTUP(state, stup, srcTape, tuplen);
    2282             : 
    2283     5519832 :     return true;
    2284             : }
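
Together with markrunend below, this fixes the on-tape layout of a run:
every tuple is preceded by an unsigned int length word, and a zero length
word terminates the run.  Schematically:

    [len1][tuple1] [len2][tuple2] ... [lenN][tupleN] [0]
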
    2285             : 
    2286             : /*
    2287             :  * dumptuples - remove tuples from memtuples and write initial run to tape
    2288             :  *
    2289             :  * When alltuples = true, dump everything currently in memory.  (This case is
    2290             :  * only used at end of input data.)
    2291             :  */
    2292             : static void
    2293     1113034 : dumptuples(Tuplesortstate *state, bool alltuples)
    2294             : {
    2295             :     int         memtupwrite;
    2296             :     int         i;
    2297             : 
    2298             :     /*
    2299             :      * Nothing to do if we still fit in available memory and have array slots,
    2300             :      * unless this is the final call during initial run generation.
    2301             :      */
    2302     1113034 :     if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
    2303     1112090 :         !alltuples)
    2304     1111482 :         return;
    2305             : 
    2306             :     /*
    2307             :      * Final call might require no sorting, in rare cases where we just so
    2308             :      * happen to have previously LACKMEM()'d at the point where exactly all
    2309             :      * remaining tuples are loaded into memory, just before input was
    2310             :      * exhausted.  In general, short final runs are quite possible, but avoid
    2311             :      * creating a completely empty run.  In a worker, though, we must produce
    2312             :      * at least one tape, even if it's empty.
    2313             :      */
    2314        1552 :     if (state->memtupcount == 0 && state->currentRun > 0)
    2315           0 :         return;
    2316             : 
    2317             :     Assert(state->status == TSS_BUILDRUNS);
    2318             : 
    2319             :     /*
    2320             :      * It seems unlikely that this limit will ever be exceeded, but take no
    2321             :      * chances
    2322             :      */
    2323        1552 :     if (state->currentRun == INT_MAX)
    2324           0 :         ereport(ERROR,
    2325             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    2326             :                  errmsg("cannot have more than %d runs for an external sort",
    2327             :                         INT_MAX)));
    2328             : 
    2329        1552 :     if (state->currentRun > 0)
    2330         944 :         selectnewtape(state);
    2331             : 
    2332        1552 :     state->currentRun++;
    2333             : 
    2334        1552 :     if (trace_sort)
    2335           0 :         elog(LOG, "worker %d starting quicksort of run %d: %s",
    2336             :              state->worker, state->currentRun,
    2337             :              pg_rusage_show(&state->ru_start));
    2338             : 
    2339             :     /*
    2340             :      * Sort all tuples accumulated within the allowed amount of memory for
    2341             :      * this run using quicksort
    2342             :      */
    2343        1552 :     tuplesort_sort_memtuples(state);
    2344             : 
    2345        1552 :     if (trace_sort)
    2346           0 :         elog(LOG, "worker %d finished quicksort of run %d: %s",
    2347             :              state->worker, state->currentRun,
    2348             :              pg_rusage_show(&state->ru_start));
    2349             : 
    2350        1552 :     memtupwrite = state->memtupcount;
    2351     5161156 :     for (i = 0; i < memtupwrite; i++)
    2352             :     {
    2353     5159604 :         SortTuple  *stup = &state->memtuples[i];
    2354             : 
    2355     5159604 :         WRITETUP(state, state->destTape, stup);
    2356             :     }
    2357             : 
    2358        1552 :     state->memtupcount = 0;
    2359             : 
    2360             :     /*
    2361             :      * Reset tuple memory.  We've freed all of the tuples that we previously
    2362             :      * allocated.  It's important to avoid fragmentation when there is a stark
    2363             :      * change in the sizes of incoming tuples.  In bounded sorts,
    2364             :      * fragmentation due to AllocSetFree's bucketing by size class might be
    2365             :      * particularly bad if this step wasn't taken.
    2366             :      */
    2367        1552 :     MemoryContextReset(state->base.tuplecontext);
    2368             : 
    2369             :     /*
    2370             :      * Now update the memory accounting to subtract the memory used by the
    2371             :      * tuples.
    2372             :      */
    2373        1552 :     FREEMEM(state, state->tupleMem);
    2374        1552 :     state->tupleMem = 0;
    2375             : 
    2376        1552 :     markrunend(state->destTape);
    2377             : 
    2378        1552 :     if (trace_sort)
    2379           0 :         elog(LOG, "worker %d finished writing run %d to tape %d: %s",
    2380             :              state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
    2381             :              pg_rusage_show(&state->ru_start));
    2382             : }
    2383             : 
    2384             : /*
    2385             :  * tuplesort_rescan     - rewind and replay the scan
    2386             :  */
    2387             : void
    2388          48 : tuplesort_rescan(Tuplesortstate *state)
    2389             : {
    2390          48 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2391             : 
    2392             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2393             : 
    2394          48 :     switch (state->status)
    2395             :     {
    2396          40 :         case TSS_SORTEDINMEM:
    2397          40 :             state->current = 0;
    2398          40 :             state->eof_reached = false;
    2399          40 :             state->markpos_offset = 0;
    2400          40 :             state->markpos_eof = false;
    2401          40 :             break;
    2402           8 :         case TSS_SORTEDONTAPE:
    2403           8 :             LogicalTapeRewindForRead(state->result_tape, 0);
    2404           8 :             state->eof_reached = false;
    2405           8 :             state->markpos_block = 0L;
    2406           8 :             state->markpos_offset = 0;
    2407           8 :             state->markpos_eof = false;
    2408           8 :             break;
    2409           0 :         default:
    2410           0 :             elog(ERROR, "invalid tuplesort state");
    2411             :             break;
    2412             :     }
    2413             : 
    2414          48 :     MemoryContextSwitchTo(oldcontext);
    2415          48 : }
    2416             : 
    2417             : /*
    2418             :  * tuplesort_markpos    - saves current position in the merged sort file
    2419             :  */
    2420             : void
    2421      583640 : tuplesort_markpos(Tuplesortstate *state)
    2422             : {
    2423      583640 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2424             : 
    2425             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2426             : 
    2427      583640 :     switch (state->status)
    2428             :     {
    2429      574832 :         case TSS_SORTEDINMEM:
    2430      574832 :             state->markpos_offset = state->current;
    2431      574832 :             state->markpos_eof = state->eof_reached;
    2432      574832 :             break;
    2433        8808 :         case TSS_SORTEDONTAPE:
    2434        8808 :             LogicalTapeTell(state->result_tape,
    2435             :                             &state->markpos_block,
    2436             :                             &state->markpos_offset);
    2437        8808 :             state->markpos_eof = state->eof_reached;
    2438        8808 :             break;
    2439           0 :         default:
    2440           0 :             elog(ERROR, "invalid tuplesort state");
    2441             :             break;
    2442             :     }
    2443             : 
    2444      583640 :     MemoryContextSwitchTo(oldcontext);
    2445      583640 : }
    2446             : 
    2447             : /*
    2448             :  * tuplesort_restorepos - restores current position in merged sort file to
    2449             :  *                        last saved position
    2450             :  */
    2451             : void
    2452       37660 : tuplesort_restorepos(Tuplesortstate *state)
    2453             : {
    2454       37660 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2455             : 
    2456             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2457             : 
    2458       37660 :     switch (state->status)
    2459             :     {
    2460       31468 :         case TSS_SORTEDINMEM:
    2461       31468 :             state->current = state->markpos_offset;
    2462       31468 :             state->eof_reached = state->markpos_eof;
    2463       31468 :             break;
    2464        6192 :         case TSS_SORTEDONTAPE:
    2465        6192 :             LogicalTapeSeek(state->result_tape,
    2466             :                             state->markpos_block,
    2467             :                             state->markpos_offset);
    2468        6192 :             state->eof_reached = state->markpos_eof;
    2469        6192 :             break;
    2470           0 :         default:
    2471           0 :             elog(ERROR, "invalid tuplesort state");
    2472             :             break;
    2473             :     }
    2474             : 
    2475       37660 :     MemoryContextSwitchTo(oldcontext);
    2476       37660 : }
    2477             : 
    2478             : /*
    2479             :  * tuplesort_get_stats - extract summary statistics
    2480             :  *
    2481             :  * This can be called after tuplesort_performsort() finishes to obtain
    2482             :  * printable summary information about how the sort was performed.
    2483             :  */
    2484             : void
    2485         396 : tuplesort_get_stats(Tuplesortstate *state,
    2486             :                     TuplesortInstrumentation *stats)
    2487             : {
    2488             :     /*
    2489             :      * Note: it might seem we should provide both memory and disk usage for a
    2490             :      * disk-based sort.  However, the current code doesn't track memory space
    2491             :      * accurately once we have begun to return tuples to the caller (since we
    2492             :      * don't account for pfree's the caller is expected to do), so we cannot
    2493             :      * rely on availMem in a disk sort.  This does not seem worth the overhead
    2494             :      * to fix.  Is it worth creating an API for the memory context code to
    2495             :      * tell us how much is actually used in sortcontext?
    2496             :      */
    2497         396 :     tuplesort_updatemax(state);
    2498             : 
    2499         396 :     if (state->isMaxSpaceDisk)
    2500           6 :         stats->spaceType = SORT_SPACE_TYPE_DISK;
    2501             :     else
    2502         390 :         stats->spaceType = SORT_SPACE_TYPE_MEMORY;
    2503         396 :     stats->spaceUsed = (state->maxSpace + 1023) / 1024;
    2504             : 
    2505         396 :     switch (state->maxSpaceStatus)
    2506             :     {
    2507         390 :         case TSS_SORTEDINMEM:
    2508         390 :             if (state->boundUsed)
    2509          42 :                 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
    2510             :             else
    2511         348 :                 stats->sortMethod = SORT_TYPE_QUICKSORT;
    2512         390 :             break;
    2513           0 :         case TSS_SORTEDONTAPE:
    2514           0 :             stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
    2515           0 :             break;
    2516           6 :         case TSS_FINALMERGE:
    2517           6 :             stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
    2518           6 :             break;
    2519           0 :         default:
    2520           0 :             stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
    2521           0 :             break;
    2522             :     }
    2523         396 : }
    2524             : 
    2525             : /*
    2526             :  * Convert TuplesortMethod to a string.
    2527             :  */
    2528             : const char *
    2529         294 : tuplesort_method_name(TuplesortMethod m)
    2530             : {
    2531         294 :     switch (m)
    2532             :     {
    2533           0 :         case SORT_TYPE_STILL_IN_PROGRESS:
    2534           0 :             return "still in progress";
    2535          42 :         case SORT_TYPE_TOP_N_HEAPSORT:
    2536          42 :             return "top-N heapsort";
    2537         246 :         case SORT_TYPE_QUICKSORT:
    2538         246 :             return "quicksort";
    2539           0 :         case SORT_TYPE_EXTERNAL_SORT:
    2540           0 :             return "external sort";
    2541           6 :         case SORT_TYPE_EXTERNAL_MERGE:
    2542           6 :             return "external merge";
    2543             :     }
    2544             : 
    2545           0 :     return "unknown";
    2546             : }
    2547             : 
    2548             : /*
    2549             :  * Convert TuplesortSpaceType to a string.
    2550             :  */
    2551             : const char *
    2552         258 : tuplesort_space_type_name(TuplesortSpaceType t)
    2553             : {
    2554             :     Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
    2555         258 :     return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
    2556             : }
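
These strings are what surfaces in EXPLAIN (ANALYZE) output once the
statistics have been collected via tuplesort_get_stats; for example
(figures illustrative):

    Sort Method: quicksort  Memory: 25kB
    Sort Method: external merge  Disk: 4592kB
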
    2557             : 
    2558             : 
    2559             : /*
    2560             :  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
    2561             :  */
    2562             : 
    2563             : /*
    2564             :  * Convert the existing unordered array of SortTuples to a bounded heap,
    2565             :  * discarding all but the smallest "state->bound" tuples.
    2566             :  *
    2567             :  * When working with a bounded heap, we want to keep the largest entry
    2568             :  * at the root (array entry zero), instead of the smallest as in the normal
    2569             :  * sort case.  This allows us to discard the largest entry cheaply.
    2570             :  * Therefore, we temporarily reverse the sort direction.
    2571             :  */
    2572             : static void
    2573         408 : make_bounded_heap(Tuplesortstate *state)
    2574             : {
    2575         408 :     int         tupcount = state->memtupcount;
    2576             :     int         i;
    2577             : 
    2578             :     Assert(state->status == TSS_INITIAL);
    2579             :     Assert(state->bounded);
    2580             :     Assert(tupcount >= state->bound);
    2581             :     Assert(SERIAL(state));
    2582             : 
    2583             :     /* Reverse sort direction so largest entry will be at root */
    2584         408 :     reversedirection(state);
    2585             : 
    2586         408 :     state->memtupcount = 0;      /* make the heap empty */
    2587       37980 :     for (i = 0; i < tupcount; i++)
    2588             :     {
    2589       37572 :         if (state->memtupcount < state->bound)
    2590             :         {
    2591             :             /* Insert next tuple into heap */
    2592             :             /* Must copy source tuple to avoid possible overwrite */
    2593       18582 :             SortTuple   stup = state->memtuples[i];
    2594             : 
    2595       18582 :             tuplesort_heap_insert(state, &stup);
    2596             :         }
    2597             :         else
    2598             :         {
    2599             :             /*
    2600             :              * The heap is full.  Replace the largest entry with the new
    2601             :              * tuple, or just discard the new tuple if it is at least as
    2602             :              * large as the current largest entry.
    2603             :              */
    2604       18990 :             if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
    2605             :             {
    2606        9794 :                 free_sort_tuple(state, &state->memtuples[i]);
    2607        9794 :                 CHECK_FOR_INTERRUPTS();
    2608             :             }
    2609             :             else
    2610        9196 :                 tuplesort_heap_replace_top(state, &state->memtuples[i]);
    2611             :         }
    2612             :     }
    2613             : 
    2614             :     Assert(state->memtupcount == state->bound);
    2615         408 :     state->status = TSS_BOUNDED;
    2616         408 : }
    2617             : 
    2618             : /*
    2619             :  * Convert the bounded heap to a properly-sorted array
    2620             :  */
    2621             : static void
    2622         408 : sort_bounded_heap(Tuplesortstate *state)
    2623             : {
    2624         408 :     int         tupcount = state->memtupcount;
    2625             : 
    2626             :     Assert(state->status == TSS_BOUNDED);
    2627             :     Assert(state->bounded);
    2628             :     Assert(tupcount == state->bound);
    2629             :     Assert(SERIAL(state));
    2630             : 
    2631             :     /*
    2632             :      * We can unheapify in place because each delete-top call will remove the
    2633             :      * largest entry, which we can promptly store in the newly freed slot at
    2634             :      * the end.  Once we're down to a single-entry heap, we're done.
    2635             :      */
    2636       18582 :     while (state->memtupcount > 1)
    2637             :     {
    2638       18174 :         SortTuple   stup = state->memtuples[0];
    2639             : 
    2640             :         /* this sifts-up the next-largest entry and decreases memtupcount */
    2641       18174 :         tuplesort_heap_delete_top(state);
    2642       18174 :         state->memtuples[state->memtupcount] = stup;
    2643             :     }
    2644         408 :     state->memtupcount = tupcount;
    2645             : 
    2646             :     /*
    2647             :      * Reverse sort direction back to the original state.  This is not
    2648             :      * actually necessary but seems like a good idea for tidiness.
    2649             :      */
    2650         408 :     reversedirection(state);
    2651             : 
    2652         408 :     state->status = TSS_SORTEDINMEM;
    2653         408 :     state->boundUsed = true;
    2654         408 : }
    2655             : 
    2656             : /*
    2657             :  * Sort all memtuples using specialized qsort() routines.
    2658             :  *
    2659             :  * Quicksort is used for small in-memory sorts and for external sort runs.
    2660             :  */
    2661             : static void
    2662      236418 : tuplesort_sort_memtuples(Tuplesortstate *state)
    2663             : {
    2664             :     Assert(!LEADER(state));
    2665             : 
    2666      236418 :     if (state->memtupcount > 1)
    2667             :     {
    2668             :         /*
    2669             :          * Do we have the leading column's value or abbreviation in datum1,
    2670             :          * and is there a specialization for its comparator?
    2671             :          */
    2672       67950 :         if (state->base.haveDatum1 && state->base.sortKeys)
    2673             :         {
    2674       67914 :             if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
    2675             :             {
    2676        3356 :                 qsort_tuple_unsigned(state->memtuples,
    2677        3356 :                                      state->memtupcount,
    2678             :                                      state);
    2679        3340 :                 return;
    2680             :             }
    2681       64558 :             else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
    2682             :             {
    2683        1340 :                 qsort_tuple_signed(state->memtuples,
    2684        1340 :                                    state->memtupcount,
    2685             :                                    state);
    2686        1340 :                 return;
    2687             :             }
    2688       63218 :             else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
    2689             :             {
    2690       39732 :                 qsort_tuple_int32(state->memtuples,
    2691       39732 :                                   state->memtupcount,
    2692             :                                   state);
    2693       39672 :                 return;
    2694             :             }
    2695             :         }
    2696             : 
    2697             :         /* Can we use the single-key sort function? */
    2698       23522 :         if (state->base.onlyKey != NULL)
    2699             :         {
    2700       10064 :             qsort_ssup(state->memtuples, state->memtupcount,
    2701       10064 :                        state->base.onlyKey);
    2702             :         }
    2703             :         else
    2704             :         {
    2705       13458 :             qsort_tuple(state->memtuples,
    2706       13458 :                         state->memtupcount,
    2707             :                         state->base.comparetup,
    2708             :                         state);
    2709             :         }
    2710             :     }
    2711             : }
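
The comparator pointers tested above identify sorts whose leading key is a
raw Datum comparison, letting the specialized qsort variants inline the
comparison instead of making an indirect call per element.  The unsigned
variant is essentially a three-way compare (simplified sketch, not the
verbatim source):

    static int
    ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
    {
        if (x < y)
            return -1;
        if (x > y)
            return 1;
        return 0;
    }
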
    2712             : 
    2713             : /*
    2714             :  * Insert a new tuple into an empty or existing heap, maintaining the
    2715             :  * heap invariant.  Caller is responsible for ensuring there's room.
    2716             :  *
    2717             :  * Note: For some callers, tuple points to a memtuples[] entry above the
    2718             :  * end of the heap.  This is safe as long as it's not immediately adjacent
    2719             :  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
    2720             :  * is, it might get overwritten before being moved into the heap!
    2721             :  */
    2722             : static void
    2723       19872 : tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
    2724             : {
    2725             :     SortTuple  *memtuples;
    2726             :     int         j;
    2727             : 
    2728       19872 :     memtuples = state->memtuples;
    2729             :     Assert(state->memtupcount < state->memtupsize);
    2730             : 
    2731       19872 :     CHECK_FOR_INTERRUPTS();
    2732             : 
    2733             :     /*
    2734             :      * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
    2735             :      * using 1-based array indexes, not 0-based.
    2736             :      */
    2737       19872 :     j = state->memtupcount++;
    2738       57742 :     while (j > 0)
    2739             :     {
    2740       50940 :         int         i = (j - 1) >> 1;
    2741             : 
    2742       50940 :         if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
    2743       13070 :             break;
    2744       37870 :         memtuples[j] = memtuples[i];
    2745       37870 :         j = i;
    2746             :     }
    2747       19872 :     memtuples[j] = *tuple;
    2748       19872 : }
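
A worked trace of the sift-up, using the 0-based parent index (j - 1) >> 1:
inserting 3 into the min-heap [5, 8, 9] starts with the hole at j = 3,
whose parent i = 1 holds 8; since 3 < 8, the 8 moves down and j becomes 1.
Parent i = 0 holds 5 > 3, so the 5 moves down as well, and 3 lands at the
root, leaving [3, 5, 9, 8].
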
    2749             : 
    2750             : /*
    2751             :  * Remove the tuple at state->memtuples[0] from the heap.  Decrement
    2752             :  * memtupcount, and sift up to maintain the heap invariant.
    2753             :  *
    2754             :  * The caller has already free'd the tuple the top node points to,
    2755             :  * if necessary.
    2756             :  */
    2757             : static void
    2758       19416 : tuplesort_heap_delete_top(Tuplesortstate *state)
    2759             : {
    2760       19416 :     SortTuple  *memtuples = state->memtuples;
    2761             :     SortTuple  *tuple;
    2762             : 
    2763       19416 :     if (--state->memtupcount <= 0)
    2764         302 :         return;
    2765             : 
    2766             :     /*
    2767             :      * Remove the last tuple in the heap, and re-insert it, by replacing the
    2768             :      * current top node with it.
    2769             :      */
    2770       19114 :     tuple = &memtuples[state->memtupcount];
    2771       19114 :     tuplesort_heap_replace_top(state, tuple);
    2772             : }
    2773             : 
    2774             : /*
    2775             :  * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
    2776             :  * maintain the heap invariant.
    2777             :  *
    2778             :  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
    2779             :  * Heapsort, steps H3-H8).
    2780             :  */
    2781             : static void
    2782     6049818 : tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
    2783             : {
    2784     6049818 :     SortTuple  *memtuples = state->memtuples;
    2785             :     unsigned int i,
    2786             :                 n;
    2787             : 
    2788             :     Assert(state->memtupcount >= 1);
    2789             : 
    2790     6049818 :     CHECK_FOR_INTERRUPTS();
    2791             : 
    2792             :     /*
    2793             :      * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
    2794             :      * This prevents overflow in the "2 * i + 1" calculation, since at the top
    2795             :      * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
    2796             :      */
    2797     6049818 :     n = state->memtupcount;
    2798     6049818 :     i = 0;                      /* i is where the "hole" is */
    2799             :     for (;;)
    2800     1827310 :     {
    2801     7877128 :         unsigned int j = 2 * i + 1;
    2802             : 
    2803     7877128 :         if (j >= n)
    2804     1250928 :             break;
    2805     9066576 :         if (j + 1 < n &&
    2806     2440376 :             COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
    2807      962962 :             j++;
    2808     6626200 :         if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
    2809     4798890 :             break;
    2810     1827310 :         memtuples[i] = memtuples[j];
    2811     1827310 :         i = j;
    2812             :     }
    2813     6049818 :     memtuples[i] = *tuple;
    2814     6049818 : }
    2815             : 
    2816             : /*
    2817             :  * Function to reverse the sort direction from its current state
    2818             :  *
    2819             :  * It is not safe to call this when performing hash tuplesorts
    2820             :  */
    2821             : static void
    2822         816 : reversedirection(Tuplesortstate *state)
    2823             : {
    2824         816 :     SortSupport sortKey = state->base.sortKeys;
    2825             :     int         nkey;
    2826             : 
    2827        1992 :     for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
    2828             :     {
    2829        1176 :         sortKey->ssup_reverse = !sortKey->ssup_reverse;
    2830        1176 :         sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
    2831             :     }
    2832         816 : }
    2833             : 
    2834             : 
    2835             : /*
    2836             :  * Tape interface routines
    2837             :  */
    2838             : 
    2839             : static unsigned int
    2840     5824346 : getlen(LogicalTape *tape, bool eofOK)
    2841             : {
    2842             :     unsigned int len;
    2843             : 
    2844     5824346 :     if (LogicalTapeRead(tape,
    2845             :                         &len, sizeof(len)) != sizeof(len))
    2846           0 :         elog(ERROR, "unexpected end of tape");
    2847     5824346 :     if (len == 0 && !eofOK)
    2848           0 :         elog(ERROR, "unexpected end of data");
    2849     5824346 :     return len;
    2850             : }
    2851             : 
    2852             : static void
    2853        1712 : markrunend(LogicalTape *tape)
    2854             : {
    2855        1712 :     unsigned int len = 0;
    2856             : 
    2857        1712 :     LogicalTapeWrite(tape, &len, sizeof(len));
    2858        1712 : }
    2859             : 
    2860             : /*
    2861             :  * Get memory for tuple from within READTUP() routine.
    2862             :  *
    2863             :  * We use the next free slot from the slab allocator, or palloc() if the tuple
    2864             :  * is too large for that.
    2865             :  */
    2866             : void *
    2867     5412568 : tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
    2868             : {
    2869             :     SlabSlot   *buf;
    2870             : 
    2871             :     /*
    2872             :      * We pre-allocate enough slots in the slab arena that we should never run
    2873             :      * out.
    2874             :      */
    2875             :     Assert(state->slabFreeHead);
    2876             : 
    2877     5412568 :     if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
    2878           0 :         return MemoryContextAlloc(state->base.sortcontext, tuplen);
    2879             :     else
    2880             :     {
    2881     5412568 :         buf = state->slabFreeHead;
    2882             :         /* Reuse this slot */
    2883     5412568 :         state->slabFreeHead = buf->nextfree;
    2884             : 
    2885     5412568 :         return buf;
    2886             :     }
    2887             : }
    2888             : 
    2889             : 
    2890             : /*
    2891             :  * Parallel sort routines
    2892             :  */
    2893             : 
    2894             : /*
    2895             :  * tuplesort_estimate_shared - estimate required shared memory allocation
    2896             :  *
    2897             :  * nWorkers is an estimate of the number of workers (it's the number that
    2898             :  * will be requested).
    2899             :  */
    2900             : Size
    2901         164 : tuplesort_estimate_shared(int nWorkers)
    2902             : {
    2903             :     Size        tapesSize;
    2904             : 
    2905             :     Assert(nWorkers > 0);
    2906             : 
    2907             :     /* Make sure that BufFile shared state is MAXALIGN'd */
    2908         164 :     tapesSize = mul_size(sizeof(TapeShare), nWorkers);
    2909         164 :     tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
    2910             : 
    2911         164 :     return tapesSize;
    2912             : }
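
For example (illustrative, assuming sizeof(TapeShare) == 8 and an 8-byte
MAXALIGN): with nWorkers = 4, the tapes array needs 4 * 8 = 32 bytes, to
which offsetof(Sharedsort, tapes) is added before rounding the total up to
the next multiple of 8.  mul_size() and add_size() are the overflow-checked
Size arithmetic helpers used throughout the backend.
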
    2913             : 
    2914             : /*
    2915             :  * tuplesort_initialize_shared - initialize shared tuplesort state
    2916             :  *
    2917             :  * Must be called from leader process before workers are launched, to
    2918             :  * establish state needed up-front for worker tuplesortstates.  nWorkers
    2919             :  * should match the argument passed to tuplesort_estimate_shared().
    2920             :  */
    2921             : void
    2922         234 : tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
    2923             : {
    2924             :     int         i;
    2925             : 
    2926             :     Assert(nWorkers > 0);
    2927             : 
    2928         234 :     SpinLockInit(&shared->mutex);
    2929         234 :     shared->currentWorker = 0;
    2930         234 :     shared->workersFinished = 0;
    2931         234 :     SharedFileSetInit(&shared->fileset, seg);
    2932         234 :     shared->nTapes = nWorkers;
    2933         710 :     for (i = 0; i < nWorkers; i++)
    2934             :     {
    2935         476 :         shared->tapes[i].firstblocknumber = 0L;
    2936             :     }
    2937         234 : }
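
Continuing the same sketch: once the segment has been created, the leader
carves out the Sharedsort, initializes it, and publishes it under a TOC key
of its own choosing (PARALLEL_KEY_TUPLESORT here is the caller's constant,
not something tuplesort.c provides):

    /* Leader: allocate, initialize, and publish the shared sort state */
    Sharedsort *sharedsort = (Sharedsort *)
        shm_toc_allocate(pcxt->toc, tuplesort_estimate_shared(nworkers));

    tuplesort_initialize_shared(sharedsort, nworkers, pcxt->seg);
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
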
    2938             : 
    2939             : /*
    2940             :  * tuplesort_attach_shared - attach to shared tuplesort state
    2941             :  *
    2942             :  * Must be called by all worker processes.
    2943             :  */
    2944             : void
    2945         236 : tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
    2946             : {
    2947             :     /* Attach to SharedFileSet */
    2948         236 :     SharedFileSetAttach(&shared->fileset, seg);
    2949         236 : }
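
The worker-side counterpart, again sketched after the nbtsort.c pattern:
look up the Sharedsort, attach to it, and build the SortCoordinate that
marks the worker's Tuplesortstate as a parallel participant when passed to
one of the tuplesort_begin_*() routines:

    /* Worker: attach to the shared state and describe this participant */
    Sharedsort *sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
    SortCoordinate coordinate = palloc0(sizeof(SortCoordinateData));

    tuplesort_attach_shared(sharedsort, seg);

    coordinate->isWorker = true;
    coordinate->nParticipants = -1; /* workers never know the final count */
    coordinate->sharedsort = sharedsort;
    /* ... pass "coordinate" to the appropriate tuplesort_begin_*() call ... */
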
    2950             : 
    2951             : /*
    2952             :  * worker_get_identifier - Assign and return ordinal identifier for worker
    2953             :  *
    2954             :  * The order in which these are assigned is not well defined, and should not
    2955             :  * matter; worker numbers across parallel sort participants need only be
    2956             :  * distinct and gapless.  logtape.c requires this.
    2957             :  *
    2958             :  * Note that the identifiers assigned from here have no relation to
    2959             :  * ParallelWorkerNumber, to avoid making any assumption about the
    2960             :  * caller's requirements.  However, we do follow the ParallelWorkerNumber
    2961             :  * convention of representing a non-worker with worker number -1.  This
    2962             :  * includes the leader, as well as serial Tuplesort processes.
    2963             :  */
    2964             : static int
    2965         468 : worker_get_identifier(Tuplesortstate *state)
    2966             : {
    2967         468 :     Sharedsort *shared = state->shared;
    2968             :     int         worker;
    2969             : 
    2970             :     Assert(WORKER(state));
    2971             : 
    2972         468 :     SpinLockAcquire(&shared->mutex);
    2973         468 :     worker = shared->currentWorker++;
    2974         468 :     SpinLockRelease(&shared->mutex);
    2975             : 
    2976         468 :     return worker;
    2977             : }
    2978             : 
    2979             : /*
    2980             :  * worker_freeze_result_tape - freeze worker's result tape for leader
    2981             :  *
    2982             :  * Workers call this just after the result tape has been determined, instead
    2983             :  * of calling LogicalTapeFreeze() directly.  Workers require a few additional
    2984             :  * steps beyond the similar serial TSS_SORTEDONTAPE external sort case, and
    2985             :  * those steps also happen here.  The extra steps involve freeing resources
    2986             :  * that are no longer needed, and indicating to the leader that the worker's
    2987             :  * run is available as input for its merge.
    2988             :  *
    2989             :  * There should only be one final output run for each worker, which consists
    2990             :  * of all tuples that were originally input into that worker.
    2991             :  */
    2992             : static void
    2993         468 : worker_freeze_result_tape(Tuplesortstate *state)
    2994             : {
    2995         468 :     Sharedsort *shared = state->shared;
    2996             :     TapeShare   output;
    2997             : 
    2998             :     Assert(WORKER(state));
    2999             :     Assert(state->result_tape != NULL);
    3000             :     Assert(state->memtupcount == 0);
    3001             : 
    3002             :     /*
    3003             :      * Free most remaining memory, in case the caller is sensitive to our
    3004             :      * holding on to it.  memtuples may not be a tiny merge heap at this point.
    3005             :      */
    3006         468 :     pfree(state->memtuples);
    3007             :     /* Be tidy */
    3008         468 :     state->memtuples = NULL;
    3009         468 :     state->memtupsize = 0;
    3010             : 
    3011             :     /*
    3012             :      * Freeze the result tape, capturing the metadata that must be stored in
    3013             :      * shared memory for the leader
    3014             :      */
    3015         468 :     LogicalTapeFreeze(state->result_tape, &output);
    3016             : 
    3017             :     /* Store properties of output tape, and update finished worker count */
    3018         468 :     SpinLockAcquire(&shared->mutex);
    3019         468 :     shared->tapes[state->worker] = output;
    3020         468 :     shared->workersFinished++;
    3021         468 :     SpinLockRelease(&shared->mutex);
    3022         468 : }
    3023             : 
    3024             : /*
    3025             :  * worker_nomergeruns - dump memtuples in worker, without merging
    3026             :  *
    3027             :  * This is called in a worker as an alternative to mergeruns() when no
    3028             :  * merging is required.
    3029             :  */
    3030             : static void
    3031         468 : worker_nomergeruns(Tuplesortstate *state)
    3032             : {
    3033             :     Assert(WORKER(state));
    3034             :     Assert(state->result_tape == NULL);
    3035             :     Assert(state->nOutputRuns == 1);
    3036             : 
    3037         468 :     state->result_tape = state->destTape;
    3038         468 :     worker_freeze_result_tape(state);
    3039         468 : }
    3040             : 
    3041             : /*
    3042             :  * leader_takeover_tapes - create tapeset for leader from worker tapes
    3043             :  *
    3044             :  * So far, the leader Tuplesortstate has performed no actual sorting.  By
    3045             :  * now, all sorting has occurred in workers, all of which must have already
    3046             :  * returned from tuplesort_performsort().
    3047             :  *
    3048             :  * When this returns, the leader process is left in a state virtually
    3049             :  * indistinguishable from having generated the runs itself, as a serial
    3050             :  * external sort would have.
    3051             :  */
    3052             : static void
    3053         162 : leader_takeover_tapes(Tuplesortstate *state)
    3054             : {
    3055         162 :     Sharedsort *shared = state->shared;
    3056         162 :     int         nParticipants = state->nParticipants;
    3057             :     int         workersFinished;
    3058             :     int         j;
    3059             : 
    3060             :     Assert(LEADER(state));
    3061             :     Assert(nParticipants >= 1);
    3062             : 
    3063         162 :     SpinLockAcquire(&shared->mutex);
    3064         162 :     workersFinished = shared->workersFinished;
    3065         162 :     SpinLockRelease(&shared->mutex);
    3066             : 
    3067         162 :     if (nParticipants != workersFinished)
    3068           0 :         elog(ERROR, "cannot take over tapes before all workers finish");
    3069             : 
    3070             :     /*
    3071             :      * Create the tapeset from worker tapes, including a leader-owned tape at
    3072             :      * the end.  Parallel workers are far more expensive than logical tapes,
    3073             :      * so the number of tapes allocated here should never be excessive.
    3074             :      */
    3075         162 :     inittapestate(state, nParticipants);
    3076         162 :     state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
    3077             : 
    3078             :     /*
    3079             :      * Set currentRun to reflect the number of runs we will merge (it's not
    3080             :      * used for anything; this is just pro forma)
    3081             :      */
    3082         162 :     state->currentRun = nParticipants;
    3083             : 
    3084             :     /*
    3085             :      * Initialize the state to look the same as after building the initial
    3086             :      * runs.
    3087             :      *
    3088             :      * There will always be exactly one run per worker, and exactly one
    3089             :      * input tape per run, because workers always output exactly one run,
    3090             :      * even when there were no input tuples for them to sort.
    3091             :      */
    3092         162 :     state->inputTapes = NULL;
    3093         162 :     state->nInputTapes = 0;
    3094         162 :     state->nInputRuns = 0;
    3095             : 
    3096         162 :     state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
    3097         162 :     state->nOutputTapes = nParticipants;
    3098         162 :     state->nOutputRuns = nParticipants;
    3099             : 
    3100         490 :     for (j = 0; j < nParticipants; j++)
    3101             :     {
    3102         328 :         state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
    3103             :     }
    3104             : 
    3105         162 :     state->status = TSS_BUILDRUNS;
    3106         162 : }
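
The leader reaches this function from tuplesort_performsort() on a state
that was opened with a leader-side coordinate, roughly as follows
(illustrative, following the nbtsort.c pattern; nparticipants counts the
worker tuplesorts, plus the leader itself if it participates as a worker):

    /* Leader: after every worker's tuplesort_performsort() has returned */
    SortCoordinate coordinate = palloc0(sizeof(SortCoordinateData));

    coordinate->isWorker = false;
    coordinate->nParticipants = nparticipants;
    coordinate->sharedsort = sharedsort;

    /*
     * tuplesort_begin_*(..., coordinate, ...) followed by
     * tuplesort_performsort() ends up here in leader_takeover_tapes(),
     * which then merges the workers' frozen runs.
     */
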
    3107             : 
    3108             : /*
    3109             :  * Convenience routine to free a tuple previously loaded into sort memory
    3110             :  */
    3111             : static void
    3112     3760000 : free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
    3113             : {
    3114     3760000 :     if (stup->tuple)
    3115             :     {
    3116     3599722 :         FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
    3117     3599722 :         pfree(stup->tuple);
    3118     3599722 :         stup->tuple = NULL;
    3119             :     }
    3120     3760000 : }
    3121             : 
    3122             : int
    3123           0 : ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
    3124             : {
    3125           0 :     if (x < y)
    3126           0 :         return -1;
    3127           0 :     else if (x > y)
    3128           0 :         return 1;
    3129             :     else
    3130           0 :         return 0;
    3131             : }
    3132             : 
    3133             : int
    3134     1096186 : ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
    3135             : {
    3136     1096186 :     int64       xx = DatumGetInt64(x);
    3137     1096186 :     int64       yy = DatumGetInt64(y);
    3138             : 
    3139     1096186 :     if (xx < yy)
    3140      398422 :         return -1;
    3141      697764 :     else if (xx > yy)
    3142      353162 :         return 1;
    3143             :     else
    3144      344602 :         return 0;
    3145             : }
    3146             : 
    3147             : int
    3148   175706596 : ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
    3149             : {
    3150   175706596 :     int32       xx = DatumGetInt32(x);
    3151   175706596 :     int32       yy = DatumGetInt32(y);
    3152             : 
    3153   175706596 :     if (xx < yy)
    3154    42496654 :         return -1;
    3155   133209942 :     else if (xx > yy)
    3156    39877852 :         return 1;
    3157             :     else
    3158    93332090 :         return 0;
    3159             : }
    3160             : }
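
These three comparators are more than conveniences: tuplesort.c compares a
sort key's comparator pointer against them to select specialized qsort
variants for the unsigned-Datum, signed-int64, and int32 key cases.  A
datatype opts in from its sortsupport function; a minimal sketch, modeled
on date_sortsupport() in src/backend/utils/adt/date.c:

    #include "postgres.h"
    #include "fmgr.h"
    #include "utils/sortsupport.h"

    /* Sortsupport for a type whose Datums order as plain int32 values */
    Datum
    date_sortsupport(PG_FUNCTION_ARGS)
    {
        SortSupport ssup = (SortSupport) PG_GETARG_POINTER(0);

        /*
         * Installing this well-known comparator lets tuplesort pick its
         * specialized int32 qsort variant.
         */
        ssup->comparator = ssup_datum_int32_cmp;
        PG_RETURN_VOID();
    }
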
Generated by: LCOV version 1.16