LCOV - code coverage report
Current view: top level - src/backend/utils/sort - tuplesort.c (source / functions)
Test: PostgreSQL 18devel
Date: 2024-11-21 08:14:44

                  Hit    Total    Coverage
Lines:            724      812      89.2 %
Functions:         54       55      98.2 %

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * tuplesort.c
       4             :  *    Generalized tuple sorting routines.
       5             :  *
       6             :  * This module provides a generalized facility for tuple sorting, which can be
       7             :  * applied to different kinds of sortable objects.  Implementation of
       8             :  * the particular sorting variants is given in tuplesortvariants.c.
       9             :  * This module works efficiently for both small and large amounts
      10             :  * of data.  Small amounts are sorted in-memory using qsort().  Large
      11             :  * amounts are sorted using temporary files and a standard external sort
      12             :  * algorithm.
      13             :  *
      14             :  * See Knuth, volume 3, for more than you want to know about external
      15             :  * sorting algorithms.  The algorithm we use is a balanced k-way merge.
      16             :  * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
      17             :  * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
      18             :  * merge is better.  Knuth is assuming that tape drives are expensive
      19             :  * beasts, and in particular that there will always be many more runs than
      20             :  * tape drives.  The polyphase merge algorithm was good at keeping all the
      21             :  * tape drives busy, but in our implementation a "tape drive" doesn't cost
      22             :  * much more than a few Kb of memory buffers, so we can afford to have
      23             :  * lots of them.  In particular, if we can have as many tape drives as
      24             :  * sorted runs, we can avoid repeated I/O entirely.
      25             :  *
      26             :  * Historically, we divided the input into sorted runs using replacement
      27             :  * selection, in the form of a priority tree implemented as a heap
      28             :  * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
      29             :  * for run generation.
      30             :  *
      31             :  * The approximate amount of memory allowed for any one sort operation
      32             :  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
      33             :  * we absorb tuples and simply store them in an unsorted array as long as
      34             :  * we haven't exceeded workMem.  If we reach the end of the input without
      35             :  * exceeding workMem, we sort the array using qsort() and subsequently return
      36             :  * tuples just by scanning the tuple array sequentially.  If we do exceed
      37             :  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
      38             :  * When tuples are dumped in batch after quicksorting, we begin a new run
      39             :  * with a new output tape.  If we reach the max number of tapes, we write
      40             :  * subsequent runs on the existing tapes in a round-robin fashion.  We will
      41             :  * need multiple merge passes to finish the merge in that case.  After the
      42             :  * end of the input is reached, we dump out remaining tuples in memory into
      43             :  * a final run, then merge the runs.
      44             :  *
      45             :  * When merging runs, we use a heap containing just the frontmost tuple from
      46             :  * each source run; we repeatedly output the smallest tuple and replace it
      47             :  * with the next tuple from its source tape (if any).  When the heap empties,
      48             :  * the merge is complete.  The basic merge algorithm thus needs very little
      49             :  * memory --- only M tuples for an M-way merge, and M is constrained to a
      50             :  * small number.  However, we can still make good use of our full workMem
      51             :  * allocation by pre-reading additional blocks from each source tape.  Without
      52             :  * prereading, our access pattern to the temporary file would be very erratic;
      53             :  * on average we'd read one block from each of M source tapes during the same
      54             :  * time that we're writing M blocks to the output tape, so there is no
      55             :  * sequentiality of access at all, defeating the read-ahead methods used by
      56             :  * most Unix kernels.  Worse, the output tape gets written into a very random
      57             :  * sequence of blocks of the temp file, ensuring that things will be even
      58             :  * worse when it comes time to read that tape.  A straightforward merge pass
      59             :  * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
      60             :  * by prereading from each source tape sequentially, loading about workMem/M
      61             :  * bytes from each tape in turn, and making the sequential blocks immediately
      62             :  * available for reuse.  This approach helps to localize both read and write
      63             :  * accesses.  The pre-reading is handled by logtape.c; we just tell it how
      64             :  * much memory to use for the buffers.
      65             :  *
      66             :  * In the current code we determine the number of input tapes M on the basis
      67             :  * of workMem: we want workMem/M to be large enough that we read a fair
      68             :  * amount of data each time we read from a tape, so as to maintain the
      69             :  * locality of access described above.  Nonetheless, with large workMem we
      70             :  * can have many tapes.  The logical "tapes" are implemented by logtape.c,
      71             :  * which avoids space wastage by recycling disk space as soon as each block
      72             :  * is read from its "tape".
      73             :  *
      74             :  * When the caller requests random access to the sort result, we form
      75             :  * the final sorted run on a logical tape which is then "frozen", so
      76             :  * that we can access it randomly.  When the caller does not need random
      77             :  * access, we return from tuplesort_performsort() as soon as we are down
      78             :  * to one run per logical tape.  The final merge is then performed
      79             :  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
      80             :  * saves one cycle of writing all the data out to disk and reading it in.
      81             :  *
      82             :  * This module supports parallel sorting.  Parallel sorts involve coordination
      83             :  * among one or more worker processes, and a leader process, each with its own
      84             :  * tuplesort state.  The leader process (or, more accurately, the
      85             :  * Tuplesortstate associated with a leader process) creates a full tapeset
      86             :  * consisting of worker tapes, each holding the single run produced by one
      87             :  * worker process.  These runs are then merged.  Worker processes are
      88             :  * guaranteed to produce exactly one output run from their partial input.
      89             :  *
      90             :  *
      91             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      92             :  * Portions Copyright (c) 1994, Regents of the University of California
      93             :  *
      94             :  * IDENTIFICATION
      95             :  *    src/backend/utils/sort/tuplesort.c
      96             :  *
      97             :  *-------------------------------------------------------------------------
      98             :  */
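
To make the balanced k-way merge concrete, here is a minimal, self-contained
sketch in plain C (illustrative code, not part of PostgreSQL): it merges M
sorted integer runs using a binary min-heap that holds the frontmost element
of each run, mirroring the replace-top / delete-top merge loop described
above.

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct { int value; int run; } HeapItem;

    /* Sift the root down to restore the min-heap property. */
    static void sift_down(HeapItem *heap, int n, int i)
    {
        for (;;)
        {
            int smallest = i, l = 2 * i + 1, r = 2 * i + 2;

            if (l < n && heap[l].value < heap[smallest].value) smallest = l;
            if (r < n && heap[r].value < heap[smallest].value) smallest = r;
            if (smallest == i) break;
            HeapItem tmp = heap[i]; heap[i] = heap[smallest]; heap[smallest] = tmp;
            i = smallest;
        }
    }

    /* Merge nruns sorted runs; runs[i] holds lens[i] ascending values. */
    static void kway_merge(const int **runs, const int *lens, int nruns)
    {
        HeapItem *heap = malloc(nruns * sizeof(HeapItem));
        int *pos = calloc(nruns, sizeof(int));
        int n = 0;

        /* Load the frontmost element of each non-empty run, then heapify. */
        for (int i = 0; i < nruns; i++)
            if (lens[i] > 0)
                heap[n++] = (HeapItem) {runs[i][0], i};
        for (int i = n / 2 - 1; i >= 0; i--)
            sift_down(heap, n, i);

        while (n > 0)
        {
            HeapItem top = heap[0];

            printf("%d ", top.value);
            if (++pos[top.run] < lens[top.run])
                heap[0] = (HeapItem) {runs[top.run][pos[top.run]], top.run};
            else
                heap[0] = heap[--n];    /* source run exhausted */
            sift_down(heap, n, 0);
        }
        putchar('\n');
        free(heap);
        free(pos);
    }

    int main(void)
    {
        const int r0[] = {1, 4, 7}, r1[] = {2, 5, 8}, r2[] = {3, 6, 9};
        const int *runs[] = {r0, r1, r2};
        const int lens[] = {3, 3, 3};

        kway_merge(runs, lens, 3);      /* prints: 1 2 3 4 5 6 7 8 9 */
        return 0;
    }

The real implementation differs mainly in that each "run" is a logical tape
and each heap element is a SortTuple, but the control flow is the same.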
      99             : 
     100             : #include "postgres.h"
     101             : 
     102             : #include <limits.h>
     103             : 
     104             : #include "commands/tablespace.h"
     105             : #include "miscadmin.h"
     106             : #include "pg_trace.h"
     107             : #include "storage/shmem.h"
     108             : #include "utils/guc.h"
     109             : #include "utils/memutils.h"
     110             : #include "utils/pg_rusage.h"
     111             : #include "utils/tuplesort.h"
     112             : 
      113             : /*
      114             :  * Initial size of the memtuples array.  We try to select this size so that
      115             :  * the array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the
      116             :  * overhead of allocation might possibly be lowered.  However, we don't
      117             :  * consider array sizes less than 1024.
      118             :  */
     120             : #define INITIAL_MEMTUPSIZE Max(1024, \
     121             :     ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
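
As a rough worked example (hedged, since both numbers are build-dependent):
on a typical 64-bit build where sizeof(SortTuple) is 24 bytes and
ALLOCSET_SEPARATE_THRESHOLD is 8192 bytes, the second term is
8192 / 24 + 1 = 342, so INITIAL_MEMTUPSIZE = Max(1024, 342) = 1024 entries,
i.e. an initial array of about 24 kB.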
     122             : 
     123             : /* GUC variables */
     124             : bool        trace_sort = false;
     125             : 
     126             : #ifdef DEBUG_BOUNDED_SORT
     127             : bool        optimize_bounded_sort = true;
     128             : #endif
     129             : 
     130             : 
     131             : /*
      132             :  * During merge, we use a pre-allocated set of fixed-size slots to hold
      133             :  * tuples, to avoid palloc/pfree overhead.
     134             :  *
     135             :  * Merge doesn't require a lot of memory, so we can afford to waste some,
     136             :  * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
     137             :  * palloc() overhead is not significant anymore.
     138             :  *
     139             :  * 'nextfree' is valid when this chunk is in the free list.  When in use, the
     140             :  * slot holds a tuple.
     141             :  */
     142             : #define SLAB_SLOT_SIZE 1024
     143             : 
     144             : typedef union SlabSlot
     145             : {
     146             :     union SlabSlot *nextfree;
     147             :     char        buffer[SLAB_SLOT_SIZE];
     148             : } SlabSlot;
     149             : 
     150             : /*
     151             :  * Possible states of a Tuplesort object.  These denote the states that
     152             :  * persist between calls of Tuplesort routines.
     153             :  */
     154             : typedef enum
     155             : {
     156             :     TSS_INITIAL,                /* Loading tuples; still within memory limit */
     157             :     TSS_BOUNDED,                /* Loading tuples into bounded-size heap */
     158             :     TSS_BUILDRUNS,              /* Loading tuples; writing to tape */
     159             :     TSS_SORTEDINMEM,            /* Sort completed entirely in memory */
     160             :     TSS_SORTEDONTAPE,           /* Sort completed, final run is on tape */
     161             :     TSS_FINALMERGE,             /* Performing final merge on-the-fly */
     162             : } TupSortStatus;
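
Pieced together from the file-header comments, the usual transitions between
these states are roughly the following (a sketch, not an exhaustive list):

    TSS_INITIAL -----------------> TSS_SORTEDINMEM   (input fit in workMem; qsort)
    TSS_INITIAL --> TSS_BOUNDED --> TSS_SORTEDINMEM  (bounded sort; small heap)
    TSS_INITIAL -----------------> TSS_BUILDRUNS     (workMem exceeded; spill)
    TSS_BUILDRUNS ---------------> TSS_SORTEDONTAPE  (random access; frozen tape)
    TSS_BUILDRUNS ---------------> TSS_FINALMERGE    (otherwise; merge on the fly)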
     163             : 
     164             : /*
     165             :  * Parameters for calculation of number of tapes to use --- see inittapes()
     166             :  * and tuplesort_merge_order().
     167             :  *
      168             :  * In this calculation we assume that each tape will cost us about one block's
      169             :  * worth of buffer space.  This ignores the overhead of all the other data
     170             :  * structures needed for each tape, but it's probably close enough.
     171             :  *
     172             :  * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
     173             :  * input tape, for pre-reading (see discussion at top of file).  This is *in
     174             :  * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
     175             :  */
     176             : #define MINORDER        6       /* minimum merge order */
     177             : #define MAXORDER        500     /* maximum merge order */
     178             : #define TAPE_BUFFER_OVERHEAD        BLCKSZ
     179             : #define MERGE_BUFFER_SIZE           (BLCKSZ * 32)
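
To put rough numbers on these constants (a hedged illustration assuming the
default 8 kB BLCKSZ): each input tape costs about MERGE_BUFFER_SIZE +
TAPE_BUFFER_OVERHEAD = 256 kB + 8 kB = 264 kB, so a sort given workMem of
4 MB can afford a merge order of about 4096 / 264, i.e. roughly 15 input
tapes, which is then clamped to [MINORDER, MAXORDER] = [6, 500].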
     180             : 
     181             : 
     182             : /*
     183             :  * Private state of a Tuplesort operation.
     184             :  */
     185             : struct Tuplesortstate
     186             : {
     187             :     TuplesortPublic base;
     188             :     TupSortStatus status;       /* enumerated value as shown above */
     189             :     bool        bounded;        /* did caller specify a maximum number of
     190             :                                  * tuples to return? */
     191             :     bool        boundUsed;      /* true if we made use of a bounded heap */
     192             :     int         bound;          /* if bounded, the maximum number of tuples */
     193             :     int64       tupleMem;       /* memory consumed by individual tuples.
     194             :                                  * storing this separately from what we track
     195             :                                  * in availMem allows us to subtract the
     196             :                                  * memory consumed by all tuples when dumping
     197             :                                  * tuples to tape */
     198             :     int64       availMem;       /* remaining memory available, in bytes */
     199             :     int64       allowedMem;     /* total memory allowed, in bytes */
     200             :     int         maxTapes;       /* max number of input tapes to merge in each
     201             :                                  * pass */
      202             :     int64       maxSpace;       /* maximum amount of space occupied across
      203             :                                  * sorted groups, either in-memory or on-disk */
     204             :     bool        isMaxSpaceDisk; /* true when maxSpace is value for on-disk
     205             :                                  * space, false when it's value for in-memory
     206             :                                  * space */
     207             :     TupSortStatus maxSpaceStatus;   /* sort status when maxSpace was reached */
     208             :     LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */
     209             : 
     210             :     /*
     211             :      * This array holds the tuples now in sort memory.  If we are in state
     212             :      * INITIAL, the tuples are in no particular order; if we are in state
     213             :      * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
     214             :      * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
     215             :      * H.  In state SORTEDONTAPE, the array is not used.
     216             :      */
     217             :     SortTuple  *memtuples;      /* array of SortTuple structs */
     218             :     int         memtupcount;    /* number of tuples currently present */
     219             :     int         memtupsize;     /* allocated length of memtuples array */
     220             :     bool        growmemtuples;  /* memtuples' growth still underway? */
     221             : 
     222             :     /*
     223             :      * Memory for tuples is sometimes allocated using a simple slab allocator,
     224             :      * rather than with palloc().  Currently, we switch to slab allocation
     225             :      * when we start merging.  Merging only needs to keep a small, fixed
     226             :      * number of tuples in memory at any time, so we can avoid the
     227             :      * palloc/pfree overhead by recycling a fixed number of fixed-size slots
     228             :      * to hold the tuples.
     229             :      *
     230             :      * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
     231             :      * slots.  The allocation is sized to have one slot per tape, plus one
     232             :      * additional slot.  We need that many slots to hold all the tuples kept
      233             :      * in the heap during merge, plus the one most recently returned from
      234             :      * the sort by tuplesort_gettuple.
     235             :      *
     236             :      * Initially, all the slots are kept in a linked list of free slots.  When
      237             :      * a tuple is read from a tape, it is put into the next available slot, if
     238             :      * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
     239             :      * instead.
     240             :      *
     241             :      * When we're done processing a tuple, we return the slot back to the free
      242             :      * list, or pfree() it if it was palloc'd.  We know that a tuple was
      243             :      * allocated from the slab if its pointer value lies between
      244             :      * slabMemoryBegin and slabMemoryEnd.
     245             :      *
     246             :      * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
     247             :      * tracking memory usage is not used.
     248             :      */
     249             :     bool        slabAllocatorUsed;
     250             : 
     251             :     char       *slabMemoryBegin;    /* beginning of slab memory arena */
     252             :     char       *slabMemoryEnd;  /* end of slab memory arena */
     253             :     SlabSlot   *slabFreeHead;   /* head of free list */
     254             : 
     255             :     /* Memory used for input and output tape buffers. */
     256             :     size_t      tape_buffer_mem;
     257             : 
     258             :     /*
     259             :      * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
     260             :      * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
     261             :      * modes), we remember the tuple in 'lastReturnedTuple', so that we can
      262             :      * recycle the memory on the next gettuple call.
     263             :      */
     264             :     void       *lastReturnedTuple;
     265             : 
     266             :     /*
     267             :      * While building initial runs, this is the current output run number.
     268             :      * Afterwards, it is the number of initial runs we made.
     269             :      */
     270             :     int         currentRun;
     271             : 
     272             :     /*
     273             :      * Logical tapes, for merging.
     274             :      *
     275             :      * The initial runs are written in the output tapes.  In each merge pass,
     276             :      * the output tapes of the previous pass become the input tapes, and new
     277             :      * output tapes are created as needed.  When nInputTapes equals
     278             :      * nInputRuns, there is only one merge pass left.
     279             :      */
     280             :     LogicalTape **inputTapes;
     281             :     int         nInputTapes;
     282             :     int         nInputRuns;
     283             : 
     284             :     LogicalTape **outputTapes;
     285             :     int         nOutputTapes;
     286             :     int         nOutputRuns;
     287             : 
     288             :     LogicalTape *destTape;      /* current output tape */
     289             : 
     290             :     /*
     291             :      * These variables are used after completion of sorting to keep track of
     292             :      * the next tuple to return.  (In the tape case, the tape's current read
     293             :      * position is also critical state.)
     294             :      */
     295             :     LogicalTape *result_tape;   /* actual tape of finished output */
     296             :     int         current;        /* array index (only used if SORTEDINMEM) */
     297             :     bool        eof_reached;    /* reached EOF (needed for cursors) */
     298             : 
     299             :     /* markpos_xxx holds marked position for mark and restore */
     300             :     int64       markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
     301             :     int         markpos_offset; /* saved "current", or offset in tape block */
     302             :     bool        markpos_eof;    /* saved "eof_reached" */
     303             : 
     304             :     /*
     305             :      * These variables are used during parallel sorting.
     306             :      *
      307             :      * worker is our worker identifier.  Follows the general convention that
      308             :      * a value of -1 denotes a leader tuplesort, and values >= 0 denote worker
      309             :      * tuplesorts.  (-1 can also denote a serial tuplesort.)
     310             :      *
     311             :      * shared is mutable shared memory state, which is used to coordinate
     312             :      * parallel sorts.
     313             :      *
     314             :      * nParticipants is the number of worker Tuplesortstates known by the
     315             :      * leader to have actually been launched, which implies that they must
     316             :      * finish a run that the leader needs to merge.  Typically includes a
     317             :      * worker state held by the leader process itself.  Set in the leader
     318             :      * Tuplesortstate only.
     319             :      */
     320             :     int         worker;
     321             :     Sharedsort *shared;
     322             :     int         nParticipants;
     323             : 
     324             :     /*
     325             :      * Additional state for managing "abbreviated key" sortsupport routines
     326             :      * (which currently may be used by all cases except the hash index case).
     327             :      * Tracks the intervals at which the optimization's effectiveness is
     328             :      * tested.
     329             :      */
     330             :     int64       abbrevNext;     /* Tuple # at which to next check
     331             :                                  * applicability */
     332             : 
     333             :     /*
     334             :      * Resource snapshot for time of sort start.
     335             :      */
     336             :     PGRUsage    ru_start;
     337             : };
     338             : 
     339             : /*
      340             :  * Private mutable state of a parallel tuplesort operation.  This is allocated
     341             :  * in shared memory.
     342             :  */
     343             : struct Sharedsort
     344             : {
     345             :     /* mutex protects all fields prior to tapes */
     346             :     slock_t     mutex;
     347             : 
     348             :     /*
     349             :      * currentWorker generates ordinal identifier numbers for parallel sort
     350             :      * workers.  These start from 0, and are always gapless.
     351             :      *
     352             :      * Workers increment workersFinished to indicate having finished.  If this
      353             :      * is equal to state.nParticipants within the leader, the leader is ready
      354             :      * to merge worker runs.
     354             :      * merge worker runs.
     355             :      */
     356             :     int         currentWorker;
     357             :     int         workersFinished;
     358             : 
     359             :     /* Temporary file space */
     360             :     SharedFileSet fileset;
     361             : 
     362             :     /* Size of tapes flexible array */
     363             :     int         nTapes;
     364             : 
     365             :     /*
     366             :      * Tapes array used by workers to report back information needed by the
     367             :      * leader to concatenate all worker tapes into one for merging
     368             :      */
     369             :     TapeShare   tapes[FLEXIBLE_ARRAY_MEMBER];
     370             : };
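
As an illustration of the coordination protocol described above, here is a
simplified sketch of how a worker might claim a gapless ordinal and report
completion under the mutex.  These helpers are hypothetical; the file's real
counterparts (worker_get_identifier and friends) appear further down and do
more bookkeeping.

    /* Hypothetical sketch: claim the next gapless worker ordinal. */
    static int
    sketch_get_worker_id(Sharedsort *shared)
    {
        int     id;

        SpinLockAcquire(&shared->mutex);
        id = shared->currentWorker++;   /* ordinals start at 0, no gaps */
        SpinLockRelease(&shared->mutex);

        return id;
    }

    /* Hypothetical sketch: mark this worker's run as finished. */
    static void
    sketch_report_finished(Sharedsort *shared)
    {
        SpinLockAcquire(&shared->mutex);
        shared->workersFinished++;  /* leader merges once this == nParticipants */
        SpinLockRelease(&shared->mutex);
    }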
     371             : 
     372             : /*
     373             :  * Is the given tuple allocated from the slab memory arena?
     374             :  */
     375             : #define IS_SLAB_SLOT(state, tuple) \
     376             :     ((char *) (tuple) >= (state)->slabMemoryBegin && \
     377             :      (char *) (tuple) < (state)->slabMemoryEnd)
     378             : 
     379             : /*
     380             :  * Return the given tuple to the slab memory free list, or free it
     381             :  * if it was palloc'd.
     382             :  */
     383             : #define RELEASE_SLAB_SLOT(state, tuple) \
     384             :     do { \
     385             :         SlabSlot *buf = (SlabSlot *) tuple; \
     386             :         \
     387             :         if (IS_SLAB_SLOT((state), buf)) \
     388             :         { \
     389             :             buf->nextfree = (state)->slabFreeHead; \
     390             :             (state)->slabFreeHead = buf; \
     391             :         } else \
     392             :             pfree(buf); \
     393             :     } while(0)
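
The macro above covers the release path; for symmetry, a hedged sketch of the
corresponding allocation path (a hypothetical helper, since the real code
inlines this logic where tuples are read from tape) might look like:

    /*
     * Hypothetical sketch: fetch a slot from the slab free list, falling
     * back to palloc() for tuples larger than SLAB_SLOT_SIZE.
     * RELEASE_SLAB_SLOT above is the matching release path.
     */
    static void *
    sketch_slab_alloc(Tuplesortstate *state, size_t tuplen)
    {
        SlabSlot   *slot;

        if (tuplen > SLAB_SLOT_SIZE || state->slabFreeHead == NULL)
            return palloc(tuplen);      /* oversized tuple: plain palloc */

        slot = state->slabFreeHead;
        state->slabFreeHead = slot->nextfree;   /* pop from free list */
        return slot;
    }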
     394             : 
     395             : #define REMOVEABBREV(state,stup,count)  ((*(state)->base.removeabbrev) (state, stup, count))
     396             : #define COMPARETUP(state,a,b)   ((*(state)->base.comparetup) (a, b, state))
     397             : #define WRITETUP(state,tape,stup)   ((*(state)->base.writetup) (state, tape, stup))
     398             : #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
     399             : #define FREESTATE(state)    ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
     400             : #define LACKMEM(state)      ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
     401             : #define USEMEM(state,amt)   ((state)->availMem -= (amt))
     402             : #define FREEMEM(state,amt)  ((state)->availMem += (amt))
     403             : #define SERIAL(state)       ((state)->shared == NULL)
     404             : #define WORKER(state)       ((state)->shared && (state)->worker != -1)
     405             : #define LEADER(state)       ((state)->shared && (state)->worker == -1)
     406             : 
     407             : /*
     408             :  * NOTES about on-tape representation of tuples:
     409             :  *
     410             :  * We require the first "unsigned int" of a stored tuple to be the total size
     411             :  * on-tape of the tuple, including itself (so it is never zero; an all-zero
     412             :  * unsigned int is used to delimit runs).  The remainder of the stored tuple
     413             :  * may or may not match the in-memory representation of the tuple ---
     414             :  * any conversion needed is the job of the writetup and readtup routines.
     415             :  *
     416             :  * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
     417             :  * representation of the tuple must be followed by another "unsigned int" that
     418             :  * is a copy of the length --- so the total tape space used is actually
     419             :  * sizeof(unsigned int) more than the stored length value.  This allows
      420             :  * reading backwards.  When the random access flag is not specified, the
     421             :  * write/read routines may omit the extra length word.
     422             :  *
     423             :  * writetup is expected to write both length words as well as the tuple
     424             :  * data.  When readtup is called, the tape is positioned just after the
     425             :  * front length word; readtup must read the tuple data and advance past
     426             :  * the back length word (if present).
     427             :  *
     428             :  * The write/read routines can make use of the tuple description data
     429             :  * stored in the Tuplesortstate record, if needed.  They are also expected
     430             :  * to adjust state->availMem by the amount of memory space (not tape space!)
     431             :  * released or consumed.  There is no error return from either writetup
     432             :  * or readtup; they should ereport() on failure.
     433             :  *
     434             :  *
     435             :  * NOTES about memory consumption calculations:
     436             :  *
     437             :  * We count space allocated for tuples against the workMem limit, plus
     438             :  * the space used by the variable-size memtuples array.  Fixed-size space
     439             :  * is not counted; it's small enough to not be interesting.
     440             :  *
     441             :  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
     442             :  * rather than the originally-requested size.  This is important since
     443             :  * palloc can add substantial overhead.  It's not a complete answer since
     444             :  * we won't count any wasted space in palloc allocation blocks, but it's
     445             :  * a lot better than what we were doing before 7.3.  As of 9.6, a
      446             :  * separate memory context is used for caller-passed tuples.  Resetting
     447             :  * it at certain key increments significantly ameliorates fragmentation.
     448             :  * readtup routines use the slab allocator (they cannot use
     449             :  * the reset context because it gets deleted at the point that merging
     450             :  * begins).
     451             :  */
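
As a concrete illustration of the on-tape layout just described, a minimal
writetup-style routine for an opaque byte payload might look like the sketch
below.  LogicalTapeWrite is the real logtape.c API, but the helper itself is
a simplified assumption, not one of this file's writetup routines:

    /*
     * Sketch only: write a tuple image in the on-tape format described
     * above.  'len' is the payload size; the leading length word counts
     * itself, and the trailing copy is written only for random access.
     */
    static void
    sketch_writetup(Tuplesortstate *state, LogicalTape *tape,
                    void *data, unsigned int len)
    {
        unsigned int tuplen = len + sizeof(unsigned int);   /* includes itself */

        LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
        LogicalTapeWrite(tape, data, len);

        if (state->base.sortopt & TUPLESORT_RANDOMACCESS)
            LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));    /* back length word */
    }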
     452             : 
     453             : 
     454             : static void tuplesort_begin_batch(Tuplesortstate *state);
     455             : static bool consider_abort_common(Tuplesortstate *state);
     456             : static void inittapes(Tuplesortstate *state, bool mergeruns);
     457             : static void inittapestate(Tuplesortstate *state, int maxTapes);
     458             : static void selectnewtape(Tuplesortstate *state);
     459             : static void init_slab_allocator(Tuplesortstate *state, int numSlots);
     460             : static void mergeruns(Tuplesortstate *state);
     461             : static void mergeonerun(Tuplesortstate *state);
     462             : static void beginmerge(Tuplesortstate *state);
     463             : static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
     464             : static void dumptuples(Tuplesortstate *state, bool alltuples);
     465             : static void make_bounded_heap(Tuplesortstate *state);
     466             : static void sort_bounded_heap(Tuplesortstate *state);
     467             : static void tuplesort_sort_memtuples(Tuplesortstate *state);
     468             : static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
     469             : static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
     470             : static void tuplesort_heap_delete_top(Tuplesortstate *state);
     471             : static void reversedirection(Tuplesortstate *state);
     472             : static unsigned int getlen(LogicalTape *tape, bool eofOK);
     473             : static void markrunend(LogicalTape *tape);
     474             : static int  worker_get_identifier(Tuplesortstate *state);
     475             : static void worker_freeze_result_tape(Tuplesortstate *state);
     476             : static void worker_nomergeruns(Tuplesortstate *state);
     477             : static void leader_takeover_tapes(Tuplesortstate *state);
     478             : static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
     479             : static void tuplesort_free(Tuplesortstate *state);
     480             : static void tuplesort_updatemax(Tuplesortstate *state);
     481             : 
     482             : /*
     483             :  * Specialized comparators that we can inline into specialized sorts.  The goal
     484             :  * is to try to sort two tuples without having to follow the pointers to the
     485             :  * comparator or the tuple.
     486             :  *
     487             :  * XXX: For now, there is no specialization for cases where datum1 is
     488             :  * authoritative and we don't even need to fall back to a callback at all (that
     489             :  * would be true for types like int4/int8/timestamp/date, but not true for
      490             :  * abbreviations of text or multi-key sorts).  There could be!  Is it worth it?
     491             :  */
     492             : 
     493             : /* Used if first key's comparator is ssup_datum_unsigned_cmp */
     494             : static pg_attribute_always_inline int
     495    46429924 : qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     496             : {
     497             :     int         compare;
     498             : 
     499    46429924 :     compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
     500    46429924 :                                           b->datum1, b->isnull1,
     501             :                                           &state->base.sortKeys[0]);
     502    46429924 :     if (compare != 0)
     503    41888254 :         return compare;
     504             : 
     505             :     /*
     506             :      * No need to waste effort calling the tiebreak function when there are no
     507             :      * other keys to sort on.
     508             :      */
     509     4541670 :     if (state->base.onlyKey != NULL)
     510           0 :         return 0;
     511             : 
     512     4541670 :     return state->base.comparetup_tiebreak(a, b, state);
     513             : }
     514             : 
     515             : #if SIZEOF_DATUM >= 8
     516             : /* Used if first key's comparator is ssup_datum_signed_cmp */
     517             : static pg_attribute_always_inline int
     518     5624776 : qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     519             : {
     520             :     int         compare;
     521             : 
     522     5624776 :     compare = ApplySignedSortComparator(a->datum1, a->isnull1,
     523     5624776 :                                         b->datum1, b->isnull1,
     524             :                                         &state->base.sortKeys[0]);
     525             : 
     526     5624776 :     if (compare != 0)
     527     5612842 :         return compare;
     528             : 
     529             :     /*
     530             :      * No need to waste effort calling the tiebreak function when there are no
     531             :      * other keys to sort on.
     532             :      */
     533       11934 :     if (state->base.onlyKey != NULL)
     534        1102 :         return 0;
     535             : 
     536       10832 :     return state->base.comparetup_tiebreak(a, b, state);
     537             : }
     538             : #endif
     539             : 
     540             : /* Used if first key's comparator is ssup_datum_int32_cmp */
     541             : static pg_attribute_always_inline int
     542    52302590 : qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     543             : {
     544             :     int         compare;
     545             : 
     546    52302590 :     compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
     547    52302590 :                                        b->datum1, b->isnull1,
     548             :                                        &state->base.sortKeys[0]);
     549             : 
     550    52302590 :     if (compare != 0)
     551    36873066 :         return compare;
     552             : 
     553             :     /*
     554             :      * No need to waste effort calling the tiebreak function when there are no
     555             :      * other keys to sort on.
     556             :      */
     557    15429524 :     if (state->base.onlyKey != NULL)
     558     1677044 :         return 0;
     559             : 
     560    13752480 :     return state->base.comparetup_tiebreak(a, b, state);
     561             : }
     562             : 
     563             : /*
     564             :  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
     565             :  * any variant of SortTuples, using the appropriate comparetup function.
     566             :  * qsort_ssup() is specialized for the case where the comparetup function
     567             :  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
     568             :  * and Datum sorts.  qsort_tuple_{unsigned,signed,int32} are specialized for
     569             :  * common comparison functions on pass-by-value leading datums.
     570             :  */
     571             : 
     572             : #define ST_SORT qsort_tuple_unsigned
     573             : #define ST_ELEMENT_TYPE SortTuple
     574             : #define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
     575             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     576             : #define ST_CHECK_FOR_INTERRUPTS
     577             : #define ST_SCOPE static
     578             : #define ST_DEFINE
     579             : #include "lib/sort_template.h"
     580             : 
     581             : #if SIZEOF_DATUM >= 8
     582             : #define ST_SORT qsort_tuple_signed
     583             : #define ST_ELEMENT_TYPE SortTuple
     584             : #define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
     585             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     586             : #define ST_CHECK_FOR_INTERRUPTS
     587             : #define ST_SCOPE static
     588             : #define ST_DEFINE
     589             : #include "lib/sort_template.h"
     590             : #endif
     591             : 
     592             : #define ST_SORT qsort_tuple_int32
     593             : #define ST_ELEMENT_TYPE SortTuple
     594             : #define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
     595             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     596             : #define ST_CHECK_FOR_INTERRUPTS
     597             : #define ST_SCOPE static
     598             : #define ST_DEFINE
     599             : #include "lib/sort_template.h"
     600             : 
     601             : #define ST_SORT qsort_tuple
     602             : #define ST_ELEMENT_TYPE SortTuple
     603             : #define ST_COMPARE_RUNTIME_POINTER
     604             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     605             : #define ST_CHECK_FOR_INTERRUPTS
     606             : #define ST_SCOPE static
     607             : #define ST_DECLARE
     608             : #define ST_DEFINE
     609             : #include "lib/sort_template.h"
     610             : 
     611             : #define ST_SORT qsort_ssup
     612             : #define ST_ELEMENT_TYPE SortTuple
     613             : #define ST_COMPARE(a, b, ssup) \
     614             :     ApplySortComparator((a)->datum1, (a)->isnull1, \
     615             :                         (b)->datum1, (b)->isnull1, (ssup))
     616             : #define ST_COMPARE_ARG_TYPE SortSupportData
     617             : #define ST_CHECK_FOR_INTERRUPTS
     618             : #define ST_SCOPE static
     619             : #define ST_DEFINE
     620             : #include "lib/sort_template.h"
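
The same template can be instantiated for any element type.  As a minimal
illustration (not part of this file), a fully-inlined sorter for a plain int
array would be generated like this, yielding a static function with the
signature qsort_int(int *data, size_t n):

    #define ST_SORT qsort_int
    #define ST_ELEMENT_TYPE int
    #define ST_COMPARE(a, b) (((*(a)) > (*(b))) - ((*(a)) < (*(b))))
    #define ST_SCOPE static
    #define ST_DEFINE
    #include "lib/sort_template.h"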
     621             : 
     622             : /*
     623             :  *      tuplesort_begin_xxx
     624             :  *
     625             :  * Initialize for a tuple sort operation.
     626             :  *
     627             :  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
     628             :  * zero or more times, then call tuplesort_performsort when all the tuples
     629             :  * have been supplied.  After performsort, retrieve the tuples in sorted
     630             :  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
     631             :  * access was requested, rescan, markpos, and restorepos can also be called.)
     632             :  * Call tuplesort_end to terminate the operation and release memory/disk space.
     633             :  *
     634             :  * Each variant of tuplesort_begin has a workMem parameter specifying the
     635             :  * maximum number of kilobytes of RAM to use before spilling data to disk.
     636             :  * (The normal value of this parameter is work_mem, but some callers use
      637             :  * other values.)  Each variant also has a sortopt argument, which is a
      638             :  * bitmask of sort options; see the TUPLESORT_* definitions in tuplesort.h.
     639             :  */
     640             : 
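A hedged sketch of this calling protocol, using the Datum-sort variant (the
tuplesort_* entry points are real, though their argument lists vary a little
across versions; Int4LessOperator comes from the catalog headers):

    static void
    sketch_sort_some_ints(void)
    {
        Tuplesortstate *sortstate;
        Datum       value;
        bool        isnull;
        Datum       abbrev;

        sortstate = tuplesort_begin_datum(INT4OID, Int4LessOperator,
                                          InvalidOid, false,
                                          work_mem, NULL, TUPLESORT_NONE);

        for (int i = 10; i > 0; i--)            /* feed the input tuples */
            tuplesort_putdatum(sortstate, Int32GetDatum(i), false);

        tuplesort_performsort(sortstate);       /* sort everything */

        while (tuplesort_getdatum(sortstate, true, false,   /* forward, no copy */
                                  &value, &isnull, &abbrev))
            elog(LOG, "next: %d", DatumGetInt32(value));

        tuplesort_end(sortstate);               /* release memory and disk */
    }
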
     641             : Tuplesortstate *
     642      250550 : tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
     643             : {
     644             :     Tuplesortstate *state;
     645             :     MemoryContext maincontext;
     646             :     MemoryContext sortcontext;
     647             :     MemoryContext oldcontext;
     648             : 
     649             :     /* See leader_takeover_tapes() remarks on random access support */
     650      250550 :     if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
     651           0 :         elog(ERROR, "random access disallowed under parallel sort");
     652             : 
     653             :     /*
     654             :      * Memory context surviving tuplesort_reset.  This memory context holds
     655             :      * data which is useful to keep while sorting multiple similar batches.
     656             :      */
     657      250550 :     maincontext = AllocSetContextCreate(CurrentMemoryContext,
     658             :                                         "TupleSort main",
     659             :                                         ALLOCSET_DEFAULT_SIZES);
     660             : 
     661             :     /*
     662             :      * Create a working memory context for one sort operation.  The content of
     663             :      * this context is deleted by tuplesort_reset.
     664             :      */
     665      250550 :     sortcontext = AllocSetContextCreate(maincontext,
     666             :                                         "TupleSort sort",
     667             :                                         ALLOCSET_DEFAULT_SIZES);
     668             : 
     669             :     /*
      670             :      * Additionally, a working memory context for tuples is set up in
     671             :      * tuplesort_begin_batch.
     672             :      */
     673             : 
     674             :     /*
     675             :      * Make the Tuplesortstate within the per-sortstate context.  This way, we
     676             :      * don't need a separate pfree() operation for it at shutdown.
     677             :      */
     678      250550 :     oldcontext = MemoryContextSwitchTo(maincontext);
     679             : 
     680      250550 :     state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
     681             : 
     682      250550 :     if (trace_sort)
     683           0 :         pg_rusage_init(&state->ru_start);
     684             : 
     685      250550 :     state->base.sortopt = sortopt;
     686      250550 :     state->base.tuples = true;
     687      250550 :     state->abbrevNext = 10;
     688             : 
     689             :     /*
     690             :      * workMem is forced to be at least 64KB, the current minimum valid value
     691             :      * for the work_mem GUC.  This is a defense against parallel sort callers
     692             :      * that divide out memory among many workers in a way that leaves each
     693             :      * with very little memory.
     694             :      */
     695      250550 :     state->allowedMem = Max(workMem, 64) * (int64) 1024;
     696      250550 :     state->base.sortcontext = sortcontext;
     697      250550 :     state->base.maincontext = maincontext;
     698             : 
     699             :     /*
     700             :      * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
     701             :      * see comments in grow_memtuples().
     702             :      */
     703      250550 :     state->memtupsize = INITIAL_MEMTUPSIZE;
     704      250550 :     state->memtuples = NULL;
     705             : 
     706             :     /*
      707             :      * After all of the other non-parallel-related state, we set up all of the
     708             :      * state needed for each batch.
     709             :      */
     710      250550 :     tuplesort_begin_batch(state);
     711             : 
     712             :     /*
     713             :      * Initialize parallel-related state based on coordination information
     714             :      * from caller
     715             :      */
     716      250550 :     if (!coordinate)
     717             :     {
     718             :         /* Serial sort */
     719      249886 :         state->shared = NULL;
     720      249886 :         state->worker = -1;
     721      249886 :         state->nParticipants = -1;
     722             :     }
     723         664 :     else if (coordinate->isWorker)
     724             :     {
     725             :         /* Parallel worker produces exactly one final run from all input */
     726         444 :         state->shared = coordinate->sharedsort;
     727         444 :         state->worker = worker_get_identifier(state);
     728         444 :         state->nParticipants = -1;
     729             :     }
     730             :     else
     731             :     {
     732             :         /* Parallel leader state only used for final merge */
     733         220 :         state->shared = coordinate->sharedsort;
     734         220 :         state->worker = -1;
     735         220 :         state->nParticipants = coordinate->nParticipants;
     736             :         Assert(state->nParticipants >= 1);
     737             :     }
     738             : 
     739      250550 :     MemoryContextSwitchTo(oldcontext);
     740             : 
     741      250550 :     return state;
     742             : }
     743             : 
     744             : /*
     745             :  *      tuplesort_begin_batch
     746             :  *
      747             :  * Set up, or reset, all state needed for processing a new set of tuples with
      748             :  * this sort state.  Called both from tuplesort_begin_common (the first time
      749             :  * sorting with this sort state) and tuplesort_reset (for subsequent usages).
     750             :  */
     751             : static void
     752      253316 : tuplesort_begin_batch(Tuplesortstate *state)
     753             : {
     754             :     MemoryContext oldcontext;
     755             : 
     756      253316 :     oldcontext = MemoryContextSwitchTo(state->base.maincontext);
     757             : 
     758             :     /*
     759             :      * Caller tuple (e.g. IndexTuple) memory context.
     760             :      *
      761             :      * A dedicated child context used exclusively for caller-passed tuples
     762             :      * eases memory management.  Resetting at key points reduces
     763             :      * fragmentation. Note that the memtuples array of SortTuples is allocated
     764             :      * in the parent context, not this context, because there is no need to
     765             :      * free memtuples early.  For bounded sorts, tuples may be pfreed in any
     766             :      * order, so we use a regular aset.c context so that it can make use of
     767             :      * free'd memory.  When the sort is not bounded, we make use of a bump.c
     768             :      * context as this keeps allocations more compact with less wastage.
     769             :      * Allocations are also slightly more CPU efficient.
     770             :      */
     771      253316 :     if (TupleSortUseBumpTupleCxt(state->base.sortopt))
     772      251988 :         state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
     773             :                                                      "Caller tuples",
     774             :                                                      ALLOCSET_DEFAULT_SIZES);
     775             :     else
     776        1328 :         state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
     777             :                                                          "Caller tuples",
     778             :                                                          ALLOCSET_DEFAULT_SIZES);
     779             : 
     780             : 
     781      253316 :     state->status = TSS_INITIAL;
     782      253316 :     state->bounded = false;
     783      253316 :     state->boundUsed = false;
     784             : 
     785      253316 :     state->availMem = state->allowedMem;
     786             : 
     787      253316 :     state->tapeset = NULL;
     788             : 
     789      253316 :     state->memtupcount = 0;
     790             : 
     791             :     /*
     792             :      * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
     793             :      * see comments in grow_memtuples().
     794             :      */
     795      253316 :     state->growmemtuples = true;
     796      253316 :     state->slabAllocatorUsed = false;
     797      253316 :     if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
     798             :     {
     799           0 :         pfree(state->memtuples);
     800           0 :         state->memtuples = NULL;
     801           0 :         state->memtupsize = INITIAL_MEMTUPSIZE;
     802             :     }
     803      253316 :     if (state->memtuples == NULL)
     804             :     {
     805      250550 :         state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
     806      250550 :         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
     807             :     }
     808             : 
     809             :     /* workMem must be large enough for the minimal memtuples array */
     810      253316 :     if (LACKMEM(state))
     811           0 :         elog(ERROR, "insufficient memory allowed for sort");
     812             : 
     813      253316 :     state->currentRun = 0;
     814             : 
     815             :     /*
     816             :      * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
     817             :      * inittapes(), if needed.
     818             :      */
     819             : 
     820      253316 :     state->result_tape = NULL;   /* flag that result tape has not been formed */
     821             : 
     822      253316 :     MemoryContextSwitchTo(oldcontext);
     823      253316 : }
     824             : 
     825             : /*
     826             :  * tuplesort_set_bound
     827             :  *
     828             :  *  Advise tuplesort that at most the first N result tuples are required.
     829             :  *
     830             :  * Must be called before inserting any tuples.  (Actually, we could allow it
     831             :  * as long as the sort hasn't spilled to disk, but there seems no need for
     832             :  * delayed calls at the moment.)
     833             :  *
     834             :  * This is a hint only. The tuplesort may still return more tuples than
     835             :  * requested.  Parallel leader tuplesorts will always ignore the hint.
     836             :  */
     837             : void
     838        1194 : tuplesort_set_bound(Tuplesortstate *state, int64 bound)
     839             : {
     840             :     /* Assert we're called before loading any tuples */
     841             :     Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
     842             :     /* Assert we allow bounded sorts */
     843             :     Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
     844             :     /* Can't set the bound twice, either */
     845             :     Assert(!state->bounded);
     846             :     /* Also, this shouldn't be called in a parallel worker */
     847             :     Assert(!WORKER(state));
     848             : 
     849             :     /* Parallel leader allows but ignores hint */
     850        1194 :     if (LEADER(state))
     851           0 :         return;
     852             : 
     853             : #ifdef DEBUG_BOUNDED_SORT
     854             :     /* Honor GUC setting that disables the feature (for easy testing) */
     855             :     if (!optimize_bounded_sort)
     856             :         return;
     857             : #endif
     858             : 
     859             :     /* We want to be able to compute bound * 2, so limit the setting */
     860        1194 :     if (bound > (int64) (INT_MAX / 2))
     861           0 :         return;
     862             : 
     863        1194 :     state->bounded = true;
     864        1194 :     state->bound = (int) bound;
     865             : 
     866             :     /*
     867             :      * Bounded sorts are not an effective target for abbreviated key
     868             :      * optimization.  Disable by setting state to be consistent with no
     869             :      * abbreviation support.
     870             :      */
     871        1194 :     state->base.sortKeys->abbrev_converter = NULL;
     872        1194 :     if (state->base.sortKeys->abbrev_full_comparator)
     873          16 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
     874             : 
     875             :     /* Not strictly necessary, but be tidy */
     876        1194 :     state->base.sortKeys->abbrev_abort = NULL;
     877        1194 :     state->base.sortKeys->abbrev_full_comparator = NULL;
     878             : }
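
For instance, a caller implementing ORDER BY ... LIMIT 100 on a sort created
with TUPLESORT_ALLOWBOUNDED would simply call (sketch):

    tuplesort_set_bound(sortstate, 100);    /* only first 100 tuples needed */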
     879             : 
     880             : /*
     881             :  * tuplesort_used_bound
     882             :  *
     883             :  * Allow callers to find out if the sort state was able to use a bound.
     884             :  */
     885             : bool
     886         110 : tuplesort_used_bound(Tuplesortstate *state)
     887             : {
     888         110 :     return state->boundUsed;
     889             : }
     890             : 
     891             : /*
     892             :  * tuplesort_free
     893             :  *
     894             :  *  Internal routine for freeing resources of tuplesort.
     895             :  */
     896             : static void
     897      253072 : tuplesort_free(Tuplesortstate *state)
     898             : {
     899             :     /* context swap probably not needed, but let's be safe */
     900      253072 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
     901             :     int64       spaceUsed;
     902             : 
     903      253072 :     if (state->tapeset)
     904         720 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset);
     905             :     else
     906      252352 :         spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
     907             : 
     908             :     /*
     909             :      * Delete temporary "tape" files, if any.
     910             :      *
     911             :      * We don't bother to destroy the individual tapes here. They will go away
     912             :      * with the sortcontext.  (In TSS_FINALMERGE state, we have closed
     913             :      * finished tapes already.)
     914             :      */
     915      253072 :     if (state->tapeset)
     916         720 :         LogicalTapeSetClose(state->tapeset);
     917             : 
     918      253072 :     if (trace_sort)
     919             :     {
     920           0 :         if (state->tapeset)
     921           0 :             elog(LOG, "%s of worker %d ended, %lld disk blocks used: %s",
     922             :                  SERIAL(state) ? "external sort" : "parallel external sort",
     923             :                  state->worker, (long long) spaceUsed, pg_rusage_show(&state->ru_start));
     924             :         else
     925           0 :             elog(LOG, "%s of worker %d ended, %lld KB used: %s",
     926             :                  SERIAL(state) ? "internal sort" : "unperformed parallel sort",
     927             :                  state->worker, (long long) spaceUsed, pg_rusage_show(&state->ru_start));
     928             :     }
     929             : 
     930             :     TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
     931             : 
     932      253072 :     FREESTATE(state);
     933      253072 :     MemoryContextSwitchTo(oldcontext);
     934             : 
     935             :     /*
     936             :      * Free the per-sort memory context, thereby releasing all working memory.
     937             :      */
     938      253072 :     MemoryContextReset(state->base.sortcontext);
     939      253072 : }
     940             : 
     941             : /*
     942             :  * tuplesort_end
     943             :  *
     944             :  *  Release resources and clean up.
     945             :  *
     946             :  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
     947             :  * pointing to garbage.  Be careful not to attempt to use or free such
     948             :  * pointers afterwards!
     949             :  */
     950             : void
     951      250306 : tuplesort_end(Tuplesortstate *state)
     952             : {
     953      250306 :     tuplesort_free(state);
     954             : 
     955             :     /*
     956             :      * Free the main memory context, including the Tuplesortstate struct
     957             :      * itself.
     958             :      */
     959      250306 :     MemoryContextDelete(state->base.maincontext);
     960      250306 : }
     961             : 
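/*
 * [Editor's note, reconstructed from the code above; the layout is an
 * assumption based on how maincontext and sortcontext are used in this file]
 *
 *   maincontext                      -- deleted only by tuplesort_end;
 *     |-- Tuplesortstate struct         holds batch-independent state
 *     `-- sortcontext                -- per-sort working memory; reset by
 *                                       tuplesort_free on every batch
 *
 * This split is what lets tuplesort_reset (below) recycle a sort cheaply:
 * only sortcontext is reset, while the state in maincontext survives.
 */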
     962             : /*
     963             :  * tuplesort_updatemax
     964             :  *
     965             :  *  Update maximum resource usage statistics.
     966             :  */
     967             : static void
     968        3156 : tuplesort_updatemax(Tuplesortstate *state)
     969             : {
     970             :     int64       spaceUsed;
     971             :     bool        isSpaceDisk;
     972             : 
     973             :     /*
     974             :      * Note: it might seem we should provide both memory and disk usage for a
     975             :      * disk-based sort.  However, the current code doesn't track memory space
     976             :      * accurately once we have begun to return tuples to the caller (since we
     977             :      * don't account for pfree's the caller is expected to do), so we cannot
     978             :      * rely on availMem in a disk sort.  This does not seem worth the overhead
     979             :      * to fix.  Is it worth creating an API for the memory context code to
     980             :      * tell us how much is actually used in sortcontext?
     981             :      */
     982        3156 :     if (state->tapeset)
     983             :     {
     984           6 :         isSpaceDisk = true;
     985           6 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
     986             :     }
     987             :     else
     988             :     {
     989        3150 :         isSpaceDisk = false;
     990        3150 :         spaceUsed = state->allowedMem - state->availMem;
     991             :     }
     992             : 
     993             :     /*
     994             :      * A sort evicts data to disk only when it cannot fit that data into
     995             :      * main memory.  This is why we treat space used on disk as more
     996             :      * important for tracking resource usage than space used in memory.
     997             :      * Note that the space occupied by a set of tuples on disk might be
     998             :      * less than the space occupied by the same tuples in memory, due to
     999             :      * the more compact on-disk representation.
    1000             :      */
    1001        3156 :     if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
    1002        3150 :         (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
    1003             :     {
    1004         412 :         state->maxSpace = spaceUsed;
    1005         412 :         state->isMaxSpaceDisk = isSpaceDisk;
    1006         412 :         state->maxSpaceStatus = state->status;
    1007             :     }
    1008        3156 : }
    1009             : 
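/*
 * [Editor's worked example of the precedence rule above; the numbers are
 * made up]  Disk readings dominate memory readings; within the same kind,
 * the larger value wins:
 *
 *   batch 1: 12 MB in memory  -> max = 12 MB (memory)
 *   batch 2:  2 MB on disk    -> max =  2 MB (disk; disk beats memory)
 *   batch 3: 20 MB in memory  -> max stays 2 MB (disk)
 *   batch 4:  5 MB on disk    -> max =  5 MB (disk; same kind, larger wins)
 */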
    1010             : /*
    1011             :  * tuplesort_reset
    1012             :  *
    1013             :  *  Reset the tuplesort.  Discard all tuple data held by the tuplesort, but
    1014             :  *  keep the meta-information.  After tuplesort_reset, the tuplesort is ready
    1015             :  *  to start a new sort.  This avoids recreating tuplesort states (and saves
    1016             :  *  resources) when sorting multiple small batches.
    1017             :  */
    1018             : void
    1019        2766 : tuplesort_reset(Tuplesortstate *state)
    1020             : {
    1021        2766 :     tuplesort_updatemax(state);
    1022        2766 :     tuplesort_free(state);
    1023             : 
    1024             :     /*
    1025             :      * After we've freed up per-batch memory, set up again all of the state
    1026             :      * common to both the first batch and any subsequent batch.
    1027             :      */
    1028        2766 :     tuplesort_begin_batch(state);
    1029             : 
    1030        2766 :     state->lastReturnedTuple = NULL;
    1031        2766 :     state->slabMemoryBegin = NULL;
    1032        2766 :     state->slabMemoryEnd = NULL;
    1033        2766 :     state->slabFreeHead = NULL;
    1034        2766 : }
    1035             : 
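/*
 * [Editor's sketch, not from this file]  Reusing one tuplesort across many
 * small batches, as tuplesort_reset permits.  load_batch() and drain_batch()
 * are hypothetical stand-ins for the caller's puttuple/gettuple loops.
 */
static void
example_sort_batches(Tuplesortstate *sortstate, int nbatches)
{
    for (int i = 0; i < nbatches; i++)
    {
        load_batch(sortstate, i);       /* e.g. tuplesort_puttupleslot() */
        tuplesort_performsort(sortstate);
        drain_batch(sortstate, i);      /* e.g. tuplesort_gettupleslot() */
        tuplesort_reset(sortstate);     /* ready for the next batch */
    }
    tuplesort_end(sortstate);           /* tear everything down at the end */
}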
    1036             : /*
    1037             :  * Grow the memtuples[] array, if possible within our memory constraint.  We
    1038             :  * must not exceed INT_MAX tuples in memory or the caller-provided memory
    1039             :  * limit.  Return true if we were able to enlarge the array, false if not.
    1040             :  *
    1041             :  * Normally, at each increment we double the size of the array.  When doing
    1042             :  * that would exceed a limit, we attempt one last, smaller increase (and then
    1043             :  * clear the growmemtuples flag so we don't try any more).  That allows us to
    1044             :  * use memory as fully as permitted; sticking to the pure doubling rule could
    1045             :  * result in almost half going unused.  Because availMem moves around with
    1046             :  * tuple addition/removal, we need some rule to prevent making repeated small
    1047             :  * increases in memtupsize, which would just be useless thrashing.  The
    1048             :  * growmemtuples flag accomplishes that and also prevents useless
    1049             :  * recalculations in this function.
    1050             :  */
    1051             : static bool
    1052        7068 : grow_memtuples(Tuplesortstate *state)
    1053             : {
    1054             :     int         newmemtupsize;
    1055        7068 :     int         memtupsize = state->memtupsize;
    1056        7068 :     int64       memNowUsed = state->allowedMem - state->availMem;
    1057             : 
    1058             :     /* Forget it if we've already maxed out memtuples, per comment above */
    1059        7068 :     if (!state->growmemtuples)
    1060         120 :         return false;
    1061             : 
    1062             :     /* Select new value of memtupsize */
    1063        6948 :     if (memNowUsed <= state->availMem)
    1064             :     {
    1065             :         /*
    1066             :          * We've used no more than half of allowedMem; double our usage,
    1067             :          * clamping at INT_MAX tuples.
    1068             :          */
    1069        6820 :         if (memtupsize < INT_MAX / 2)
    1070        6820 :             newmemtupsize = memtupsize * 2;
    1071             :         else
    1072             :         {
    1073           0 :             newmemtupsize = INT_MAX;
    1074           0 :             state->growmemtuples = false;
    1075             :         }
    1076             :     }
    1077             :     else
    1078             :     {
    1079             :         /*
    1080             :          * This will be the last increment of memtupsize.  Abandon doubling
    1081             :          * strategy and instead increase as much as we safely can.
    1082             :          *
    1083             :          * To stay within allowedMem, we can't increase memtupsize by more
    1084             :          * than availMem / sizeof(SortTuple) elements.  In practice, we want
    1085             :          * to increase it by considerably less, because we need to leave some
    1086             :          * space for the tuples to which the new array slots will refer.  We
    1087             :          * assume the new tuples will be about the same size as the tuples
    1088             :          * we've already seen, and thus we can extrapolate from the space
    1089             :          * consumption so far to estimate an appropriate new size for the
    1090             :          * memtuples array.  The optimal value might be higher or lower than
    1091             :          * this estimate, but it's hard to know that in advance.  We again
    1092             :          * clamp at INT_MAX tuples.
    1093             :          *
    1094             :          * This calculation is safe against enlarging the array so much that
    1095             :          * LACKMEM becomes true, because the memory currently used includes
    1096             :          * the present array; thus, there would be enough allowedMem for the
    1097             :          * new array elements even if no other memory were currently used.
    1098             :          *
    1099             :          * We do the arithmetic in float8, because otherwise the product of
    1100             :          * memtupsize and allowedMem could overflow.  Any inaccuracy in the
    1101             :          * result should be insignificant; but even if we computed a
    1102             :          * completely insane result, the checks below will prevent anything
    1103             :          * really bad from happening.
    1104             :          */
    1105             :         double      grow_ratio;
    1106             : 
    1107         128 :         grow_ratio = (double) state->allowedMem / (double) memNowUsed;
    1108         128 :         if (memtupsize * grow_ratio < INT_MAX)
    1109         128 :             newmemtupsize = (int) (memtupsize * grow_ratio);
    1110             :         else
    1111           0 :             newmemtupsize = INT_MAX;
    1112             : 
    1113             :         /* We won't make any further enlargement attempts */
    1114         128 :         state->growmemtuples = false;
    1115             :     }
    1116             : 
    1117             :     /* Must enlarge array by at least one element, else report failure */
    1118        6948 :     if (newmemtupsize <= memtupsize)
    1119           0 :         goto noalloc;
    1120             : 
    1121             :     /*
    1122             :      * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
    1123             :      * to ensure our request won't be rejected.  Note that we can easily
    1124             :      * exhaust address space before facing this outcome.  (This is presently
    1125             :      * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
    1126             :      * don't rely on that at this distance.)
    1127             :      */
    1128        6948 :     if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
    1129             :     {
    1130           0 :         newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
    1131           0 :         state->growmemtuples = false;    /* can't grow any more */
    1132             :     }
    1133             : 
    1134             :     /*
    1135             :      * We need to be sure that we do not cause LACKMEM to become true, else
    1136             :      * the space management algorithm will go nuts.  The code above should
    1137             :      * never generate a dangerous request, but to be safe, check explicitly
    1138             :      * that the array growth fits within availMem.  (We could still cause
    1139             :      * LACKMEM if the memory chunk overhead associated with the memtuples
    1140             :      * array were to increase.  That shouldn't happen because we chose the
    1141             :      * initial array size large enough to ensure that palloc will be treating
    1142             :      * both old and new arrays as separate chunks.  But we'll check LACKMEM
    1143             :      * explicitly below just in case.)
    1144             :      */
    1145        6948 :     if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
    1146           0 :         goto noalloc;
    1147             : 
    1148             :     /* OK, do it */
    1149        6948 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1150        6948 :     state->memtupsize = newmemtupsize;
    1151        6948 :     state->memtuples = (SortTuple *)
    1152        6948 :         repalloc_huge(state->memtuples,
    1153        6948 :                       state->memtupsize * sizeof(SortTuple));
    1154        6948 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1155        6948 :     if (LACKMEM(state))
    1156           0 :         elog(ERROR, "unexpected out-of-memory situation in tuplesort");
    1157        6948 :     return true;
    1158             : 
    1159           0 : noalloc:
    1160             :     /* If for any reason we didn't realloc, shut off future attempts */
    1161           0 :     state->growmemtuples = false;
    1162           0 :     return false;
    1163             : }
    1164             : 
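/*
 * [Editor's worked example of the growth arithmetic above; the numbers are
 * assumptions]  Suppose allowedMem is 4 MB.  While memNowUsed <= availMem
 * (i.e. less than half of allowedMem is in use), memtupsize doubles:
 * 1024 -> 2048 -> 4096 -> ...  Once usage crosses the halfway point, say
 * memNowUsed = 2.5 MB, we instead take one final, proportional step:
 *
 *   grow_ratio    = allowedMem / memNowUsed = 4.0 / 2.5 = 1.6
 *   newmemtupsize = (int) (memtupsize * 1.6)
 *
 * and growmemtuples is cleared, so no further enlargement is attempted.
 */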
    1165             : /*
    1166             :  * Shared code for tuple and datum cases.
    1167             :  */
    1168             : void
    1169    28288244 : tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
    1170             :                           bool useAbbrev, Size tuplen)
    1171             : {
    1172    28288244 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1173             : 
    1174             :     Assert(!LEADER(state));
    1175             : 
    1176             :     /* account for the memory used for this tuple */
    1177    28288244 :     USEMEM(state, tuplen);
    1178    28288244 :     state->tupleMem += tuplen;
    1179             : 
    1180    28288244 :     if (!useAbbrev)
    1181             :     {
    1182             :         /*
    1183             :          * Leave the ordinary Datum representation, or NULL value.  If there
    1184             :          * is a converter, it won't expect NULL values, and the cost model
    1185             :          * need not account for NULLs, so in that case we avoid calling the
    1186             :          * converter and just set datum1 to a zeroed representation (to be
    1187             :          * consistent, and to support cheap inequality tests for NULL
    1188             :          * abbreviated keys).
    1189             :          */
    1190             :     }
    1191     4425368 :     else if (!consider_abort_common(state))
    1192             :     {
    1193             :         /* Store abbreviated key representation */
    1194     4425272 :         tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
    1195             :                                                                state->base.sortKeys);
    1196             :     }
    1197             :     else
    1198             :     {
    1199             :         /*
    1200             :          * Set state to be consistent with never trying abbreviation.
    1201             :          *
    1202             :          * Alter datum1 representation in already-copied tuples, so as to
    1203             :          * ensure a consistent representation (current tuple was just
    1204             :          * handled).  It does not matter if some dumped tuples are already
    1205             :          * sorted on tape, since serialized tuples lack abbreviated keys
    1206             :          * (TSS_BUILDRUNS state prevents control reaching here in any case).
    1207             :          */
    1208          96 :         REMOVEABBREV(state, state->memtuples, state->memtupcount);
    1209             :     }
    1210             : 
    1211    28288244 :     switch (state->status)
    1212             :     {
    1213    23473644 :         case TSS_INITIAL:
    1214             : 
    1215             :             /*
    1216             :              * Save the tuple into the unsorted array.  First, grow the array
    1217             :              * as needed.  Note that we try to grow the array when there is
    1218             :              * still one free slot remaining --- if we fail, there'll still be
    1219             :              * room to store the incoming tuple, and then we'll switch to
    1220             :              * tape-based operation.
    1221             :              */
    1222    23473644 :             if (state->memtupcount >= state->memtupsize - 1)
    1223             :             {
    1224        7068 :                 (void) grow_memtuples(state);
    1225             :                 Assert(state->memtupcount < state->memtupsize);
    1226             :             }
    1227    23473644 :             state->memtuples[state->memtupcount++] = *tuple;
    1228             : 
    1229             :             /*
    1230             :              * Check if it's time to switch over to a bounded heapsort. We do
    1231             :              * so if the input tuple count exceeds twice the desired tuple
    1232             :              * count (this is a heuristic for where heapsort becomes cheaper
    1233             :              * than a quicksort), or if we've just filled workMem and have
    1234             :              * enough tuples to meet the bound.
    1235             :              *
    1236             :              * Note that once we enter TSS_BOUNDED state we will always try to
    1237             :              * complete the sort that way.  In the worst case, if later input
    1238             :              * tuples are larger than earlier ones, this might cause us to
    1239             :              * exceed workMem significantly.
    1240             :              */
    1241    23473644 :             if (state->bounded &&
    1242       45214 :                 (state->memtupcount > state->bound * 2 ||
    1243       44814 :                  (state->memtupcount > state->bound && LACKMEM(state))))
    1244             :             {
    1245         400 :                 if (trace_sort)
    1246           0 :                     elog(LOG, "switching to bounded heapsort at %d tuples: %s",
    1247             :                          state->memtupcount,
    1248             :                          pg_rusage_show(&state->ru_start));
    1249         400 :                 make_bounded_heap(state);
    1250         400 :                 MemoryContextSwitchTo(oldcontext);
    1251         400 :                 return;
    1252             :             }
    1253             : 
    1254             :             /*
    1255             :              * Done if we still fit in available memory and have array slots.
    1256             :              */
    1257    23473244 :             if (state->memtupcount < state->memtupsize && !LACKMEM(state))
    1258             :             {
    1259    23473124 :                 MemoryContextSwitchTo(oldcontext);
    1260    23473124 :                 return;
    1261             :             }
    1262             : 
    1263             :             /*
    1264             :              * Nope; time to switch to tape-based operation.
    1265             :              */
    1266         120 :             inittapes(state, true);
    1267             : 
    1268             :             /*
    1269             :              * Dump all tuples.
    1270             :              */
    1271         120 :             dumptuples(state, false);
    1272         120 :             break;
    1273             : 
    1274     3750020 :         case TSS_BOUNDED:
    1275             : 
    1276             :             /*
    1277             :              * We don't want to grow the array here, so check whether the new
    1278             :              * tuple can be discarded before putting it in.  This should be a
    1279             :              * good speed optimization, too, since when there are many more
    1280             :              * input tuples than the bound, most input tuples can be discarded
    1281             :              * with just this one comparison.  Note that because we currently
    1282             :              * have the sort direction reversed, we must check for <= not >=.
    1283             :              */
    1284     3750020 :             if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
    1285             :             {
    1286             :                 /* new tuple <= top of the heap, so we can discard it */
    1287     3247044 :                 free_sort_tuple(state, tuple);
    1288     3247044 :                 CHECK_FOR_INTERRUPTS();
    1289             :             }
    1290             :             else
    1291             :             {
    1292             :                 /* discard top of heap, replacing it with the new tuple */
    1293      502976 :                 free_sort_tuple(state, &state->memtuples[0]);
    1294      502976 :                 tuplesort_heap_replace_top(state, tuple);
    1295             :             }
    1296     3750020 :             break;
    1297             : 
    1298     1064580 :         case TSS_BUILDRUNS:
    1299             : 
    1300             :             /*
    1301             :              * Save the tuple into the unsorted array (there must be space)
    1302             :              */
    1303     1064580 :             state->memtuples[state->memtupcount++] = *tuple;
    1304             : 
    1305             :             /*
    1306             :              * If we are over the memory limit, dump all tuples.
    1307             :              */
    1308     1064580 :             dumptuples(state, false);
    1309     1064580 :             break;
    1310             : 
    1311           0 :         default:
    1312           0 :             elog(ERROR, "invalid tuplesort state");
    1313             :             break;
    1314             :     }
    1315     4814720 :     MemoryContextSwitchTo(oldcontext);
    1316             : }
    1317             : 
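/*
 * [Editor's summary, derived from the switch statement above]  The state
 * transitions while tuples are being loaded:
 *
 *   TSS_INITIAL --- bound exceeded ---------------> TSS_BOUNDED
 *       |           (memtupcount > bound * 2, or    (keep a heap of `bound`
 *       |            > bound while LACKMEM)          tuples, discard excess)
 *       |
 *       `--------- memory or slots exhausted -----> TSS_BUILDRUNS
 *                  (inittapes + dumptuples)         (quicksorted runs to tape)
 *
 * Once in TSS_BOUNDED or TSS_BUILDRUNS, loading never returns to TSS_INITIAL.
 */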
    1318             : static bool
    1319     4425368 : consider_abort_common(Tuplesortstate *state)
    1320             : {
    1321             :     Assert(state->base.sortKeys[0].abbrev_converter != NULL);
    1322             :     Assert(state->base.sortKeys[0].abbrev_abort != NULL);
    1323             :     Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
    1324             : 
    1325             :     /*
    1326             :      * Check effectiveness of abbreviation optimization.  Consider aborting
    1327             :      * when still within memory limit.
    1328             :      */
    1329     4425368 :     if (state->status == TSS_INITIAL &&
    1330     3952938 :         state->memtupcount >= state->abbrevNext)
    1331             :     {
    1332        4948 :         state->abbrevNext *= 2;
    1333             : 
    1334             :         /*
    1335             :          * Check opclass-supplied abbreviation abort routine.  It may indicate
    1336             :          * that abbreviation should not proceed.
    1337             :          */
    1338        4948 :         if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
    1339             :                                                 state->base.sortKeys))
    1340        4852 :             return false;
    1341             : 
    1342             :         /*
    1343             :          * Finally, restore authoritative comparator, and indicate that
    1344             :          * abbreviation is not in play by setting abbrev_converter to NULL
    1345             :          */
    1346          96 :         state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
    1347          96 :         state->base.sortKeys[0].abbrev_converter = NULL;
    1348             :         /* Not strictly necessary, but be tidy */
    1349          96 :         state->base.sortKeys[0].abbrev_abort = NULL;
    1350          96 :         state->base.sortKeys[0].abbrev_full_comparator = NULL;
    1351             : 
    1352             :         /* Give up - expect original pass-by-value representation */
    1353          96 :         return true;
    1354             :     }
    1355             : 
    1356     4420420 :     return false;
    1357             : }
    1358             : 
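/*
 * [Editor's sketch of an opclass abbreviation-abort callback of the kind
 * consulted above; hypothetical, with estimate_abbrev_cardinality() standing
 * in for whatever statistics a real opclass keeps]
 */
static bool
example_abbrev_abort(int memtupcount, SortSupport ssup)
{
    double      abbrev_distinct = estimate_abbrev_cardinality(ssup);

    /* too few distinct abbreviated keys: abbreviation isn't paying off */
    if (memtupcount > 1000 && abbrev_distinct < memtupcount * 0.05)
        return true;            /* ask tuplesort to abandon abbreviation */

    return false;               /* keep abbreviating */
}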
    1359             : /*
    1360             :  * All tuples have been provided; finish the sort.
    1361             :  */
    1362             : void
    1363      214096 : tuplesort_performsort(Tuplesortstate *state)
    1364             : {
    1365      214096 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1366             : 
    1367      214096 :     if (trace_sort)
    1368           0 :         elog(LOG, "performsort of worker %d starting: %s",
    1369             :              state->worker, pg_rusage_show(&state->ru_start));
    1370             : 
    1371      214096 :     switch (state->status)
    1372             :     {
    1373      213576 :         case TSS_INITIAL:
    1374             : 
    1375             :             /*
    1376             :              * We were able to accumulate all the tuples within the allowed
    1377             :              * amount of memory, or the leader will take over the worker tapes.
    1378             :              */
    1379      213576 :             if (SERIAL(state))
    1380             :             {
    1381             :                 /* Just qsort 'em and we're done */
    1382      212976 :                 tuplesort_sort_memtuples(state);
    1383      212892 :                 state->status = TSS_SORTEDINMEM;
    1384             :             }
    1385         600 :             else if (WORKER(state))
    1386             :             {
    1387             :                 /*
    1388             :                  * Parallel workers must still dump out tuples to tape.  No
    1389             :                  * merge is required to produce a single output run, though.
    1390             :                  */
    1391         444 :                 inittapes(state, false);
    1392         444 :                 dumptuples(state, true);
    1393         444 :                 worker_nomergeruns(state);
    1394         444 :                 state->status = TSS_SORTEDONTAPE;
    1395             :             }
    1396             :             else
    1397             :             {
    1398             :                 /*
    1399             :                  * Leader will take over worker tapes and merge worker runs.
    1400             :                  * Note that mergeruns sets the correct state->status.
    1401             :                  */
    1402         156 :                 leader_takeover_tapes(state);
    1403         156 :                 mergeruns(state);
    1404             :             }
    1405      213492 :             state->current = 0;
    1406      213492 :             state->eof_reached = false;
    1407      213492 :             state->markpos_block = 0L;
    1408      213492 :             state->markpos_offset = 0;
    1409      213492 :             state->markpos_eof = false;
    1410      213492 :             break;
    1411             : 
    1412         400 :         case TSS_BOUNDED:
    1413             : 
    1414             :             /*
    1415             :              * We were able to accumulate all the tuples required for output
    1416             :              * in memory, using a heap to eliminate excess tuples.  Now we
    1417             :              * have to transform the heap to a properly-sorted array. Note
    1418             :              * that sort_bounded_heap sets the correct state->status.
    1419             :              */
    1420         400 :             sort_bounded_heap(state);
    1421         400 :             state->current = 0;
    1422         400 :             state->eof_reached = false;
    1423         400 :             state->markpos_offset = 0;
    1424         400 :             state->markpos_eof = false;
    1425         400 :             break;
    1426             : 
    1427         120 :         case TSS_BUILDRUNS:
    1428             : 
    1429             :             /*
    1430             :              * Finish tape-based sort.  First, flush all tuples remaining in
    1431             :              * memory out to tape; then merge until we have a single remaining
    1432             :              * run (or, if !randomAccess and !WORKER(), one run per tape).
    1433             :              * Note that mergeruns sets the correct state->status.
    1434             :              */
    1435         120 :             dumptuples(state, true);
    1436         120 :             mergeruns(state);
    1437         120 :             state->eof_reached = false;
    1438         120 :             state->markpos_block = 0L;
    1439         120 :             state->markpos_offset = 0;
    1440         120 :             state->markpos_eof = false;
    1441         120 :             break;
    1442             : 
    1443           0 :         default:
    1444           0 :             elog(ERROR, "invalid tuplesort state");
    1445             :             break;
    1446             :     }
    1447             : 
    1448      214012 :     if (trace_sort)
    1449             :     {
    1450           0 :         if (state->status == TSS_FINALMERGE)
    1451           0 :             elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
    1452             :                  state->worker, state->nInputTapes,
    1453             :                  pg_rusage_show(&state->ru_start));
    1454             :         else
    1455           0 :             elog(LOG, "performsort of worker %d done: %s",
    1456             :                  state->worker, pg_rusage_show(&state->ru_start));
    1457             :     }
    1458             : 
    1459      214012 :     MemoryContextSwitchTo(oldcontext);
    1460      214012 : }
    1461             : 
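/*
 * [Editor's sketch of the overall calling pattern; the begin/put/get variants
 * live in tuplesortvariants.c, and fetch_next_slot()/emit_slot() are
 * hypothetical placeholders for the caller's input and output loops]
 */
static void
example_full_sort(TupleDesc tupdesc, int nkeys, AttrNumber *attnums,
                  Oid *sortops, Oid *collations, bool *nullsfirst,
                  TupleTableSlot *slot)
{
    Tuplesortstate *sortstate;

    sortstate = tuplesort_begin_heap(tupdesc, nkeys, attnums, sortops,
                                     collations, nullsfirst,
                                     work_mem, NULL, TUPLESORT_NONE);

    while (fetch_next_slot(slot))                   /* hypothetical */
        tuplesort_puttupleslot(sortstate, slot);

    tuplesort_performsort(sortstate);               /* no more input now */

    while (tuplesort_gettupleslot(sortstate, true, false, slot, NULL))
        emit_slot(slot);                            /* hypothetical */

    tuplesort_end(sortstate);
}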
    1462             : /*
    1463             :  * Internal routine to fetch the next tuple in either forward or back
    1464             :  * direction into *stup.  Returns false if no more tuples.
    1465             :  * The returned tuple belongs to the tuplesort memory context, and must not
    1466             :  * be freed by the caller.  Note that the fetched tuple is stored in memory
    1467             :  * that may be recycled by any future fetch.
    1468             :  */
    1469             : bool
    1470    25450436 : tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
    1471             :                           SortTuple *stup)
    1472             : {
    1473             :     unsigned int tuplen;
    1474             :     size_t      nmoved;
    1475             : 
    1476             :     Assert(!WORKER(state));
    1477             : 
    1478    25450436 :     switch (state->status)
    1479             :     {
    1480    20609862 :         case TSS_SORTEDINMEM:
    1481             :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1482             :             Assert(!state->slabAllocatorUsed);
    1483    20609862 :             if (forward)
    1484             :             {
    1485    20609796 :                 if (state->current < state->memtupcount)
    1486             :                 {
    1487    20398242 :                     *stup = state->memtuples[state->current++];
    1488    20398242 :                     return true;
    1489             :                 }
    1490      211554 :                 state->eof_reached = true;
    1491             : 
    1492             :                 /*
    1493             :                  * Complain if caller tries to retrieve more tuples than
    1494             :                  * originally asked for in a bounded sort.  This is because
    1495             :                  * returning EOF here might be the wrong thing.
    1496             :                  */
    1497      211554 :                 if (state->bounded && state->current >= state->bound)
    1498           0 :                     elog(ERROR, "retrieved too many tuples in a bounded sort");
    1499             : 
    1500      211554 :                 return false;
    1501             :             }
    1502             :             else
    1503             :             {
    1504          66 :                 if (state->current <= 0)
    1505           0 :                     return false;
    1506             : 
    1507             :                 /*
    1508             :                  * If all tuples have been fetched already, return the last
    1509             :                  * tuple; otherwise, return the tuple before the last one returned.
    1510             :                  */
    1511          66 :                 if (state->eof_reached)
    1512          12 :                     state->eof_reached = false;
    1513             :                 else
    1514             :                 {
    1515          54 :                     state->current--;    /* last returned tuple */
    1516          54 :                     if (state->current <= 0)
    1517           6 :                         return false;
    1518             :                 }
    1519          60 :                 *stup = state->memtuples[state->current - 1];
    1520          60 :                 return true;
    1521             :             }
    1522             :             break;
    1523             : 
    1524      272994 :         case TSS_SORTEDONTAPE:
    1525             :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1526             :             Assert(state->slabAllocatorUsed);
    1527             : 
    1528             :             /*
    1529             :              * The slot that held the tuple we returned in the previous
    1530             :              * gettuple call can now be reused.
    1531             :              */
    1532      272994 :             if (state->lastReturnedTuple)
    1533             :             {
    1534      152850 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1535      152850 :                 state->lastReturnedTuple = NULL;
    1536             :             }
    1537             : 
    1538      272994 :             if (forward)
    1539             :             {
    1540      272964 :                 if (state->eof_reached)
    1541           0 :                     return false;
    1542             : 
    1543      272964 :                 if ((tuplen = getlen(state->result_tape, true)) != 0)
    1544             :                 {
    1545      272940 :                     READTUP(state, stup, state->result_tape, tuplen);
    1546             : 
    1547             :                     /*
    1548             :                      * Remember the tuple we return, so that we can recycle
    1549             :                      * its memory on next call.  (This can be NULL, in the
    1550             :                      * !state->tuples case).
    1551             :                      */
    1552      272940 :                     state->lastReturnedTuple = stup->tuple;
    1553             : 
    1554      272940 :                     return true;
    1555             :                 }
    1556             :                 else
    1557             :                 {
    1558          24 :                     state->eof_reached = true;
    1559          24 :                     return false;
    1560             :                 }
    1561             :             }
    1562             : 
    1563             :             /*
    1564             :              * Backward.
    1565             :              *
    1566             :              * If all tuples have been fetched already, return the last tuple;
    1567             :              * otherwise, return the tuple before the last one returned.
    1568             :              */
    1569          30 :             if (state->eof_reached)
    1570             :             {
    1571             :                 /*
    1572             :                  * Seek position is pointing just past the zero tuplen at the
    1573             :                  * end of file; back up to fetch last tuple's ending length
    1574             :                  * word.  If seek fails we must have a completely empty file.
    1575             :                  */
    1576          12 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1577             :                                               2 * sizeof(unsigned int));
    1578          12 :                 if (nmoved == 0)
    1579           0 :                     return false;
    1580          12 :                 else if (nmoved != 2 * sizeof(unsigned int))
    1581           0 :                     elog(ERROR, "unexpected tape position");
    1582          12 :                 state->eof_reached = false;
    1583             :             }
    1584             :             else
    1585             :             {
    1586             :                 /*
    1587             :                  * Back up and fetch previously-returned tuple's ending length
    1588             :                  * word.  If seek fails, assume we are at start of file.
    1589             :                  */
    1590          18 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1591             :                                               sizeof(unsigned int));
    1592          18 :                 if (nmoved == 0)
    1593           0 :                     return false;
    1594          18 :                 else if (nmoved != sizeof(unsigned int))
    1595           0 :                     elog(ERROR, "unexpected tape position");
    1596          18 :                 tuplen = getlen(state->result_tape, false);
    1597             : 
    1598             :                 /*
    1599             :                  * Back up to get ending length word of tuple before it.
    1600             :                  */
    1601          18 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1602             :                                               tuplen + 2 * sizeof(unsigned int));
    1603          18 :                 if (nmoved == tuplen + sizeof(unsigned int))
    1604             :                 {
    1605             :                     /*
    1606             :                      * We backed up over the previous tuple, but there was no
    1607             :                      * ending length word before it.  That means the previous
    1608             :                      * tuple is the first tuple in the file.  It is now the
    1609             :                      * next to read in the forward direction (not obviously
    1610             :                      * right, but that is what the in-memory case does).
    1611             :                      */
    1612           6 :                     return false;
    1613             :                 }
    1614          12 :                 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
    1615           0 :                     elog(ERROR, "bogus tuple length in backward scan");
    1616             :             }
    1617             : 
    1618          24 :             tuplen = getlen(state->result_tape, false);
    1619             : 
    1620             :             /*
    1621             :              * Now we have the length of the prior tuple, back up and read it.
    1622             :              * Note: READTUP expects we are positioned after the initial
    1623             :              * length word of the tuple, so back up to that point.
    1624             :              */
    1625          24 :             nmoved = LogicalTapeBackspace(state->result_tape,
    1626             :                                           tuplen);
    1627          24 :             if (nmoved != tuplen)
    1628           0 :                 elog(ERROR, "bogus tuple length in backward scan");
    1629          24 :             READTUP(state, stup, state->result_tape, tuplen);
    1630             : 
    1631             :             /*
    1632             :              * Remember the tuple we return, so that we can recycle its memory
    1633             :              * on next call. (This can be NULL, in the Datum case).
    1634             :              */
    1635          24 :             state->lastReturnedTuple = stup->tuple;
    1636             : 
    1637          24 :             return true;
    1638             : 
    1639     4567580 :         case TSS_FINALMERGE:
    1640             :             Assert(forward);
    1641             :             /* We are managing memory ourselves, with the slab allocator. */
    1642             :             Assert(state->slabAllocatorUsed);
    1643             : 
    1644             :             /*
    1645             :              * The slab slot holding the tuple we returned in the previous
    1646             :              * gettuple call can now be reused.
    1647             :              */
    1648     4567580 :             if (state->lastReturnedTuple)
    1649             :             {
    1650     4507292 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1651     4507292 :                 state->lastReturnedTuple = NULL;
    1652             :             }
    1653             : 
    1654             :             /*
    1655             :              * This code should match the inner loop of mergeonerun().
    1656             :              */
    1657     4567580 :             if (state->memtupcount > 0)
    1658             :             {
    1659     4567340 :                 int         srcTapeIndex = state->memtuples[0].srctape;
    1660     4567340 :                 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
    1661             :                 SortTuple   newtup;
    1662             : 
    1663     4567340 :                 *stup = state->memtuples[0];
    1664             : 
    1665             :                 /*
    1666             :                  * Remember the tuple we return, so that we can recycle its
    1667             :                  * memory on next call. (This can be NULL, in the Datum case).
    1668             :                  */
    1669     4567340 :                 state->lastReturnedTuple = stup->tuple;
    1670             : 
    1671             :                 /*
    1672             :                  * Pull next tuple from tape, and replace the returned tuple
    1673             :                  * at top of the heap with it.
    1674             :                  */
    1675     4567340 :                 if (!mergereadnext(state, srcTape, &newtup))
    1676             :                 {
    1677             :                     /*
    1678             :                      * If no more data, we've reached end of run on this tape.
    1679             :                      * Remove the top node from the heap.
    1680             :                      */
    1681         340 :                     tuplesort_heap_delete_top(state);
    1682         340 :                     state->nInputRuns--;
    1683             : 
    1684             :                     /*
    1685             :                      * Close the tape.  It'd go away at the end of the sort
    1686             :                      * anyway, but better to release the memory early.
    1687             :                      */
    1688         340 :                     LogicalTapeClose(srcTape);
    1689         340 :                     return true;
    1690             :                 }
    1691     4567000 :                 newtup.srctape = srcTapeIndex;
    1692     4567000 :                 tuplesort_heap_replace_top(state, &newtup);
    1693     4567000 :                 return true;
    1694             :             }
    1695         240 :             return false;
    1696             : 
    1697           0 :         default:
    1698           0 :             elog(ERROR, "invalid tuplesort state");
    1699             :             return false;       /* keep compiler quiet */
    1700             :     }
    1701             : }
    1702             : 
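/*
 * [Editor's illustration of the on-tape layout the backward-scan code above
 * depends on, reconstructed from the backspace arithmetic]  With random
 * access requested, each tuple on tape is bracketed by its length word, and
 * a run ends with a zero length word:
 *
 *   [len1][tuple1][len1] [len2][tuple2][len2] ... [lenN][tupleN][lenN] [0]
 *
 * Stepping backward therefore means: back up over the trailing length word
 * and read it, then back up over the tuple plus both of its length words to
 * land just after the previous tuple's trailing length word.
 */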
    1703             : 
    1704             : /*
    1705             :  * Advance over N tuples in either forward or back direction,
    1706             :  * without returning any data.  N==0 is a no-op.
    1707             :  * Returns true if successful, false if we ran out of tuples.
    1708             :  */
    1709             : bool
    1710         392 : tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
    1711             : {
    1712             :     MemoryContext oldcontext;
    1713             : 
    1714             :     /*
    1715             :      * We don't actually support backwards skip yet, because no callers need
    1716             :      * it.  The API is designed to allow for that later, though.
    1717             :      */
    1718             :     Assert(forward);
    1719             :     Assert(ntuples >= 0);
    1720             :     Assert(!WORKER(state));
    1721             : 
    1722         392 :     switch (state->status)
    1723             :     {
    1724         368 :         case TSS_SORTEDINMEM:
    1725         368 :             if (state->memtupcount - state->current >= ntuples)
    1726             :             {
    1727         368 :                 state->current += ntuples;
    1728         368 :                 return true;
    1729             :             }
    1730           0 :             state->current = state->memtupcount;
    1731           0 :             state->eof_reached = true;
    1732             : 
    1733             :             /*
    1734             :              * Complain if caller tries to retrieve more tuples than
    1735             :              * originally asked for in a bounded sort.  This is because
    1736             :              * returning EOF here might be the wrong thing.
    1737             :              */
    1738           0 :             if (state->bounded && state->current >= state->bound)
    1739           0 :                 elog(ERROR, "retrieved too many tuples in a bounded sort");
    1740             : 
    1741           0 :             return false;
    1742             : 
    1743          24 :         case TSS_SORTEDONTAPE:
    1744             :         case TSS_FINALMERGE:
    1745             : 
    1746             :             /*
    1747             :              * We could probably optimize these cases better, but for now it's
    1748             :              * not worth the trouble.
    1749             :              */
    1750          24 :             oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1751      240132 :             while (ntuples-- > 0)
    1752             :             {
    1753             :                 SortTuple   stup;
    1754             : 
    1755      240108 :                 if (!tuplesort_gettuple_common(state, forward, &stup))
    1756             :                 {
    1757           0 :                     MemoryContextSwitchTo(oldcontext);
    1758           0 :                     return false;
    1759             :                 }
    1760      240108 :                 CHECK_FOR_INTERRUPTS();
    1761             :             }
    1762          24 :             MemoryContextSwitchTo(oldcontext);
    1763          24 :             return true;
    1764             : 
    1765           0 :         default:
    1766           0 :             elog(ERROR, "invalid tuplesort state");
    1767             :             return false;       /* keep compiler quiet */
    1768             :     }
    1769             : }
    1770             : 
    1771             : /*
    1772             :  * tuplesort_merge_order - report merge order we'll use for given memory
    1773             :  * (note: "merge order" just means the number of input tapes in the merge).
    1774             :  *
    1775             :  * This is exported for use by the planner.  allowedMem is in bytes.
    1776             :  */
    1777             : int
    1778       17040 : tuplesort_merge_order(int64 allowedMem)
    1779             : {
    1780             :     int         mOrder;
    1781             : 
    1782             :     /*----------
    1783             :      * In the merge phase, we need buffer space for each input and output tape.
    1784             :      * Each pass in the balanced merge algorithm reads from M input tapes, and
    1785             :      * writes to N output tapes.  Each tape consumes TAPE_BUFFER_OVERHEAD bytes
    1786             :      * of memory.  In addition to that, we want MERGE_BUFFER_SIZE workspace per
    1787             :      * input tape.
    1788             :      *
    1789             :      * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
    1790             :      *            N * TAPE_BUFFER_OVERHEAD
    1791             :      *
    1792             :      * Except for the last and next-to-last merge passes, where there can be
    1793             :      * fewer tapes left to process, M = N.  We choose M so that we have the
    1794             :      * desired amount of memory available for the input buffers
    1795             :      * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
    1796             :      * available for the tape buffers (allowedMem).
    1797             :      *
    1798             :      * Note: you might be thinking we need to account for the memtuples[]
    1799             :      * array in this calculation, but we effectively treat that as part of the
    1800             :      * MERGE_BUFFER_SIZE workspace.
    1801             :      *----------
    1802             :      */
    1803       17040 :     mOrder = allowedMem /
    1804             :         (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
    1805             : 
    1806             :     /*
    1807             :      * Even in minimum memory, use at least a MINORDER merge.  On the other
    1808             :      * hand, even when we have lots of memory, do not use more than a MAXORDER
    1809             :      * merge.  Tapes are pretty cheap, but they're not entirely free.  Each
    1810             :      * additional tape reduces the amount of memory available to build runs,
    1811             :      * which in turn can cause the same sort to need more runs, which makes
    1812             :      * merging slower even if it can still be done in a single pass.  Also,
    1813             :      * high order merges are quite slow due to CPU cache effects; it can be
    1814             :      * faster to pay the I/O cost of a multi-pass merge than to perform a
    1815             :      * single merge pass across many hundreds of tapes.
    1816             :      */
    1817       17040 :     mOrder = Max(mOrder, MINORDER);
    1818       17040 :     mOrder = Min(mOrder, MAXORDER);
    1819             : 
    1820       17040 :     return mOrder;
    1821             : }
    1822             : 
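/*
 * [Editor's worked example for the formula above, assuming BLCKSZ = 8192,
 * TAPE_BUFFER_OVERHEAD = BLCKSZ, and MERGE_BUFFER_SIZE = 32 * BLCKSZ; see
 * the #defines for the authoritative values]  With allowedMem = 16 MB:
 *
 *   mOrder = 16777216 / (2 * 8192 + 262144)
 *          = 16777216 / 278528
 *          = 60    (integer division)
 *
 * which is then clamped to the [MINORDER, MAXORDER] range.
 */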
    1823             : /*
    1824             :  * Helper function to calculate how much memory to allocate for the read buffer
    1825             :  * of each input tape in a merge pass.
    1826             :  *
    1827             :  * 'avail_mem' is the amount of memory available for the buffers of all the
    1828             :  *      tapes, both input and output.
    1829             :  * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
    1830             :  * 'maxOutputTapes' is the maximum number of output tapes we should produce.
    1831             :  */
    1832             : static int64
    1833         306 : merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
    1834             :                        int maxOutputTapes)
    1835             : {
    1836             :     int         nOutputRuns;
    1837             :     int         nOutputTapes;
    1838             : 
    1839             :     /*
    1840             :      * How many output tapes will we produce in this pass?
    1841             :      *
    1842             :      * This is nInputRuns / nInputTapes, rounded up.
    1843             :      */
    1844         306 :     nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
    1845             : 
    1846         306 :     nOutputTapes = Min(nOutputRuns, maxOutputTapes);
    1847             : 
    1848             :     /*
    1849             :      * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory.  All
    1850             :      * remaining memory is divided evenly between the input tapes.
    1851             :      *
    1852             :      * This also follows from the formula in tuplesort_merge_order, but here
    1853             :      * we derive the input buffer size from the amount of memory available,
    1854             :      * and M and N.
    1855             :      */
    1856         306 :     return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
    1857             : }
    1858             : 
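/*
 * [Editor's worked example; all numbers assumed]  With avail_mem = 4 MB,
 * nInputTapes = 8, nInputRuns = 20, maxOutputTapes = 8, and
 * TAPE_BUFFER_OVERHEAD = 8192:
 *
 *   nOutputRuns      = ceil(20 / 8) = 3
 *   nOutputTapes     = Min(3, 8)    = 3
 *   per-input buffer = (4194304 - 3 * 8192) / 8 = 521216 bytes
 *
 * The output tapes get their fixed overhead first; whatever remains is split
 * evenly across the input tapes.
 */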
    1859             : /*
    1860             :  * inittapes - initialize for tape sorting.
    1861             :  *
    1862             :  * This is called only if we have found we won't sort in memory.
    1863             :  */
    1864             : static void
    1865         564 : inittapes(Tuplesortstate *state, bool mergeruns)
    1866             : {
    1867             :     Assert(!LEADER(state));
    1868             : 
    1869         564 :     if (mergeruns)
    1870             :     {
    1871             :         /* Compute number of input tapes to use when merging */
    1872         120 :         state->maxTapes = tuplesort_merge_order(state->allowedMem);
    1873             :     }
    1874             :     else
    1875             :     {
    1876             :         /* Workers can sometimes produce a single run and output it without merging */
    1877             :         Assert(WORKER(state));
    1878         444 :         state->maxTapes = MINORDER;
    1879             :     }
    1880             : 
    1881         564 :     if (trace_sort)
    1882           0 :         elog(LOG, "worker %d switching to external sort with %d tapes: %s",
    1883             :              state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
    1884             : 
    1885             :     /* Create the tape set */
    1886         564 :     inittapestate(state, state->maxTapes);
    1887         564 :     state->tapeset =
    1888         564 :         LogicalTapeSetCreate(false,
    1889         564 :                              state->shared ? &state->shared->fileset : NULL,
    1890             :                              state->worker);
    1891             : 
    1892         564 :     state->currentRun = 0;
    1893             : 
    1894             :     /*
    1895             :      * Initialize logical tape arrays.
    1896             :      */
    1897         564 :     state->inputTapes = NULL;
    1898         564 :     state->nInputTapes = 0;
    1899         564 :     state->nInputRuns = 0;
    1900             : 
    1901         564 :     state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
    1902         564 :     state->nOutputTapes = 0;
    1903         564 :     state->nOutputRuns = 0;
    1904             : 
    1905         564 :     state->status = TSS_BUILDRUNS;
    1906             : 
    1907         564 :     selectnewtape(state);
    1908         564 : }
    1909             : 
    1910             : /*
    1911             :  * inittapestate - initialize generic tape management state
    1912             :  */
    1913             : static void
    1914         720 : inittapestate(Tuplesortstate *state, int maxTapes)
    1915             : {
    1916             :     int64       tapeSpace;
    1917             : 
    1918             :     /*
    1919             :      * Decrease availMem to reflect the space needed for tape buffers; but
    1920             :      * don't decrease it to the point that we have no room for tuples. (That
    1921             :      * case is only likely to occur if sorting pass-by-value Datums; in all
    1922             :      * other scenarios the memtuples[] array is unlikely to occupy more than
    1923             :      * half of allowedMem.  In the pass-by-value case it's not important to
    1924             :      * account for tuple space, so we don't care if LACKMEM becomes
    1925             :      * inaccurate.)
    1926             :      */
    1927         720 :     tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
    1928             : 
    1929         720 :     if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
    1930         618 :         USEMEM(state, tapeSpace);
    1931             : 
    1932             :     /*
    1933             :      * Make sure that the temp file(s) underlying the tape set are created in
    1934             :      * suitable temp tablespaces.  For parallel sorts, this should have been
    1935             :      * called already, but it doesn't matter if it is called a second time.
    1936             :      */
    1937         720 :     PrepareTempTablespaces();
    1938         720 : }
    1939             : 
    1940             : /*
    1941             :  * selectnewtape -- select next tape to output to.
    1942             :  *
    1943             :  * This is called after finishing a run when we know another run
    1944             :  * must be started.  This is used both when building the initial
    1945             :  * runs, and during merge passes.
    1946             :  */
    1947             : static void
    1948        1638 : selectnewtape(Tuplesortstate *state)
    1949             : {
    1950             :     /*
    1951             :      * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
    1952             :      * both zero.  On each call, we create a new output tape to hold the next
    1953             :      * run, until maxTapes is reached.  After that, we assign new runs to the
    1954             :      * existing tapes in a round robin fashion.
    1955             :      */
    1956        1638 :     if (state->nOutputTapes < state->maxTapes)
    1957             :     {
    1958             :         /* Create a new tape to hold the next run */
    1959             :         Assert(state->outputTapes[state->nOutputRuns] == NULL);
    1960             :         Assert(state->nOutputRuns == state->nOutputTapes);
    1961        1056 :         state->destTape = LogicalTapeCreate(state->tapeset);
    1962        1056 :         state->outputTapes[state->nOutputTapes] = state->destTape;
    1963        1056 :         state->nOutputTapes++;
    1964        1056 :         state->nOutputRuns++;
    1965             :     }
    1966             :     else
    1967             :     {
    1968             :         /*
    1969             :          * We have reached the max number of tapes.  Append to an existing
    1970             :          * tape.
    1971             :          */
    1972         582 :         state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
    1973         582 :         state->nOutputRuns++;
    1974             :     }
    1975        1638 : }
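
The run-to-tape assignment is easier to see in isolation. A hypothetical trace of ten runs onto at most four tapes, mirroring the two branches above:

    #include <stdio.h>

    int
    main(void)
    {
        int         maxTapes = 4;
        int         nTapes = 0, nRuns = 0;

        for (int run = 0; run < 10; run++)
        {
            int         tape;

            if (nTapes < maxTapes)
                tape = nTapes++;        /* first maxTapes runs each get a fresh tape */
            else
                tape = nRuns % nTapes;  /* later runs are appended round robin */
            nRuns++;
            printf("run %d -> tape %d\n", run, tape);
        }
        /* Output: runs 0-3 go to tapes 0-3, then runs 4-9 cycle 0,1,2,3,0,1. */
        return 0;
    }
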
    1976             : 
    1977             : /*
    1978             :  * Initialize the slab allocation arena, for the given number of slots.
    1979             :  */
    1980             : static void
    1981         276 : init_slab_allocator(Tuplesortstate *state, int numSlots)
    1982             : {
    1983         276 :     if (numSlots > 0)
    1984             :     {
    1985             :         char       *p;
    1986             :         int         i;
    1987             : 
    1988         264 :         state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
    1989         264 :         state->slabMemoryEnd = state->slabMemoryBegin +
    1990         264 :             numSlots * SLAB_SLOT_SIZE;
    1991         264 :         state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
    1992         264 :         USEMEM(state, numSlots * SLAB_SLOT_SIZE);
    1993             : 
    1994         264 :         p = state->slabMemoryBegin;
    1995        1006 :         for (i = 0; i < numSlots - 1; i++)
    1996             :         {
    1997         742 :             ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
    1998         742 :             p += SLAB_SLOT_SIZE;
    1999             :         }
    2000         264 :         ((SlabSlot *) p)->nextfree = NULL;
    2001             :     }
    2002             :     else
    2003             :     {
    2004          12 :         state->slabMemoryBegin = state->slabMemoryEnd = NULL;
    2005          12 :         state->slabFreeHead = NULL;
    2006             :     }
    2007         276 :     state->slabAllocatorUsed = true;
    2008         276 : }
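
The arena setup threads a singly linked free list through the slots themselves, using the first pointer-size bytes of each free slot. A self-contained sketch with an illustrative slot size (the real SLAB_SLOT_SIZE is defined elsewhere in this file):

    #define SLOT_SIZE 1024            /* illustrative; the real SLAB_SLOT_SIZE differs */

    typedef union SlotUnion
    {
        union SlotUnion *nextfree;    /* valid while the slot is on the free list */
        char        buffer[SLOT_SIZE];
    } Slot;

    /* Chain numSlots contiguous slots into a free list; the caller guarantees
     * numSlots > 0 and that arena holds numSlots * SLOT_SIZE bytes. */
    static Slot *
    build_freelist(char *arena, int numSlots)
    {
        char       *p = arena;

        for (int i = 0; i < numSlots - 1; i++)
        {
            ((Slot *) p)->nextfree = (Slot *) (p + SLOT_SIZE);
            p += SLOT_SIZE;
        }
        ((Slot *) p)->nextfree = NULL;    /* last slot terminates the list */
        return (Slot *) arena;            /* head of the free list */
    }
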
    2009             : 
    2010             : /*
    2011             :  * mergeruns -- merge all the completed initial runs.
    2012             :  *
    2013             :  * This implements the Balanced k-Way Merge Algorithm.  All input data has
    2014             :  * already been written to initial runs on tape (see dumptuples).
    2015             :  */
    2016             : static void
    2017         276 : mergeruns(Tuplesortstate *state)
    2018             : {
    2019             :     int         tapenum;
    2020             : 
    2021             :     Assert(state->status == TSS_BUILDRUNS);
    2022             :     Assert(state->memtupcount == 0);
    2023             : 
    2024         276 :     if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
    2025             :     {
    2026             :         /*
    2027             :          * If there are multiple runs to be merged, when we go to read back
    2028             :          * tuples from disk, abbreviated keys will not have been stored, and
    2029             :          * we don't care to regenerate them.  Disable abbreviation from this
    2030             :          * point on.
    2031             :          */
    2032          30 :         state->base.sortKeys->abbrev_converter = NULL;
    2033          30 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
    2034             : 
    2035             :         /* Not strictly necessary, but be tidy */
    2036          30 :         state->base.sortKeys->abbrev_abort = NULL;
    2037          30 :         state->base.sortKeys->abbrev_full_comparator = NULL;
    2038             :     }
    2039             : 
    2040             :     /*
    2041             :      * Reset tuple memory.  We've freed all the tuples that we previously
    2042             :      * allocated.  We will use the slab allocator from now on.
    2043             :      */
    2044         276 :     MemoryContextResetOnly(state->base.tuplecontext);
    2045             : 
    2046             :     /*
    2047             :      * We no longer need a large memtuples array.  (We will allocate a smaller
    2048             :      * one for the heap later.)
    2049             :      */
    2050         276 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    2051         276 :     pfree(state->memtuples);
    2052         276 :     state->memtuples = NULL;
    2053             : 
    2054             :     /*
    2055             :      * Initialize the slab allocator.  We need one slab slot per input tape,
    2056             :      * for the tuples in the heap, plus one to hold the tuple last returned
    2057             :      * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
    2058             :      * however, we don't need to allocate anything.)
    2059             :      *
    2060             :      * In a multi-pass merge, we could shrink this allocation for the last
    2061             :      * merge pass, if it has fewer tapes than previous passes, but we don't
    2062             :      * bother.
    2063             :      *
    2064             :      * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
    2065             :      * to track memory usage of individual tuples.
    2066             :      */
    2067         276 :     if (state->base.tuples)
    2068         264 :         init_slab_allocator(state, state->nOutputTapes + 1);
    2069             :     else
    2070          12 :         init_slab_allocator(state, 0);
    2071             : 
    2072             :     /*
    2073             :      * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
    2074             :      * from each input tape.
    2075             :      *
    2076             :      * We could shrink this, too, between passes in a multi-pass merge, but we
    2077             :      * don't bother.  (The initial input tapes are still in outputTapes.  The
    2078             :      * number of input tapes will not increase between passes.)
    2079             :      */
    2080         276 :     state->memtupsize = state->nOutputTapes;
    2081         552 :     state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
    2082         276 :                                                         state->nOutputTapes * sizeof(SortTuple));
    2083         276 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    2084             : 
    2085             :     /*
    2086             :      * Use all the remaining memory we have available for tape buffers among
    2087             :      * all the input tapes.  At the beginning of each merge pass, we will
    2088             :      * divide this memory between the input and output tapes in the pass.
    2089             :      */
    2090         276 :     state->tape_buffer_mem = state->availMem;
    2091         276 :     USEMEM(state, state->tape_buffer_mem);
    2092         276 :     if (trace_sort)
    2093           0 :         elog(LOG, "worker %d using %zu KB of memory for tape buffers",
    2094             :              state->worker, state->tape_buffer_mem / 1024);
    2095             : 
    2096             :     for (;;)
    2097             :     {
    2098             :         /*
    2099             :          * On the first iteration, or if we have read all the runs from the
    2100             :          * input tapes in a multi-pass merge, it's time to start a new pass.
    2101             :          * Rewind all the output tapes, and make them inputs for the next
    2102             :          * pass.
    2103             :          */
    2104         414 :         if (state->nInputRuns == 0)
    2105             :         {
    2106             :             int64       input_buffer_size;
    2107             : 
    2108             :             /* Close the old, emptied, input tapes */
    2109         306 :             if (state->nInputTapes > 0)
    2110             :             {
    2111         210 :                 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2112         180 :                     LogicalTapeClose(state->inputTapes[tapenum]);
    2113          30 :                 pfree(state->inputTapes);
    2114             :             }
    2115             : 
    2116             :             /* Previous pass's outputs become next pass's inputs. */
    2117         306 :             state->inputTapes = state->outputTapes;
    2118         306 :             state->nInputTapes = state->nOutputTapes;
    2119         306 :             state->nInputRuns = state->nOutputRuns;
    2120             : 
    2121             :             /*
    2122             :              * Reset output tape variables.  The actual LogicalTapes will be
    2123             :              * created as needed; here we only allocate the array to hold
    2124             :              * them.
    2125             :              */
    2126         306 :             state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
    2127         306 :             state->nOutputTapes = 0;
    2128         306 :             state->nOutputRuns = 0;
    2129             : 
    2130             :             /*
    2131             :              * Redistribute the memory allocated for tape buffers, among the
    2132             :              * new input and output tapes.
    2133             :              */
    2134         306 :             input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
    2135             :                                                        state->nInputTapes,
    2136             :                                                        state->nInputRuns,
    2137             :                                                        state->maxTapes);
    2138             : 
    2139         306 :             if (trace_sort)
    2140           0 :                 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
    2141             :                      state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
    2142             :                      pg_rusage_show(&state->ru_start));
    2143             : 
    2144             :             /* Prepare the new input tapes for merge pass. */
    2145        1216 :             for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2146         910 :                 LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
    2147             : 
    2148             :             /*
    2149             :              * If there's just one run left on each input tape, then only one
    2150             :              * merge pass remains.  If we don't have to produce a materialized
    2151             :              * sorted tape, we can stop at this point and do the final merge
    2152             :              * on-the-fly.
    2153             :              */
    2154         306 :             if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
    2155         288 :                 && state->nInputRuns <= state->nInputTapes
    2156         258 :                 && !WORKER(state))
    2157             :             {
    2158             :                 /* Tell logtape.c we won't be writing anymore */
    2159         258 :                 LogicalTapeSetForgetFreeSpace(state->tapeset);
    2160             :                 /* Initialize for the final merge pass */
    2161         258 :                 beginmerge(state);
    2162         258 :                 state->status = TSS_FINALMERGE;
    2163         258 :                 return;
    2164             :             }
    2165             :         }
    2166             : 
    2167             :         /* Select an output tape */
    2168         156 :         selectnewtape(state);
    2169             : 
    2170             :         /* Merge one run from each input tape. */
    2171         156 :         mergeonerun(state);
    2172             : 
    2173             :         /*
    2174             :          * If the input tapes are empty and we have produced only one output run,
    2175             :          * we're done.  The current output tape contains the final result.
    2176             :          */
    2177         156 :         if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
    2178          18 :             break;
    2179             :     }
    2180             : 
    2181             :     /*
    2182             :      * Done.  The result is on a single run on a single tape.
    2183             :      */
    2184          18 :     state->result_tape = state->outputTapes[0];
    2185          18 :     if (!WORKER(state))
    2186          18 :         LogicalTapeFreeze(state->result_tape, NULL);
    2187             :     else
    2188           0 :         worker_freeze_result_tape(state);
    2189          18 :     state->status = TSS_SORTEDONTAPE;
    2190             : 
    2191             :     /* Close all the now-empty input tapes, to release their read buffers. */
    2192         102 :     for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2193          84 :         LogicalTapeClose(state->inputTapes[tapenum]);
    2194             : }
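
Each pass merges up to nInputTapes runs into one, so the run count shrinks by roughly that factor per pass. A hypothetical helper (not part of tuplesort.c) that estimates the pass count:

    /* Estimate merge passes for nRuns initial runs with a k-way merge. */
    static int
    sketch_merge_passes(int nRuns, int k)
    {
        int         passes = 0;

        while (nRuns > 1)
        {
            nRuns = (nRuns + k - 1) / k;    /* each pass leaves ceil(nRuns/k) runs */
            passes++;
        }
        return passes;
    }

For example, 20 initial runs on a 6-way merge take two passes (20 -> 4 -> 1); when no materialized result tape is required, the code above performs that last pass on the fly as TSS_FINALMERGE instead of writing it back to tape.
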
    2195             : 
    2196             : /*
    2197             :  * Merge one run from each input tape.
    2198             :  */
    2199             : static void
    2200         156 : mergeonerun(Tuplesortstate *state)
    2201             : {
    2202             :     int         srcTapeIndex;
    2203             :     LogicalTape *srcTape;
    2204             : 
    2205             :     /*
    2206             :      * Start the merge by loading one tuple from each active source tape into
    2207             :      * the heap.
    2208             :      */
    2209         156 :     beginmerge(state);
    2210             : 
    2211             :     Assert(state->slabAllocatorUsed);
    2212             : 
    2213             :     /*
    2214             :      * Execute merge by repeatedly extracting lowest tuple in heap, writing it
    2215             :      * out, and replacing it with next tuple from same tape (if there is
    2216             :      * another one).
    2217             :      */
    2218      855588 :     while (state->memtupcount > 0)
    2219             :     {
    2220             :         SortTuple   stup;
    2221             : 
    2222             :         /* write the tuple to destTape */
    2223      855432 :         srcTapeIndex = state->memtuples[0].srctape;
    2224      855432 :         srcTape = state->inputTapes[srcTapeIndex];
    2225      855432 :         WRITETUP(state, state->destTape, &state->memtuples[0]);
    2226             : 
    2227             :         /* recycle the slot of the tuple we just wrote out, for the next read */
    2228      855432 :         if (state->memtuples[0].tuple)
    2229      735348 :             RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
    2230             : 
    2231             :         /*
    2232             :          * pull next tuple from the tape, and replace the written-out tuple in
    2233             :          * the heap with it.
    2234             :          */
    2235      855432 :         if (mergereadnext(state, srcTape, &stup))
    2236             :         {
    2237      854586 :             stup.srctape = srcTapeIndex;
    2238      854586 :             tuplesort_heap_replace_top(state, &stup);
    2239             :         }
    2240             :         else
    2241             :         {
    2242         846 :             tuplesort_heap_delete_top(state);
    2243         846 :             state->nInputRuns--;
    2244             :         }
    2245             :     }
    2246             : 
    2247             :     /*
    2248             :      * When the heap empties, we're done.  Write an end-of-run marker on the
    2249             :      * output tape.
    2250             :      */
    2251         156 :     markrunend(state->destTape);
    2252         156 : }
    2253             : 
    2254             : /*
    2255             :  * beginmerge - initialize for a merge pass
    2256             :  *
    2257             :  * Fill the merge heap with the first tuple from each input tape.
    2258             :  */
    2259             : static void
    2260         414 : beginmerge(Tuplesortstate *state)
    2261             : {
    2262             :     int         activeTapes;
    2263             :     int         srcTapeIndex;
    2264             : 
    2265             :     /* Heap should be empty here */
    2266             :     Assert(state->memtupcount == 0);
    2267             : 
    2268         414 :     activeTapes = Min(state->nInputTapes, state->nInputRuns);
    2269             : 
    2270        1906 :     for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
    2271             :     {
    2272             :         SortTuple   tup;
    2273             : 
    2274        1492 :         if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
    2275             :         {
    2276        1234 :             tup.srctape = srcTapeIndex;
    2277        1234 :             tuplesort_heap_insert(state, &tup);
    2278             :         }
    2279             :     }
    2280         414 : }
    2281             : 
    2282             : /*
    2283             :  * mergereadnext - read next tuple from one merge input tape
    2284             :  *
    2285             :  * Returns false on EOF.
    2286             :  */
    2287             : static bool
    2288     5424264 : mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
    2289             : {
    2290             :     unsigned int tuplen;
    2291             : 
    2292             :     /* read next tuple, if any */
    2293     5424264 :     if ((tuplen = getlen(srcTape, true)) == 0)
    2294        1444 :         return false;
    2295     5422820 :     READTUP(state, stup, srcTape, tuplen);
    2296             : 
    2297     5422820 :     return true;
    2298             : }
    2299             : 
    2300             : /*
    2301             :  * dumptuples - remove tuples from memtuples and write initial run to tape
    2302             :  *
    2303             :  * When alltuples = true, dump everything currently in memory.  (This case is
    2304             :  * only used at end of input data.)
    2305             :  */
    2306             : static void
    2307     1065264 : dumptuples(Tuplesortstate *state, bool alltuples)
    2308             : {
    2309             :     int         memtupwrite;
    2310             :     int         i;
    2311             : 
    2312             :     /*
    2313             :      * Nothing to do if we still fit in available memory and have array slots,
    2314             :      * unless this is the final call during initial run generation.
    2315             :      */
    2316     1065264 :     if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
    2317     1064346 :         !alltuples)
    2318     1063782 :         return;
    2319             : 
    2320             :     /*
    2321             :      * Final call might require no sorting, in rare cases where we just so
    2322             :      * happen to have previously LACKMEM()'d at the point where exactly all
    2323             :      * remaining tuples are loaded into memory, just before input was
    2324             :      * exhausted.  In general, short final runs are quite possible, but avoid
    2325             :      * creating a completely empty run.  In a worker, though, we must produce
    2326             :      * at least one tape, even if it's empty.
    2327             :      */
    2328        1482 :     if (state->memtupcount == 0 && state->currentRun > 0)
    2329           0 :         return;
    2330             : 
    2331             :     Assert(state->status == TSS_BUILDRUNS);
    2332             : 
    2333             :     /*
    2334             :      * It seems unlikely that this limit will ever be exceeded, but take no
    2335             :      * chances
    2336             :      */
    2337        1482 :     if (state->currentRun == INT_MAX)
    2338           0 :         ereport(ERROR,
    2339             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    2340             :                  errmsg("cannot have more than %d runs for an external sort",
    2341             :                         INT_MAX)));
    2342             : 
    2343        1482 :     if (state->currentRun > 0)
    2344         918 :         selectnewtape(state);
    2345             : 
    2346        1482 :     state->currentRun++;
    2347             : 
    2348        1482 :     if (trace_sort)
    2349           0 :         elog(LOG, "worker %d starting quicksort of run %d: %s",
    2350             :              state->worker, state->currentRun,
    2351             :              pg_rusage_show(&state->ru_start));
    2352             : 
    2353             :     /*
    2354             :      * Sort all tuples accumulated within the allowed amount of memory for
    2355             :      * this run using quicksort
    2356             :      */
    2357        1482 :     tuplesort_sort_memtuples(state);
    2358             : 
    2359        1482 :     if (trace_sort)
    2360           0 :         elog(LOG, "worker %d finished quicksort of run %d: %s",
    2361             :              state->worker, state->currentRun,
    2362             :              pg_rusage_show(&state->ru_start));
    2363             : 
    2364        1482 :     memtupwrite = state->memtupcount;
    2365     5064074 :     for (i = 0; i < memtupwrite; i++)
    2366             :     {
    2367     5062592 :         SortTuple  *stup = &state->memtuples[i];
    2368             : 
    2369     5062592 :         WRITETUP(state, state->destTape, stup);
    2370             :     }
    2371             : 
    2372        1482 :     state->memtupcount = 0;
    2373             : 
    2374             :     /*
    2375             :      * Reset tuple memory.  We've freed all of the tuples that we previously
    2376             :      * allocated.  It's important to avoid fragmentation when there is a stark
    2377             :      * change in the sizes of incoming tuples.  In bounded sorts,
    2378             :      * fragmentation due to AllocSetFree's bucketing by size class might be
    2379             :      * particularly bad if this step wasn't taken.
    2380             :      */
    2381        1482 :     MemoryContextReset(state->base.tuplecontext);
    2382             : 
    2383             :     /*
    2384             :      * Now update the memory accounting to subtract the memory used by the
    2385             :      * tuple.
    2386             :      */
    2387        1482 :     FREEMEM(state, state->tupleMem);
    2388        1482 :     state->tupleMem = 0;
    2389             : 
    2390        1482 :     markrunend(state->destTape);
    2391             : 
    2392        1482 :     if (trace_sort)
    2393           0 :         elog(LOG, "worker %d finished writing run %d to tape %d: %s",
    2394             :              state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
    2395             :              pg_rusage_show(&state->ru_start));
    2396             : }
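
The run-generation skeleton, reduced to a hypothetical sketch: qsort(3) on ints stands in for tuplesort_sort_memtuples(), and a plain FILE * stands in for the destination tape:

    #include <stdio.h>
    #include <stdlib.h>

    static int
    cmp_int(const void *a, const void *b)
    {
        int         ia = *(const int *) a, ib = *(const int *) b;

        return (ia > ib) - (ia < ib);
    }

    /* Sort the in-memory buffer, stream it out as one run, then reset. */
    static void
    sketch_dump_run(FILE *tape, int *memtuples, int *memtupcount)
    {
        qsort(memtuples, *memtupcount, sizeof(int), cmp_int);

        for (int i = 0; i < *memtupcount; i++)
            fwrite(&memtuples[i], sizeof(int), 1, tape);

        *memtupcount = 0;             /* buffer is empty; start the next run */
    }
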
    2397             : 
    2398             : /*
    2399             :  * tuplesort_rescan     - rewind and replay the scan
    2400             :  */
    2401             : void
    2402          46 : tuplesort_rescan(Tuplesortstate *state)
    2403             : {
    2404          46 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2405             : 
    2406             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2407             : 
    2408          46 :     switch (state->status)
    2409             :     {
    2410          40 :         case TSS_SORTEDINMEM:
    2411          40 :             state->current = 0;
    2412          40 :             state->eof_reached = false;
    2413          40 :             state->markpos_offset = 0;
    2414          40 :             state->markpos_eof = false;
    2415          40 :             break;
    2416           6 :         case TSS_SORTEDONTAPE:
    2417           6 :             LogicalTapeRewindForRead(state->result_tape, 0);
    2418           6 :             state->eof_reached = false;
    2419           6 :             state->markpos_block = 0L;
    2420           6 :             state->markpos_offset = 0;
    2421           6 :             state->markpos_eof = false;
    2422           6 :             break;
    2423           0 :         default:
    2424           0 :             elog(ERROR, "invalid tuplesort state");
    2425             :             break;
    2426             :     }
    2427             : 
    2428          46 :     MemoryContextSwitchTo(oldcontext);
    2429          46 : }
    2430             : 
    2431             : /*
    2432             :  * tuplesort_markpos    - saves current position in the merged sort file
    2433             :  */
    2434             : void
    2435      575444 : tuplesort_markpos(Tuplesortstate *state)
    2436             : {
    2437      575444 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2438             : 
    2439             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2440             : 
    2441      575444 :     switch (state->status)
    2442             :     {
    2443      566636 :         case TSS_SORTEDINMEM:
    2444      566636 :             state->markpos_offset = state->current;
    2445      566636 :             state->markpos_eof = state->eof_reached;
    2446      566636 :             break;
    2447        8808 :         case TSS_SORTEDONTAPE:
    2448        8808 :             LogicalTapeTell(state->result_tape,
    2449             :                             &state->markpos_block,
    2450             :                             &state->markpos_offset);
    2451        8808 :             state->markpos_eof = state->eof_reached;
    2452        8808 :             break;
    2453           0 :         default:
    2454           0 :             elog(ERROR, "invalid tuplesort state");
    2455             :             break;
    2456             :     }
    2457             : 
    2458      575444 :     MemoryContextSwitchTo(oldcontext);
    2459      575444 : }
    2460             : 
    2461             : /*
    2462             :  * tuplesort_restorepos - restores current position in merged sort file to
    2463             :  *                        last saved position
    2464             :  */
    2465             : void
    2466       30328 : tuplesort_restorepos(Tuplesortstate *state)
    2467             : {
    2468       30328 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2469             : 
    2470             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2471             : 
    2472       30328 :     switch (state->status)
    2473             :     {
    2474       24136 :         case TSS_SORTEDINMEM:
    2475       24136 :             state->current = state->markpos_offset;
    2476       24136 :             state->eof_reached = state->markpos_eof;
    2477       24136 :             break;
    2478        6192 :         case TSS_SORTEDONTAPE:
    2479        6192 :             LogicalTapeSeek(state->result_tape,
    2480             :                             state->markpos_block,
    2481             :                             state->markpos_offset);
    2482        6192 :             state->eof_reached = state->markpos_eof;
    2483        6192 :             break;
    2484           0 :         default:
    2485           0 :             elog(ERROR, "invalid tuplesort state");
    2486             :             break;
    2487             :     }
    2488             : 
    2489       30328 :     MemoryContextSwitchTo(oldcontext);
    2490       30328 : }
    2491             : 
    2492             : /*
    2493             :  * tuplesort_get_stats - extract summary statistics
    2494             :  *
    2495             :  * This can be called after tuplesort_performsort() finishes to obtain
    2496             :  * printable summary information about how the sort was performed.
    2497             :  */
    2498             : void
    2499         390 : tuplesort_get_stats(Tuplesortstate *state,
    2500             :                     TuplesortInstrumentation *stats)
    2501             : {
    2502             :     /*
    2503             :      * Note: it might seem we should provide both memory and disk usage for a
    2504             :      * disk-based sort.  However, the current code doesn't track memory space
    2505             :      * accurately once we have begun to return tuples to the caller (since we
    2506             :      * don't account for the pfree() calls the caller is expected to make), so we cannot
    2507             :      * rely on availMem in a disk sort.  This does not seem worth the overhead
    2508             :      * to fix.  Is it worth creating an API for the memory context code to
    2509             :      * tell us how much is actually used in sortcontext?
    2510             :      */
    2511         390 :     tuplesort_updatemax(state);
    2512             : 
    2513         390 :     if (state->isMaxSpaceDisk)
    2514           6 :         stats->spaceType = SORT_SPACE_TYPE_DISK;
    2515             :     else
    2516         384 :         stats->spaceType = SORT_SPACE_TYPE_MEMORY;
    2517         390 :     stats->spaceUsed = (state->maxSpace + 1023) / 1024;
    2518             : 
    2519         390 :     switch (state->maxSpaceStatus)
    2520             :     {
    2521         384 :         case TSS_SORTEDINMEM:
    2522         384 :             if (state->boundUsed)
    2523          42 :                 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
    2524             :             else
    2525         342 :                 stats->sortMethod = SORT_TYPE_QUICKSORT;
    2526         384 :             break;
    2527           0 :         case TSS_SORTEDONTAPE:
    2528           0 :             stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
    2529           0 :             break;
    2530           6 :         case TSS_FINALMERGE:
    2531           6 :             stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
    2532           6 :             break;
    2533           0 :         default:
    2534           0 :             stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
    2535           0 :             break;
    2536             :     }
    2537         390 : }
    2538             : 
    2539             : /*
    2540             :  * Convert TuplesortMethod to a string.
    2541             :  */
    2542             : const char *
    2543         288 : tuplesort_method_name(TuplesortMethod m)
    2544             : {
    2545         288 :     switch (m)
    2546             :     {
    2547           0 :         case SORT_TYPE_STILL_IN_PROGRESS:
    2548           0 :             return "still in progress";
    2549          42 :         case SORT_TYPE_TOP_N_HEAPSORT:
    2550          42 :             return "top-N heapsort";
    2551         240 :         case SORT_TYPE_QUICKSORT:
    2552         240 :             return "quicksort";
    2553           0 :         case SORT_TYPE_EXTERNAL_SORT:
    2554           0 :             return "external sort";
    2555           6 :         case SORT_TYPE_EXTERNAL_MERGE:
    2556           6 :             return "external merge";
    2557             :     }
    2558             : 
    2559           0 :     return "unknown";
    2560             : }
    2561             : 
    2562             : /*
    2563             :  * Convert TuplesortSpaceType to a string.
    2564             :  */
    2565             : const char *
    2566         252 : tuplesort_space_type_name(TuplesortSpaceType t)
    2567             : {
    2568             :     Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
    2569         252 :     return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
    2570             : }
    2571             : 
    2572             : 
    2573             : /*
    2574             :  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
    2575             :  */
    2576             : 
    2577             : /*
    2578             :  * Convert the existing unordered array of SortTuples to a bounded heap,
    2579             :  * discarding all but the smallest "state->bound" tuples.
    2580             :  *
    2581             :  * When working with a bounded heap, we want to keep the largest entry
    2582             :  * at the root (array entry zero), instead of the smallest as in the normal
    2583             :  * sort case.  This allows us to discard the largest entry cheaply.
    2584             :  * Therefore, we temporarily reverse the sort direction.
    2585             :  */
    2586             : static void
    2587         400 : make_bounded_heap(Tuplesortstate *state)
    2588             : {
    2589         400 :     int         tupcount = state->memtupcount;
    2590             :     int         i;
    2591             : 
    2592             :     Assert(state->status == TSS_INITIAL);
    2593             :     Assert(state->bounded);
    2594             :     Assert(tupcount >= state->bound);
    2595             :     Assert(SERIAL(state));
    2596             : 
    2597             :     /* Reverse sort direction so largest entry will be at root */
    2598         400 :     reversedirection(state);
    2599             : 
    2600         400 :     state->memtupcount = 0;      /* make the heap empty */
    2601       37912 :     for (i = 0; i < tupcount; i++)
    2602             :     {
    2603       37512 :         if (state->memtupcount < state->bound)
    2604             :         {
    2605             :             /* Insert next tuple into heap */
    2606             :             /* Must copy source tuple to avoid possible overwrite */
    2607       18556 :             SortTuple   stup = state->memtuples[i];
    2608             : 
    2609       18556 :             tuplesort_heap_insert(state, &stup);
    2610             :         }
    2611             :         else
    2612             :         {
    2613             :             /*
    2614             :              * The heap is full.  Replace the largest entry with the new
    2615             :              * tuple, or just discard it, if it's larger than anything already
    2616             :              * in the heap.
    2617             :              */
    2618       18956 :             if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
    2619             :             {
    2620        9772 :                 free_sort_tuple(state, &state->memtuples[i]);
    2621        9772 :                 CHECK_FOR_INTERRUPTS();
    2622             :             }
    2623             :             else
    2624        9184 :                 tuplesort_heap_replace_top(state, &state->memtuples[i]);
    2625             :         }
    2626             :     }
    2627             : 
    2628             :     Assert(state->memtupcount == state->bound);
    2629         400 :     state->status = TSS_BOUNDED;
    2630         400 : }
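
Stripped of tuples and reversed comparators, the bounded-heap idea is a classic top-N filter: keep a max-heap of the N smallest values seen so far, and replace its root whenever a smaller value arrives. A standalone sketch on ints:

    #include <stdio.h>

    #define BOUND 3

    /* Sift heap[i] down until the max-heap property holds in heap[0..n-1]. */
    static void
    sift_down(int *heap, int n, int i)
    {
        int         val = heap[i];

        for (;;)
        {
            int         j = 2 * i + 1;

            if (j >= n)
                break;
            if (j + 1 < n && heap[j + 1] > heap[j])
                j++;                    /* pick the larger child */
            if (val >= heap[j])
                break;
            heap[i] = heap[j];
            i = j;
        }
        heap[i] = val;
    }

    int
    main(void)
    {
        int         data[] = {5, 1, 9, 3, 7, 2, 8, 4};
        int         heap[BOUND];

        /* Seed the heap with the first BOUND inputs and heapify (max at root). */
        for (int i = 0; i < BOUND; i++)
            heap[i] = data[i];
        for (int i = BOUND / 2 - 1; i >= 0; i--)
            sift_down(heap, BOUND, i);

        /* Each later input either beats the current worst (replace the root)
         * or is discarded, mirroring the two branches above. */
        for (int i = BOUND; i < 8; i++)
        {
            if (data[i] < heap[0])
            {
                heap[0] = data[i];
                sift_down(heap, BOUND, 0);
            }
        }

        for (int i = 0; i < BOUND; i++)
            printf("%d ", heap[i]);     /* the three smallest, in heap order: 3 1 2 */
        printf("\n");
        return 0;
    }
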
    2631             : 
    2632             : /*
    2633             :  * Convert the bounded heap to a properly-sorted array
    2634             :  */
    2635             : static void
    2636         400 : sort_bounded_heap(Tuplesortstate *state)
    2637             : {
    2638         400 :     int         tupcount = state->memtupcount;
    2639             : 
    2640             :     Assert(state->status == TSS_BOUNDED);
    2641             :     Assert(state->bounded);
    2642             :     Assert(tupcount == state->bound);
    2643             :     Assert(SERIAL(state));
    2644             : 
    2645             :     /*
    2646             :      * We can unheapify in place because each delete-top call will remove the
    2647             :      * largest entry, which we can promptly store in the newly freed slot at
    2648             :      * the end.  Once we're down to a single-entry heap, we're done.
    2649             :      */
    2650       18556 :     while (state->memtupcount > 1)
    2651             :     {
    2652       18156 :         SortTuple   stup = state->memtuples[0];
    2653             : 
    2654             :         /* this sifts-up the next-largest entry and decreases memtupcount */
    2655       18156 :         tuplesort_heap_delete_top(state);
    2656       18156 :         state->memtuples[state->memtupcount] = stup;
    2657             :     }
    2658         400 :     state->memtupcount = tupcount;
    2659             : 
    2660             :     /*
    2661             :      * Reverse sort direction back to the original state.  This is not
    2662             :      * actually necessary but seems like a good idea for tidiness.
    2663             :      */
    2664         400 :     reversedirection(state);
    2665             : 
    2666         400 :     state->status = TSS_SORTEDINMEM;
    2667         400 :     state->boundUsed = true;
    2668         400 : }
    2669             : 
    2670             : /*
    2671             :  * Sort all memtuples using specialized qsort() routines.
    2672             :  *
    2673             :  * Quicksort is used for small in-memory sorts, and external sort runs.
    2674             :  */
    2675             : static void
    2676      214458 : tuplesort_sort_memtuples(Tuplesortstate *state)
    2677             : {
    2678             :     Assert(!LEADER(state));
    2679             : 
    2680      214458 :     if (state->memtupcount > 1)
    2681             :     {
    2682             :         /*
    2683             :          * Do we have the leading column's value or abbreviation in datum1,
    2684             :          * and is there a specialization for its comparator?
    2685             :          */
    2686       62924 :         if (state->base.haveDatum1 && state->base.sortKeys)
    2687             :         {
    2688       62890 :             if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
    2689             :             {
    2690        3076 :                 qsort_tuple_unsigned(state->memtuples,
    2691        3076 :                                      state->memtupcount,
    2692             :                                      state);
    2693        3060 :                 return;
    2694             :             }
    2695             : #if SIZEOF_DATUM >= 8
    2696       59814 :             else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
    2697             :             {
    2698        1036 :                 qsort_tuple_signed(state->memtuples,
    2699        1036 :                                    state->memtupcount,
    2700             :                                    state);
    2701        1036 :                 return;
    2702             :             }
    2703             : #endif
    2704       58778 :             else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
    2705             :             {
    2706       38270 :                 qsort_tuple_int32(state->memtuples,
    2707       38270 :                                   state->memtupcount,
    2708             :                                   state);
    2709       38210 :                 return;
    2710             :             }
    2711             :         }
    2712             : 
    2713             :         /* Can we use the single-key sort function? */
    2714       20542 :         if (state->base.onlyKey != NULL)
    2715             :         {
    2716        8452 :             qsort_ssup(state->memtuples, state->memtupcount,
    2717        8452 :                        state->base.onlyKey);
    2718             :         }
    2719             :         else
    2720             :         {
    2721       12090 :             qsort_tuple(state->memtuples,
    2722       12090 :                         state->memtupcount,
    2723             :                         state->base.comparetup,
    2724             :                         state);
    2725             :         }
    2726             :     }
    2727             : }
    2728             : 
    2729             : /*
    2730             :  * Insert a new tuple into an empty or existing heap, maintaining the
    2731             :  * heap invariant.  Caller is responsible for ensuring there's room.
    2732             :  *
    2733             :  * Note: For some callers, tuple points to a memtuples[] entry above the
    2734             :  * end of the heap.  This is safe as long as it's not immediately adjacent
    2735             :  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
    2736             :  * is, it might get overwritten before being moved into the heap!
    2737             :  */
    2738             : static void
    2739       19790 : tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
    2740             : {
    2741             :     SortTuple  *memtuples;
    2742             :     int         j;
    2743             : 
    2744       19790 :     memtuples = state->memtuples;
    2745             :     Assert(state->memtupcount < state->memtupsize);
    2746             : 
    2747       19790 :     CHECK_FOR_INTERRUPTS();
    2748             : 
    2749             :     /*
    2750             :      * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
    2751             :      * using 1-based array indexes, not 0-based.
    2752             :      */
    2753       19790 :     j = state->memtupcount++;
    2754       57620 :     while (j > 0)
    2755             :     {
    2756       50864 :         int         i = (j - 1) >> 1;
    2757             : 
    2758       50864 :         if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
    2759       13034 :             break;
    2760       37830 :         memtuples[j] = memtuples[i];
    2761       37830 :         j = i;
    2762             :     }
    2763       19790 :     memtuples[j] = *tuple;
    2764       19790 : }
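
The same sift-up on a plain min-heap of ints, as a hypothetical standalone routine; note the 0-based parent index (j - 1) / 2 matching the shift above:

    /* Insert val into a 0-based min-heap of ints; caller ensures capacity. */
    static void
    sketch_heap_insert(int *heap, int *count, int val)
    {
        int         j = (*count)++;

        while (j > 0)
        {
            int         i = (j - 1) / 2;    /* parent of slot j */

            if (val >= heap[i])
                break;                      /* parent is no larger: heap is valid */
            heap[j] = heap[i];              /* pull the parent down into the hole */
            j = i;
        }
        heap[j] = val;
    }
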
    2765             : 
    2766             : /*
    2767             :  * Remove the tuple at state->memtuples[0] from the heap.  Decrement
    2768             :  * memtupcount, and sift up to maintain the heap invariant.
    2769             :  *
    2770             :  * The caller has already free'd the tuple the top node points to,
    2771             :  * if necessary.
    2772             :  */
    2773             : static void
    2774       19342 : tuplesort_heap_delete_top(Tuplesortstate *state)
    2775             : {
    2776       19342 :     SortTuple  *memtuples = state->memtuples;
    2777             :     SortTuple  *tuple;
    2778             : 
    2779       19342 :     if (--state->memtupcount <= 0)
    2780         282 :         return;
    2781             : 
    2782             :     /*
    2783             :      * Remove the last tuple in the heap, and re-insert it, by replacing the
    2784             :      * current top node with it.
    2785             :      */
    2786       19060 :     tuple = &memtuples[state->memtupcount];
    2787       19060 :     tuplesort_heap_replace_top(state, tuple);
    2788             : }
    2789             : 
    2790             : /*
    2791             :  * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
    2792             :  * maintain the heap invariant.
    2793             :  *
    2794             :  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
    2795             :  * Heapsort, steps H3-H8).
    2796             :  */
    2797             : static void
    2798     5952806 : tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
    2799             : {
    2800     5952806 :     SortTuple  *memtuples = state->memtuples;
    2801             :     unsigned int i,
    2802             :                 n;
    2803             : 
    2804             :     Assert(state->memtupcount >= 1);
    2805             : 
    2806     5952806 :     CHECK_FOR_INTERRUPTS();
    2807             : 
    2808             :     /*
    2809             :      * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
    2810             :      * This prevents overflow in the "2 * i + 1" calculation, since at the top
    2811             :      * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
    2812             :      */
    2813     5952806 :     n = state->memtupcount;
    2814     5952806 :     i = 0;                      /* i is where the "hole" is */
    2815             :     for (;;)
    2816     1841674 :     {
    2817     7794480 :         unsigned int j = 2 * i + 1;
    2818             : 
    2819     7794480 :         if (j >= n)
    2820     1762300 :             break;
    2821     8465550 :         if (j + 1 < n &&
    2822     2433370 :             COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
    2823      960618 :             j++;
    2824     6032180 :         if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
    2825     4190506 :             break;
    2826     1841674 :         memtuples[i] = memtuples[j];
    2827     1841674 :         i = j;
    2828             :     }
    2829     5952806 :     memtuples[i] = *tuple;
    2830     5952806 : }
    2831             : 
    2832             : /*
    2833             :  * Function to reverse the sort direction from its current state
    2834             :  *
    2835             :  * It is not safe to call this when performing hash tuplesorts
    2836             :  */
    2837             : static void
    2838         800 : reversedirection(Tuplesortstate *state)
    2839             : {
    2840         800 :     SortSupport sortKey = state->base.sortKeys;
    2841             :     int         nkey;
    2842             : 
    2843        1960 :     for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
    2844             :     {
    2845        1160 :         sortKey->ssup_reverse = !sortKey->ssup_reverse;
    2846        1160 :         sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
    2847             :     }
    2848         800 : }
    2849             : 
    2850             : 
    2851             : /*
    2852             :  * Tape interface routines
    2853             :  */
    2854             : 
    2855             : static unsigned int
    2856     5697270 : getlen(LogicalTape *tape, bool eofOK)
    2857             : {
    2858             :     unsigned int len;
    2859             : 
    2860     5697270 :     if (LogicalTapeRead(tape,
    2861             :                         &len, sizeof(len)) != sizeof(len))
    2862           0 :         elog(ERROR, "unexpected end of tape");
    2863     5697270 :     if (len == 0 && !eofOK)
    2864           0 :         elog(ERROR, "unexpected end of data");
    2865     5697270 :     return len;
    2866             : }
    2867             : 
    2868             : static void
    2869        1638 : markrunend(LogicalTape *tape)
    2870             : {
    2871        1638 :     unsigned int len = 0;
    2872             : 
    2873        1638 :     LogicalTapeWrite(tape, &len, sizeof(len));
    2874        1638 : }
    2875             : 
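getlen() and markrunend() together imply a simple length-prefixed framing: each tuple is preceded by an unsigned int length word, and a zero length word ends the run. A sketch of both sides, with stdio standing in for the logical tape API:

    #include <stdio.h>

    static void
    write_record(FILE *f, const void *data, unsigned int len)
    {
        fwrite(&len, sizeof(len), 1, f);    /* length word first... */
        fwrite(data, len, 1, f);            /* ...then the payload */
    }

    static void
    write_run_end(FILE *f)
    {
        unsigned int len = 0;               /* zero length marks end of run */

        fwrite(&len, sizeof(len), 1, f);
    }

    /* Returns the payload length of the next record, or 0 at end of run. */
    static unsigned int
    read_record_len(FILE *f)
    {
        unsigned int len;

        if (fread(&len, sizeof(len), 1, f) != 1)
            return 0;                       /* treat a short read as end of data */
        return len;
    }
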
    2876             : /*
    2877             :  * Get memory for tuple from within READTUP() routine.
    2878             :  *
    2879             :  * We use next free slot from the slab allocator, or palloc() if the tuple
    2880             :  * We use the next free slot from the slab allocator, or palloc() if the tuple
    2881             :  */
    2882             : void *
    2883     5395556 : tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
    2884             : {
    2885             :     SlabSlot   *buf;
    2886             : 
    2887             :     /*
    2888             :      * We pre-allocate enough slots in the slab arena that we should never run
    2889             :      * out.
    2890             :      */
    2891             :     Assert(state->slabFreeHead);
    2892             : 
    2893     5395556 :     if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
    2894           0 :         return MemoryContextAlloc(state->base.sortcontext, tuplen);
    2895             :     else
    2896             :     {
    2897     5395556 :         buf = state->slabFreeHead;
    2898             :         /* Reuse this slot */
    2899     5395556 :         state->slabFreeHead = buf->nextfree;
    2900             : 
    2901     5395556 :         return buf;
    2902             :     }
    2903             : }
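
A standalone rendering of that policy, with malloc() standing in for MemoryContextAlloc() and an illustrative slot type (names here are hypothetical):

    #include <stdlib.h>

    #define SLOT_SIZE 1024                /* illustrative slot size */

    typedef union SlotUnion
    {
        union SlotUnion *nextfree;
        char        buffer[SLOT_SIZE];
    } Slot;

    static void *
    slab_or_heap_alloc(Slot **freehead, size_t len)
    {
        Slot       *buf;

        /* Oversized requests (or an exhausted arena) fall back to the heap. */
        if (len > SLOT_SIZE || *freehead == NULL)
            return malloc(len);

        buf = *freehead;
        *freehead = buf->nextfree;        /* pop one slot off the free list */
        return buf;
    }
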
    2904             : 
    2905             : 
    2906             : /*
    2907             :  * Parallel sort routines
    2908             :  */
    2909             : 
    2910             : /*
    2911             :  * tuplesort_estimate_shared - estimate required shared memory allocation
    2912             :  *
    2913             :  * nWorkers is an estimate of the number of workers (it's the number that
    2914             :  * will be requested).
    2915             :  */
    2916             : Size
    2917         158 : tuplesort_estimate_shared(int nWorkers)
    2918             : {
    2919             :     Size        tapesSize;
    2920             : 
    2921             :     Assert(nWorkers > 0);
    2922             : 
    2923             :     /* Make sure that BufFile shared state is MAXALIGN'd */
    2924         158 :     tapesSize = mul_size(sizeof(TapeShare), nWorkers);
    2925         158 :     tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
    2926             : 
    2927         158 :     return tapesSize;
    2928             : }
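
Minus the overflow-checked mul_size()/add_size() arithmetic, this is the
usual flexible-array-member sizing, schematically:

    /* Sharedsort header, then one TapeShare slot per worker, MAXALIGN'd */
    MAXALIGN(offsetof(Sharedsort, tapes) + nWorkers * sizeof(TapeShare))

The MAXALIGN() padding ensures that whatever the caller places after this
chunk in shared memory (such as BufFile state) remains suitably aligned.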
    2929             : 
    2930             : /*
    2931             :  * tuplesort_initialize_shared - initialize shared tuplesort state
    2932             :  *
    2933             :  * Must be called from leader process before workers are launched, to
    2934             :  * establish state needed up-front for worker tuplesortstates.  nWorkers
    2935             :  * should match the argument passed to tuplesort_estimate_shared().
    2936             :  */
    2937             : void
    2938         222 : tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
    2939             : {
    2940             :     int         i;
    2941             : 
    2942             :     Assert(nWorkers > 0);
    2943             : 
    2944         222 :     SpinLockInit(&shared->mutex);
    2945         222 :     shared->currentWorker = 0;
    2946         222 :     shared->workersFinished = 0;
    2947         222 :     SharedFileSetInit(&shared->fileset, seg);
    2948         222 :     shared->nTapes = nWorkers;
    2949         674 :     for (i = 0; i < nWorkers; i++)
    2950             :     {
    2951         452 :         shared->tapes[i].firstblocknumber = 0L;
    2952             :     }
    2953         222 : }
    2954             : 
    2955             : /*
    2956             :  * tuplesort_attach_shared - attach to shared tuplesort state
    2957             :  *
    2958             :  * Must be called by all worker processes.
    2959             :  */
    2960             : void
    2961         224 : tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
    2962             : {
    2963             :     /* Attach to SharedFileSet */
    2964         224 :     SharedFileSetAttach(&shared->fileset, seg);
    2965         224 : }
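
A condensed sketch of how these three routines cooperate during parallel sort
setup, loosely modeled on callers such as nbtsort.c; the shm_toc key and the
surrounding variables are illustrative assumptions, not part of this API:

    /* Leader, while sizing and populating the DSM segment (sketch): */
    Size        size = tuplesort_estimate_shared(nworkers);

    shm_toc_estimate_chunk(&pcxt->estimator, size);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
    /* ...after the parallel context's segment is created... */
    shared = (Sharedsort *) shm_toc_allocate(pcxt->toc, size);
    tuplesort_initialize_shared(shared, nworkers, pcxt->seg);
    shm_toc_insert(pcxt->toc, SHARED_SORT_KEY, shared); /* key is made up */

    /* Each worker, once attached to the segment: */
    shared = (Sharedsort *) shm_toc_lookup(toc, SHARED_SORT_KEY, false);
    tuplesort_attach_shared(shared, seg);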
    2966             : 
    2967             : /*
    2968             :  * worker_get_identifier - Assign and return ordinal identifier for worker
    2969             :  *
    2970             :  * The order in which these are assigned is not well defined, and should not
    2971             :  * matter; worker numbers across parallel sort participants need only be
    2972             :  * distinct and gapless.  logtape.c requires this.
    2973             :  *
    2974             :  * Note that the identifiers assigned from here have no relation to
    2975             :  * ParallelWorkerNumber number, to avoid making any assumption about
    2976             :  * caller's requirements.  However, we do follow the ParallelWorkerNumber
    2977             :  * convention of representing a non-worker with worker number -1.  This
    2978             :  * includes the leader, as well as serial Tuplesort processes.
    2979             :  */
    2980             : static int
    2981         444 : worker_get_identifier(Tuplesortstate *state)
    2982             : {
    2983         444 :     Sharedsort *shared = state->shared;
    2984             :     int         worker;
    2985             : 
    2986             :     Assert(WORKER(state));
    2987             : 
    2988         444 :     SpinLockAcquire(&shared->mutex);
    2989         444 :     worker = shared->currentWorker++;
    2990         444 :     SpinLockRelease(&shared->mutex);
    2991             : 
    2992         444 :     return worker;
    2993             : }
    2994             : 
    2995             : /*
    2996             :  * worker_freeze_result_tape - freeze worker's result tape for leader
    2997             :  *
    2998             :  * This is called by workers just after the result tape has been determined,
    2999             :  * instead of calling LogicalTapeFreeze() directly.  Workers require a few
    3000             :  * additional steps beyond the similar serial TSS_SORTEDONTAPE external
    3001             :  * sort case, and those steps happen here as well: freeing now-unneeded
    3002             :  * resources, and signaling to the leader that the worker's run is
    3003             :  * available for its merge.
    3004             :  *
    3005             :  * There should be only one final output run for each worker, consisting of
    3006             :  * all tuples that were originally input into that worker.
    3007             :  */
    3008             : static void
    3009         444 : worker_freeze_result_tape(Tuplesortstate *state)
    3010             : {
    3011         444 :     Sharedsort *shared = state->shared;
    3012             :     TapeShare   output;
    3013             : 
    3014             :     Assert(WORKER(state));
    3015             :     Assert(state->result_tape != NULL);
    3016             :     Assert(state->memtupcount == 0);
    3017             : 
    3018             :     /*
    3019             :      * Free most remaining memory, in case caller is sensitive to our holding
    3020             :      * on to it.  memtuples may not be a tiny merge heap at this point.
    3021             :      */
    3022         444 :     pfree(state->memtuples);
    3023             :     /* Be tidy */
    3024         444 :     state->memtuples = NULL;
    3025         444 :     state->memtupsize = 0;
    3026             : 
    3027             :     /*
    3028             :      * Freezing the result tape yields its metadata, which is then stored
    3029             :      * in shared memory for the leader to use during its merge.
    3030             :      */
    3031         444 :     LogicalTapeFreeze(state->result_tape, &output);
    3032             : 
    3033             :     /* Store properties of output tape, and update finished worker count */
    3034         444 :     SpinLockAcquire(&shared->mutex);
    3035         444 :     shared->tapes[state->worker] = output;
    3036         444 :     shared->workersFinished++;
    3037         444 :     SpinLockRelease(&shared->mutex);
    3038         444 : }
    3039             : 
    3040             : /*
    3041             :  * worker_nomergeruns - dump memtuples in worker, without merging
    3042             :  *
    3043             :  * This is called in a worker, as an alternative to mergeruns(), when no
    3044             :  * merging is required.
    3045             :  */
    3046             : static void
    3047         444 : worker_nomergeruns(Tuplesortstate *state)
    3048             : {
    3049             :     Assert(WORKER(state));
    3050             :     Assert(state->result_tape == NULL);
    3051             :     Assert(state->nOutputRuns == 1);
    3052             : 
    3053         444 :     state->result_tape = state->destTape;
    3054         444 :     worker_freeze_result_tape(state);
    3055         444 : }
    3056             : 
    3057             : /*
    3058             :  * leader_takeover_tapes - create tapeset for leader from worker tapes
    3059             :  *
    3060             :  * So far, leader Tuplesortstate has performed no actual sorting.  By now, all
    3061             :  * sorting has occurred in workers, all of which must have already returned
    3062             :  * from tuplesort_performsort().
    3063             :  *
    3064             :  * When this returns, the leader process is left in a state virtually
    3065             :  * indistinguishable from one in which it had generated the runs itself,
    3066             :  * as a serial external sort would have.
    3067             :  */
    3068             : static void
    3069         156 : leader_takeover_tapes(Tuplesortstate *state)
    3070             : {
    3071         156 :     Sharedsort *shared = state->shared;
    3072         156 :     int         nParticipants = state->nParticipants;
    3073             :     int         workersFinished;
    3074             :     int         j;
    3075             : 
    3076             :     Assert(LEADER(state));
    3077             :     Assert(nParticipants >= 1);
    3078             : 
    3079         156 :     SpinLockAcquire(&shared->mutex);
    3080         156 :     workersFinished = shared->workersFinished;
    3081         156 :     SpinLockRelease(&shared->mutex);
    3082             : 
    3083         156 :     if (nParticipants != workersFinished)
    3084           0 :         elog(ERROR, "cannot take over tapes before all workers finish");
    3085             : 
    3086             :     /*
    3087             :      * Create the tapeset from worker tapes, including a leader-owned tape at
    3088             :      * the end.  Parallel workers are far more expensive than logical tapes,
    3089             :      * so the number of tapes allocated here should never be excessive.
    3090             :      */
    3091         156 :     inittapestate(state, nParticipants);
    3092         156 :     state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
    3093             : 
    3094             :     /*
    3095             :      * Set currentRun to reflect the number of runs we will merge (it's not
    3096             :      * used for anything; this is just pro forma)
    3097             :      */
    3098         156 :     state->currentRun = nParticipants;
    3099             : 
    3100             :     /*
    3101             :      * Initialize the state to look the same as after building the initial
    3102             :      * runs.
    3103             :      *
    3104             :      * There will always be exactly 1 run per worker, and exactly one input
    3105             :      * tape per run, because workers always output exactly 1 run, even when
    3106             :      * there were no input tuples for workers to sort.
    3107             :      */
    3108         156 :     state->inputTapes = NULL;
    3109         156 :     state->nInputTapes = 0;
    3110         156 :     state->nInputRuns = 0;
    3111             : 
    3112         156 :     state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
    3113         156 :     state->nOutputTapes = nParticipants;
    3114         156 :     state->nOutputRuns = nParticipants;
    3115             : 
    3116         472 :     for (j = 0; j < nParticipants; j++)
    3117             :     {
    3118         316 :         state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
    3119             :     }
    3120             : 
    3121         156 :     state->status = TSS_BUILDRUNS;
    3122         156 : }
    3123             : 
    3124             : /*
    3125             :  * Convenience routine to free a tuple previously loaded into sort memory
    3126             :  */
    3127             : static void
    3128     3759792 : free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
    3129             : {
    3130     3759792 :     if (stup->tuple)
    3131             :     {
    3132     3599594 :         FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
    3133     3599594 :         pfree(stup->tuple);
    3134     3599594 :         stup->tuple = NULL;
    3135             :     }
    3136     3759792 : }
    3137             : 
    3138             : int
    3139           0 : ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
    3140             : {
    3141           0 :     if (x < y)
    3142           0 :         return -1;
    3143           0 :     else if (x > y)
    3144           0 :         return 1;
    3145             :     else
    3146           0 :         return 0;
    3147             : }
    3148             : 
    3149             : #if SIZEOF_DATUM >= 8
    3150             : int
    3151     1045702 : ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
    3152             : {
    3153     1045702 :     int64       xx = DatumGetInt64(x);
    3154     1045702 :     int64       yy = DatumGetInt64(y);
    3155             : 
    3156     1045702 :     if (xx < yy)
    3157      371158 :         return -1;
    3158      674544 :     else if (xx > yy)
    3159      347124 :         return 1;
    3160             :     else
    3161      327420 :         return 0;
    3162             : }
    3163             : #endif
    3164             : 
    3165             : int
    3166   160535790 : ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
    3167             : {
    3168   160535790 :     int32       xx = DatumGetInt32(x);
    3169   160535790 :     int32       yy = DatumGetInt32(y);
    3170             : 
    3171   160535790 :     if (xx < yy)
    3172    37452670 :         return -1;
    3173   123083120 :     else if (xx > yy)
    3174    36336162 :         return 1;
    3175             :     else
    3176    86746958 :         return 0;
    3177             : }
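
These ssup_datum_*_cmp() routines exist to be installed as SortSupport
comparators by a datatype's sortsupport function; for example, int4's B-tree
sortsupport routine does essentially this:

    Datum
    btint4sortsupport(PG_FUNCTION_ARGS)
    {
        SortSupport ssup = (SortSupport) PG_GETARG_POINTER(0);

        ssup->comparator = ssup_datum_int32_cmp;
        PG_RETURN_VOID();
    }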

Generated by: LCOV version 1.14