LCOV - code coverage report
Current view: top level - src/backend/utils/sort - tuplesort.c (source / functions)
Test: PostgreSQL 17devel          Lines:     716 of 812 hit (88.2 %)
Date: 2024-04-19 10:11:37         Functions:  54 of  55 hit (98.2 %)
Legend: Lines: hit | not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * tuplesort.c
       4             :  *    Generalized tuple sorting routines.
       5             :  *
       6             :  * This module provides a generalized facility for tuple sorting, which can be
       7             :  * applied to different kinds of sortable objects.  Implementation of
       8             :  * the particular sorting variants is given in tuplesortvariants.c.
       9             :  * This module works efficiently for both small and large amounts
      10             :  * of data.  Small amounts are sorted in-memory using qsort().  Large
      11             :  * amounts are sorted using temporary files and a standard external sort
      12             :  * algorithm.
      13             :  *
      14             :  * See Knuth, volume 3, for more than you want to know about external
      15             :  * sorting algorithms.  The algorithm we use is a balanced k-way merge.
      16             :  * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
      17             :  * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
      18             :  * merge is better.  Knuth is assuming that tape drives are expensive
      19             :  * beasts, and in particular that there will always be many more runs than
      20             :  * tape drives.  The polyphase merge algorithm was good at keeping all the
      21             :  * tape drives busy, but in our implementation a "tape drive" doesn't cost
       22             :  * much more than a few kB of memory buffers, so we can afford to have
       23             :  * lots of them.  In particular, if we can have as many tape drives as
       24             :  * sorted runs, we can eliminate repeated I/O entirely.
      25             :  *
      26             :  * Historically, we divided the input into sorted runs using replacement
      27             :  * selection, in the form of a priority tree implemented as a heap
      28             :  * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
      29             :  * for run generation.
      30             :  *
      31             :  * The approximate amount of memory allowed for any one sort operation
      32             :  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
      33             :  * we absorb tuples and simply store them in an unsorted array as long as
      34             :  * we haven't exceeded workMem.  If we reach the end of the input without
      35             :  * exceeding workMem, we sort the array using qsort() and subsequently return
      36             :  * tuples just by scanning the tuple array sequentially.  If we do exceed
      37             :  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
      38             :  * When tuples are dumped in batch after quicksorting, we begin a new run
      39             :  * with a new output tape.  If we reach the max number of tapes, we write
      40             :  * subsequent runs on the existing tapes in a round-robin fashion.  We will
      41             :  * need multiple merge passes to finish the merge in that case.  After the
      42             :  * end of the input is reached, we dump out remaining tuples in memory into
      43             :  * a final run, then merge the runs.
      44             :  *
      45             :  * When merging runs, we use a heap containing just the frontmost tuple from
      46             :  * each source run; we repeatedly output the smallest tuple and replace it
      47             :  * with the next tuple from its source tape (if any).  When the heap empties,
      48             :  * the merge is complete.  The basic merge algorithm thus needs very little
      49             :  * memory --- only M tuples for an M-way merge, and M is constrained to a
      50             :  * small number.  However, we can still make good use of our full workMem
      51             :  * allocation by pre-reading additional blocks from each source tape.  Without
      52             :  * prereading, our access pattern to the temporary file would be very erratic;
      53             :  * on average we'd read one block from each of M source tapes during the same
      54             :  * time that we're writing M blocks to the output tape, so there is no
      55             :  * sequentiality of access at all, defeating the read-ahead methods used by
      56             :  * most Unix kernels.  Worse, the output tape gets written into a very random
      57             :  * sequence of blocks of the temp file, ensuring that things will be even
      58             :  * worse when it comes time to read that tape.  A straightforward merge pass
      59             :  * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
      60             :  * by prereading from each source tape sequentially, loading about workMem/M
      61             :  * bytes from each tape in turn, and making the sequential blocks immediately
      62             :  * available for reuse.  This approach helps to localize both read and write
       63             :  * accesses.  The pre-reading is handled by logtape.c; we just tell it how
      64             :  * much memory to use for the buffers.
      65             :  *
      66             :  * In the current code we determine the number of input tapes M on the basis
      67             :  * of workMem: we want workMem/M to be large enough that we read a fair
      68             :  * amount of data each time we read from a tape, so as to maintain the
      69             :  * locality of access described above.  Nonetheless, with large workMem we
      70             :  * can have many tapes.  The logical "tapes" are implemented by logtape.c,
      71             :  * which avoids space wastage by recycling disk space as soon as each block
      72             :  * is read from its "tape".
      73             :  *
      74             :  * When the caller requests random access to the sort result, we form
      75             :  * the final sorted run on a logical tape which is then "frozen", so
      76             :  * that we can access it randomly.  When the caller does not need random
      77             :  * access, we return from tuplesort_performsort() as soon as we are down
      78             :  * to one run per logical tape.  The final merge is then performed
      79             :  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
      80             :  * saves one cycle of writing all the data out to disk and reading it in.
      81             :  *
       82             :  * This module supports parallel sorting.  Parallel sorts involve coordination
       83             :  * among one or more worker processes and a leader process, each with its own
       84             :  * tuplesort state.  The leader process (or, more accurately, the
       85             :  * Tuplesortstate associated with a leader process) creates a full tapeset
       86             :  * consisting of worker tapes, each holding one run to merge: one run for
       87             :  * every worker process.  These runs are then merged.  Worker processes are
       88             :  * guaranteed to produce exactly one output run from their partial input.
      89             :  *
      90             :  *
      91             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      92             :  * Portions Copyright (c) 1994, Regents of the University of California
      93             :  *
      94             :  * IDENTIFICATION
      95             :  *    src/backend/utils/sort/tuplesort.c
      96             :  *
      97             :  *-------------------------------------------------------------------------
      98             :  */
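
/*
 * [Editor's illustration -- not part of tuplesort.c]  A self-contained
 * miniature of the k-way merge described above: a binary min-heap holds just
 * the frontmost element of each of M sorted runs (plain int arrays standing
 * in for tapes).  We repeatedly emit the heap top and replace it with the
 * next element from the same run, which is the same pattern the merge code
 * below follows with real tuples.
 */
#include <stdio.h>

typedef struct
{
    const int  *run;            /* the sorted source run */
    int         pos;            /* next unread position within the run */
    int         len;            /* number of elements in the run */
} RunHead;

static RunHead heap[16];
static int  heapsize = 0;

static void
sift_down(int i)
{
    for (;;)
    {
        int         smallest = i;
        int         l = 2 * i + 1;
        int         r = 2 * i + 2;

        if (l < heapsize &&
            heap[l].run[heap[l].pos] < heap[smallest].run[heap[smallest].pos])
            smallest = l;
        if (r < heapsize &&
            heap[r].run[heap[r].pos] < heap[smallest].run[heap[smallest].pos])
            smallest = r;
        if (smallest == i)
            break;

        RunHead     tmp = heap[i];

        heap[i] = heap[smallest];
        heap[smallest] = tmp;
        i = smallest;
    }
}

int
main(void)
{
    static const int runs[3][4] = {
        {1, 5, 9, 13}, {2, 6, 10, 14}, {3, 4, 11, 12}
    };

    /* Seed the heap with the frontmost element of each run. */
    for (int i = 0; i < 3; i++)
        heap[heapsize++] = (RunHead) {runs[i], 0, 4};
    for (int i = heapsize / 2 - 1; i >= 0; i--)
        sift_down(i);

    while (heapsize > 0)
    {
        printf("%d ", heap[0].run[heap[0].pos]);    /* emit smallest */
        if (++heap[0].pos < heap[0].len)
            sift_down(0);               /* replace top with next from run */
        else
        {
            heap[0] = heap[--heapsize]; /* run exhausted: shrink the heap */
            sift_down(0);
        }
    }
    putchar('\n');                      /* prints 1 2 3 4 5 6 9 ... 14 */
    return 0;
}
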
      99             : 
     100             : #include "postgres.h"
     101             : 
     102             : #include <limits.h>
     103             : 
     104             : #include "commands/tablespace.h"
     105             : #include "miscadmin.h"
     106             : #include "pg_trace.h"
     107             : #include "storage/shmem.h"
     108             : #include "utils/memutils.h"
     109             : #include "utils/pg_rusage.h"
     110             : #include "utils/tuplesort.h"
     111             : 
     112             : /*
      113             :  * Initial size of the memtuples array.  We try to select this size so that
      114             :  * the array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the
      115             :  * overhead of allocation might possibly be lowered.  However, we don't
      116             :  * consider array sizes less than 1024.
     117             :  *
     118             :  */
     119             : #define INITIAL_MEMTUPSIZE Max(1024, \
     120             :     ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
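
/*
 * [Editor's note]  A worked instance of the formula above, assuming a
 * typical 64-bit build where ALLOCSET_SEPARATE_THRESHOLD is 8192 and
 * sizeof(SortTuple) is 24:
 *
 *     8192 / 24 + 1 = 342, so INITIAL_MEMTUPSIZE = Max(1024, 342) = 1024
 *
 * i.e. the 1024 floor dominates, and the initial array occupies roughly
 * 1024 * 24 = 24 kB.
 */
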
     121             : 
     122             : /* GUC variables */
     123             : #ifdef TRACE_SORT
     124             : bool        trace_sort = false;
     125             : #endif
     126             : 
     127             : #ifdef DEBUG_BOUNDED_SORT
     128             : bool        optimize_bounded_sort = true;
     129             : #endif
     130             : 
     131             : 
     132             : /*
      133             :  * During merge, we use a pre-allocated set of fixed-size slots to hold
      134             :  * tuples, to avoid palloc/pfree overhead.
      135             :  *
      136             :  * Merge doesn't require a lot of memory, so we can afford to waste some
      137             :  * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
     138             :  * palloc() overhead is not significant anymore.
     139             :  *
     140             :  * 'nextfree' is valid when this chunk is in the free list.  When in use, the
     141             :  * slot holds a tuple.
     142             :  */
     143             : #define SLAB_SLOT_SIZE 1024
     144             : 
     145             : typedef union SlabSlot
     146             : {
     147             :     union SlabSlot *nextfree;
     148             :     char        buffer[SLAB_SLOT_SIZE];
     149             : } SlabSlot;
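
/*
 * [Editor's sketch -- not part of tuplesort.c]  The allocation-side
 * counterpart of the RELEASE_SLAB_SLOT macro defined further down: pop a
 * slot off the free list when the request fits, otherwise fall back to
 * palloc().  This is a hypothetical helper shown only to illustrate the
 * free-list protocol; the real logic lives in tuplesort_readtup_alloc().
 */
static void *
slab_alloc_sketch(Tuplesortstate *state, Size tuplen)
{
    if (tuplen <= SLAB_SLOT_SIZE && state->slabFreeHead != NULL)
    {
        SlabSlot   *slot = state->slabFreeHead;

        state->slabFreeHead = slot->nextfree;   /* unlink from free list */
        return slot->buffer;
    }

    /* Oversized tuple (or slab exhausted): ordinary allocation. */
    return palloc(tuplen);
}
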
     150             : 
     151             : /*
     152             :  * Possible states of a Tuplesort object.  These denote the states that
     153             :  * persist between calls of Tuplesort routines.
     154             :  */
     155             : typedef enum
     156             : {
     157             :     TSS_INITIAL,                /* Loading tuples; still within memory limit */
     158             :     TSS_BOUNDED,                /* Loading tuples into bounded-size heap */
     159             :     TSS_BUILDRUNS,              /* Loading tuples; writing to tape */
     160             :     TSS_SORTEDINMEM,            /* Sort completed entirely in memory */
     161             :     TSS_SORTEDONTAPE,           /* Sort completed, final run is on tape */
     162             :     TSS_FINALMERGE,             /* Performing final merge on-the-fly */
     163             : } TupSortStatus;
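
/*
 * [Editor's note]  Typical transitions between these states, per the
 * description at the top of the file:
 *
 *   TSS_INITIAL --(everything fit in workMem)-------> TSS_SORTEDINMEM
 *   TSS_INITIAL --(bound set, heap built)--> TSS_BOUNDED --> TSS_SORTEDINMEM
 *   TSS_INITIAL --(workMem exceeded)--> TSS_BUILDRUNS --> TSS_FINALMERGE
 *                                       TSS_BUILDRUNS --> TSS_SORTEDONTAPE
 *                                        (random access requested, or only
 *                                         a single run was produced)
 */
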
     164             : 
     165             : /*
     166             :  * Parameters for calculation of number of tapes to use --- see inittapes()
     167             :  * and tuplesort_merge_order().
     168             :  *
      169             :  * In this calculation we assume that each tape will cost us about one block's
      170             :  * worth of buffer space.  This ignores the overhead of all the other data
     171             :  * structures needed for each tape, but it's probably close enough.
     172             :  *
     173             :  * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
     174             :  * input tape, for pre-reading (see discussion at top of file).  This is *in
     175             :  * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
     176             :  */
     177             : #define MINORDER        6       /* minimum merge order */
     178             : #define MAXORDER        500     /* maximum merge order */
     179             : #define TAPE_BUFFER_OVERHEAD        BLCKSZ
     180             : #define MERGE_BUFFER_SIZE           (BLCKSZ * 32)
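
/*
 * [Editor's note]  A worked instance, assuming the default BLCKSZ of 8192:
 * TAPE_BUFFER_OVERHEAD is 8 kB and MERGE_BUFFER_SIZE is 256 kB, so each
 * input tape is budgeted roughly 264 kB.  A 4 MB work_mem therefore yields
 * a merge order of about (4096 kB - 8 kB) / 264 kB ~= 15 input tapes,
 * clamped to the [MINORDER, MAXORDER] = [6, 500] range; see
 * tuplesort_merge_order() for the authoritative calculation.
 */
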
     181             : 
     182             : 
     183             : /*
     184             :  * Private state of a Tuplesort operation.
     185             :  */
     186             : struct Tuplesortstate
     187             : {
     188             :     TuplesortPublic base;
     189             :     TupSortStatus status;       /* enumerated value as shown above */
     190             :     bool        bounded;        /* did caller specify a maximum number of
     191             :                                  * tuples to return? */
     192             :     bool        boundUsed;      /* true if we made use of a bounded heap */
     193             :     int         bound;          /* if bounded, the maximum number of tuples */
      194             :     int64       tupleMem;       /* memory consumed by individual tuples.
      195             :                                  * Storing this separately from what we track
     196             :                                  * in availMem allows us to subtract the
     197             :                                  * memory consumed by all tuples when dumping
     198             :                                  * tuples to tape */
     199             :     int64       availMem;       /* remaining memory available, in bytes */
     200             :     int64       allowedMem;     /* total memory allowed, in bytes */
     201             :     int         maxTapes;       /* max number of input tapes to merge in each
     202             :                                  * pass */
      203             :     int64       maxSpace;       /* maximum amount of space occupied among sort
      204             :                                  * groups, either in-memory or on-disk */
      205             :     bool        isMaxSpaceDisk; /* true when maxSpace is the value for
      206             :                                  * on-disk space, false when it's the value
      207             :                                  * for in-memory space */
     208             :     TupSortStatus maxSpaceStatus;   /* sort status when maxSpace was reached */
     209             :     LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */
     210             : 
     211             :     /*
     212             :      * This array holds the tuples now in sort memory.  If we are in state
     213             :      * INITIAL, the tuples are in no particular order; if we are in state
     214             :      * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
     215             :      * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
     216             :      * H.  In state SORTEDONTAPE, the array is not used.
     217             :      */
     218             :     SortTuple  *memtuples;      /* array of SortTuple structs */
     219             :     int         memtupcount;    /* number of tuples currently present */
     220             :     int         memtupsize;     /* allocated length of memtuples array */
     221             :     bool        growmemtuples;  /* memtuples' growth still underway? */
     222             : 
     223             :     /*
     224             :      * Memory for tuples is sometimes allocated using a simple slab allocator,
     225             :      * rather than with palloc().  Currently, we switch to slab allocation
     226             :      * when we start merging.  Merging only needs to keep a small, fixed
     227             :      * number of tuples in memory at any time, so we can avoid the
     228             :      * palloc/pfree overhead by recycling a fixed number of fixed-size slots
     229             :      * to hold the tuples.
     230             :      *
     231             :      * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
     232             :      * slots.  The allocation is sized to have one slot per tape, plus one
     233             :      * additional slot.  We need that many slots to hold all the tuples kept
      234             :      * in the heap during merge, plus the one most recently returned from the
      235             :      * sort with tuplesort_gettuple.
     236             :      *
     237             :      * Initially, all the slots are kept in a linked list of free slots.  When
      238             :      * a tuple is read from a tape, it is placed in the next available slot if
      239             :      * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
      240             :      * instead.
      241             :      *
      242             :      * When we're done processing a tuple, we return the slot to the free
      243             :      * list, or pfree() it if it was palloc'd.  We know that a tuple was
      244             :      * allocated from the slab if its pointer value lies between
      245             :      * slabMemoryBegin and slabMemoryEnd.
     246             :      *
     247             :      * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
     248             :      * tracking memory usage is not used.
     249             :      */
     250             :     bool        slabAllocatorUsed;
     251             : 
     252             :     char       *slabMemoryBegin;    /* beginning of slab memory arena */
     253             :     char       *slabMemoryEnd;  /* end of slab memory arena */
     254             :     SlabSlot   *slabFreeHead;   /* head of free list */
     255             : 
     256             :     /* Memory used for input and output tape buffers. */
     257             :     size_t      tape_buffer_mem;
     258             : 
     259             :     /*
      260             :      * When we return to the caller a tuple that came from a tape (that is,
      261             :      * in TSS_SORTEDONTAPE or TSS_FINALMERGE modes) in tuplesort_gettuple_XXX,
      262             :      * we remember the tuple in 'lastReturnedTuple', so that we can recycle
      263             :      * its memory on the next gettuple call.
     264             :      */
     265             :     void       *lastReturnedTuple;
     266             : 
     267             :     /*
     268             :      * While building initial runs, this is the current output run number.
     269             :      * Afterwards, it is the number of initial runs we made.
     270             :      */
     271             :     int         currentRun;
     272             : 
     273             :     /*
     274             :      * Logical tapes, for merging.
     275             :      *
      276             :      * The initial runs are written to the output tapes.  In each merge pass,
     277             :      * the output tapes of the previous pass become the input tapes, and new
     278             :      * output tapes are created as needed.  When nInputTapes equals
     279             :      * nInputRuns, there is only one merge pass left.
     280             :      */
     281             :     LogicalTape **inputTapes;
     282             :     int         nInputTapes;
     283             :     int         nInputRuns;
     284             : 
     285             :     LogicalTape **outputTapes;
     286             :     int         nOutputTapes;
     287             :     int         nOutputRuns;
     288             : 
     289             :     LogicalTape *destTape;      /* current output tape */
     290             : 
     291             :     /*
     292             :      * These variables are used after completion of sorting to keep track of
     293             :      * the next tuple to return.  (In the tape case, the tape's current read
     294             :      * position is also critical state.)
     295             :      */
     296             :     LogicalTape *result_tape;   /* actual tape of finished output */
     297             :     int         current;        /* array index (only used if SORTEDINMEM) */
     298             :     bool        eof_reached;    /* reached EOF (needed for cursors) */
     299             : 
     300             :     /* markpos_xxx holds marked position for mark and restore */
     301             :     int64       markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
     302             :     int         markpos_offset; /* saved "current", or offset in tape block */
     303             :     bool        markpos_eof;    /* saved "eof_reached" */
     304             : 
     305             :     /*
     306             :      * These variables are used during parallel sorting.
     307             :      *
      308             :      * worker is our worker identifier.  Follows the general convention that
      309             :      * a value of -1 denotes a leader tuplesort, and values >= 0 denote worker
      310             :      * tuplesorts.  (-1 can also be a serial tuplesort.)
     311             :      *
     312             :      * shared is mutable shared memory state, which is used to coordinate
     313             :      * parallel sorts.
     314             :      *
     315             :      * nParticipants is the number of worker Tuplesortstates known by the
     316             :      * leader to have actually been launched, which implies that they must
      317             :      * finish a run that the leader needs to merge.  This typically includes a
     318             :      * worker state held by the leader process itself.  Set in the leader
     319             :      * Tuplesortstate only.
     320             :      */
     321             :     int         worker;
     322             :     Sharedsort *shared;
     323             :     int         nParticipants;
     324             : 
     325             :     /*
     326             :      * Additional state for managing "abbreviated key" sortsupport routines
     327             :      * (which currently may be used by all cases except the hash index case).
     328             :      * Tracks the intervals at which the optimization's effectiveness is
     329             :      * tested.
     330             :      */
     331             :     int64       abbrevNext;     /* Tuple # at which to next check
     332             :                                  * applicability */
     333             : 
     334             :     /*
     335             :      * Resource snapshot for time of sort start.
     336             :      */
     337             : #ifdef TRACE_SORT
     338             :     PGRUsage    ru_start;
     339             : #endif
     340             : };
     341             : 
     342             : /*
      343             :  * Private mutable state of a parallel tuplesort operation.  This is allocated
     344             :  * in shared memory.
     345             :  */
     346             : struct Sharedsort
     347             : {
     348             :     /* mutex protects all fields prior to tapes */
     349             :     slock_t     mutex;
     350             : 
     351             :     /*
     352             :      * currentWorker generates ordinal identifier numbers for parallel sort
     353             :      * workers.  These start from 0, and are always gapless.
     354             :      *
     355             :      * Workers increment workersFinished to indicate having finished.  If this
      356             :      * is equal to state.nParticipants within the leader, the leader is ready to
     357             :      * merge worker runs.
     358             :      */
     359             :     int         currentWorker;
     360             :     int         workersFinished;
     361             : 
     362             :     /* Temporary file space */
     363             :     SharedFileSet fileset;
     364             : 
     365             :     /* Size of tapes flexible array */
     366             :     int         nTapes;
     367             : 
     368             :     /*
     369             :      * Tapes array used by workers to report back information needed by the
     370             :      * leader to concatenate all worker tapes into one for merging
     371             :      */
     372             :     TapeShare   tapes[FLEXIBLE_ARRAY_MEMBER];
     373             : };
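
/*
 * [Editor's sketch -- not part of tuplesort.c]  The handshake the comment
 * above implies: each worker bumps workersFinished under the mutex once its
 * run is complete, and the leader waits until workersFinished equals its
 * nParticipants before merging.  Hypothetical helper, for illustration only.
 */
static void
worker_mark_finished_sketch(Sharedsort *shared)
{
    SpinLockAcquire(&shared->mutex);
    shared->workersFinished++;
    SpinLockRelease(&shared->mutex);
}
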
     374             : 
     375             : /*
     376             :  * Is the given tuple allocated from the slab memory arena?
     377             :  */
     378             : #define IS_SLAB_SLOT(state, tuple) \
     379             :     ((char *) (tuple) >= (state)->slabMemoryBegin && \
     380             :      (char *) (tuple) < (state)->slabMemoryEnd)
     381             : 
     382             : /*
     383             :  * Return the given tuple to the slab memory free list, or free it
     384             :  * if it was palloc'd.
     385             :  */
     386             : #define RELEASE_SLAB_SLOT(state, tuple) \
     387             :     do { \
     388             :         SlabSlot *buf = (SlabSlot *) tuple; \
     389             :         \
     390             :         if (IS_SLAB_SLOT((state), buf)) \
     391             :         { \
     392             :             buf->nextfree = (state)->slabFreeHead; \
     393             :             (state)->slabFreeHead = buf; \
     394             :         } else \
     395             :             pfree(buf); \
     396             :     } while(0)
     397             : 
     398             : #define REMOVEABBREV(state,stup,count)  ((*(state)->base.removeabbrev) (state, stup, count))
     399             : #define COMPARETUP(state,a,b)   ((*(state)->base.comparetup) (a, b, state))
     400             : #define WRITETUP(state,tape,stup)   ((*(state)->base.writetup) (state, tape, stup))
     401             : #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
     402             : #define FREESTATE(state)    ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
     403             : #define LACKMEM(state)      ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
     404             : #define USEMEM(state,amt)   ((state)->availMem -= (amt))
     405             : #define FREEMEM(state,amt)  ((state)->availMem += (amt))
     406             : #define SERIAL(state)       ((state)->shared == NULL)
     407             : #define WORKER(state)       ((state)->shared && (state)->worker != -1)
     408             : #define LEADER(state)       ((state)->shared && (state)->worker == -1)
     409             : 
     410             : /*
     411             :  * NOTES about on-tape representation of tuples:
     412             :  *
     413             :  * We require the first "unsigned int" of a stored tuple to be the total size
     414             :  * on-tape of the tuple, including itself (so it is never zero; an all-zero
     415             :  * unsigned int is used to delimit runs).  The remainder of the stored tuple
     416             :  * may or may not match the in-memory representation of the tuple ---
     417             :  * any conversion needed is the job of the writetup and readtup routines.
     418             :  *
     419             :  * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
     420             :  * representation of the tuple must be followed by another "unsigned int" that
     421             :  * is a copy of the length --- so the total tape space used is actually
     422             :  * sizeof(unsigned int) more than the stored length value.  This allows
     423             :  * read-backwards.  When the random access flag was not specified, the
     424             :  * write/read routines may omit the extra length word.
     425             :  *
     426             :  * writetup is expected to write both length words as well as the tuple
     427             :  * data.  When readtup is called, the tape is positioned just after the
     428             :  * front length word; readtup must read the tuple data and advance past
     429             :  * the back length word (if present).
     430             :  *
     431             :  * The write/read routines can make use of the tuple description data
     432             :  * stored in the Tuplesortstate record, if needed.  They are also expected
     433             :  * to adjust state->availMem by the amount of memory space (not tape space!)
     434             :  * released or consumed.  There is no error return from either writetup
     435             :  * or readtup; they should ereport() on failure.
     436             :  *
     437             :  *
     438             :  * NOTES about memory consumption calculations:
     439             :  *
     440             :  * We count space allocated for tuples against the workMem limit, plus
     441             :  * the space used by the variable-size memtuples array.  Fixed-size space
     442             :  * is not counted; it's small enough to not be interesting.
     443             :  *
     444             :  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
     445             :  * rather than the originally-requested size.  This is important since
     446             :  * palloc can add substantial overhead.  It's not a complete answer since
     447             :  * we won't count any wasted space in palloc allocation blocks, but it's
     448             :  * a lot better than what we were doing before 7.3.  As of 9.6, a
      449             :  * separate memory context is used for caller-passed tuples.  Resetting
     450             :  * it at certain key increments significantly ameliorates fragmentation.
     451             :  * readtup routines use the slab allocator (they cannot use
     452             :  * the reset context because it gets deleted at the point that merging
     453             :  * begins).
     454             :  */
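
/*
 * [Editor's sketch -- not part of tuplesort.c]  The skeleton of a writetup /
 * readtup pair obeying the length-word convention above.  'bloblen' stands
 * in for however a variant computes its payload size; LogicalTapeWrite(),
 * LogicalTapeReadExact() and tuplesort_readtup_alloc() are the real
 * routines, but treat the rest as a hedged illustration (memory accounting
 * omitted).
 */
static void
writetup_sketch(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
    unsigned int bloblen = 0;   /* variant-specific payload size goes here */
    unsigned int tuplen = sizeof(tuplen) + bloblen; /* length word counts itself */

    LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
    LogicalTapeWrite(tape, stup->tuple, bloblen);
    if (state->base.sortopt & TUPLESORT_RANDOMACCESS)
        LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));    /* trailing copy */
}

static void
readtup_sketch(Tuplesortstate *state, SortTuple *stup,
               LogicalTape *tape, unsigned int len)
{
    unsigned int bloblen = len - sizeof(unsigned int);
    char       *blob = tuplesort_readtup_alloc(state, bloblen);

    /* The tape is already positioned just past the leading length word. */
    LogicalTapeReadExact(tape, blob, bloblen);
    stup->tuple = blob;
    if (state->base.sortopt & TUPLESORT_RANDOMACCESS)
        LogicalTapeReadExact(tape, &len, sizeof(len));  /* skip back length word */
}
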
     455             : 
     456             : 
     457             : static void tuplesort_begin_batch(Tuplesortstate *state);
     458             : static bool consider_abort_common(Tuplesortstate *state);
     459             : static void inittapes(Tuplesortstate *state, bool mergeruns);
     460             : static void inittapestate(Tuplesortstate *state, int maxTapes);
     461             : static void selectnewtape(Tuplesortstate *state);
     462             : static void init_slab_allocator(Tuplesortstate *state, int numSlots);
     463             : static void mergeruns(Tuplesortstate *state);
     464             : static void mergeonerun(Tuplesortstate *state);
     465             : static void beginmerge(Tuplesortstate *state);
     466             : static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
     467             : static void dumptuples(Tuplesortstate *state, bool alltuples);
     468             : static void make_bounded_heap(Tuplesortstate *state);
     469             : static void sort_bounded_heap(Tuplesortstate *state);
     470             : static void tuplesort_sort_memtuples(Tuplesortstate *state);
     471             : static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
     472             : static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
     473             : static void tuplesort_heap_delete_top(Tuplesortstate *state);
     474             : static void reversedirection(Tuplesortstate *state);
     475             : static unsigned int getlen(LogicalTape *tape, bool eofOK);
     476             : static void markrunend(LogicalTape *tape);
     477             : static int  worker_get_identifier(Tuplesortstate *state);
     478             : static void worker_freeze_result_tape(Tuplesortstate *state);
     479             : static void worker_nomergeruns(Tuplesortstate *state);
     480             : static void leader_takeover_tapes(Tuplesortstate *state);
     481             : static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
     482             : static void tuplesort_free(Tuplesortstate *state);
     483             : static void tuplesort_updatemax(Tuplesortstate *state);
     484             : 
     485             : /*
     486             :  * Specialized comparators that we can inline into specialized sorts.  The goal
     487             :  * is to try to sort two tuples without having to follow the pointers to the
     488             :  * comparator or the tuple.
     489             :  *
     490             :  * XXX: For now, there is no specialization for cases where datum1 is
     491             :  * authoritative and we don't even need to fall back to a callback at all (that
     492             :  * would be true for types like int4/int8/timestamp/date, but not true for
      493             :  * abbreviations of text or multi-key sorts).  There could be!  Is it worth it?
     494             :  */
     495             : 
     496             : /* Used if first key's comparator is ssup_datum_unsigned_cmp */
     497             : static pg_attribute_always_inline int
     498    46409796 : qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     499             : {
     500             :     int         compare;
     501             : 
     502    46409796 :     compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
     503    46409796 :                                           b->datum1, b->isnull1,
     504             :                                           &state->base.sortKeys[0]);
     505    46409796 :     if (compare != 0)
     506    41876510 :         return compare;
     507             : 
     508             :     /*
     509             :      * No need to waste effort calling the tiebreak function when there are no
     510             :      * other keys to sort on.
     511             :      */
     512     4533286 :     if (state->base.onlyKey != NULL)
     513           0 :         return 0;
     514             : 
     515     4533286 :     return state->base.comparetup_tiebreak(a, b, state);
     516             : }
     517             : 
     518             : #if SIZEOF_DATUM >= 8
     519             : /* Used if first key's comparator is ssup_datum_signed_cmp */
     520             : static pg_attribute_always_inline int
     521     5627272 : qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     522             : {
     523             :     int         compare;
     524             : 
     525     5627272 :     compare = ApplySignedSortComparator(a->datum1, a->isnull1,
     526     5627272 :                                         b->datum1, b->isnull1,
     527             :                                         &state->base.sortKeys[0]);
     528             : 
     529     5627272 :     if (compare != 0)
     530     5615326 :         return compare;
     531             : 
     532             :     /*
     533             :      * No need to waste effort calling the tiebreak function when there are no
     534             :      * other keys to sort on.
     535             :      */
     536       11946 :     if (state->base.onlyKey != NULL)
     537        1102 :         return 0;
     538             : 
     539       10844 :     return state->base.comparetup_tiebreak(a, b, state);
     540             : }
     541             : #endif
     542             : 
     543             : /* Used if first key's comparator is ssup_datum_int32_cmp */
     544             : static pg_attribute_always_inline int
     545    52075236 : qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
     546             : {
     547             :     int         compare;
     548             : 
     549    52075236 :     compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
     550    52075236 :                                        b->datum1, b->isnull1,
     551             :                                        &state->base.sortKeys[0]);
     552             : 
     553    52075236 :     if (compare != 0)
     554    36763318 :         return compare;
     555             : 
     556             :     /*
     557             :      * No need to waste effort calling the tiebreak function when there are no
     558             :      * other keys to sort on.
     559             :      */
     560    15311918 :     if (state->base.onlyKey != NULL)
     561     1677194 :         return 0;
     562             : 
     563    13634724 :     return state->base.comparetup_tiebreak(a, b, state);
     564             : }
     565             : 
     566             : /*
     567             :  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
     568             :  * any variant of SortTuples, using the appropriate comparetup function.
     569             :  * qsort_ssup() is specialized for the case where the comparetup function
     570             :  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
     571             :  * and Datum sorts.  qsort_tuple_{unsigned,signed,int32} are specialized for
     572             :  * common comparison functions on pass-by-value leading datums.
     573             :  */
     574             : 
     575             : #define ST_SORT qsort_tuple_unsigned
     576             : #define ST_ELEMENT_TYPE SortTuple
     577             : #define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
     578             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     579             : #define ST_CHECK_FOR_INTERRUPTS
     580             : #define ST_SCOPE static
     581             : #define ST_DEFINE
     582             : #include "lib/sort_template.h"
     583             : 
     584             : #if SIZEOF_DATUM >= 8
     585             : #define ST_SORT qsort_tuple_signed
     586             : #define ST_ELEMENT_TYPE SortTuple
     587             : #define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
     588             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     589             : #define ST_CHECK_FOR_INTERRUPTS
     590             : #define ST_SCOPE static
     591             : #define ST_DEFINE
     592             : #include "lib/sort_template.h"
     593             : #endif
     594             : 
     595             : #define ST_SORT qsort_tuple_int32
     596             : #define ST_ELEMENT_TYPE SortTuple
     597             : #define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
     598             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     599             : #define ST_CHECK_FOR_INTERRUPTS
     600             : #define ST_SCOPE static
     601             : #define ST_DEFINE
     602             : #include "lib/sort_template.h"
     603             : 
     604             : #define ST_SORT qsort_tuple
     605             : #define ST_ELEMENT_TYPE SortTuple
     606             : #define ST_COMPARE_RUNTIME_POINTER
     607             : #define ST_COMPARE_ARG_TYPE Tuplesortstate
     608             : #define ST_CHECK_FOR_INTERRUPTS
     609             : #define ST_SCOPE static
     610             : #define ST_DECLARE
     611             : #define ST_DEFINE
     612             : #include "lib/sort_template.h"
     613             : 
     614             : #define ST_SORT qsort_ssup
     615             : #define ST_ELEMENT_TYPE SortTuple
     616             : #define ST_COMPARE(a, b, ssup) \
     617             :     ApplySortComparator((a)->datum1, (a)->isnull1, \
     618             :                         (b)->datum1, (b)->isnull1, (ssup))
     619             : #define ST_COMPARE_ARG_TYPE SortSupportData
     620             : #define ST_CHECK_FOR_INTERRUPTS
     621             : #define ST_SCOPE static
     622             : #define ST_DEFINE
     623             : #include "lib/sort_template.h"
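
/*
 * [Editor's note]  These specializations are chosen at sort time by testing
 * the leading sort key's comparator against the well-known sortsupport
 * comparators.  A hedged sketch of the dispatch (the authoritative logic is
 * in tuplesort_sort_memtuples()):
 *
 *     if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
 *         qsort_tuple_unsigned(state->memtuples, state->memtupcount, state);
 *     else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
 *         qsort_tuple_signed(state->memtuples, state->memtupcount, state);
 *     else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
 *         qsort_tuple_int32(state->memtuples, state->memtupcount, state);
 *     else if (state->base.onlyKey != NULL)
 *         qsort_ssup(state->memtuples, state->memtupcount,
 *                    state->base.onlyKey);
 *     else
 *         qsort_tuple(state->memtuples, state->memtupcount,
 *                     state->base.comparetup, state);
 */
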
     624             : 
     625             : /*
     626             :  *      tuplesort_begin_xxx
     627             :  *
     628             :  * Initialize for a tuple sort operation.
     629             :  *
     630             :  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
     631             :  * zero or more times, then call tuplesort_performsort when all the tuples
     632             :  * have been supplied.  After performsort, retrieve the tuples in sorted
     633             :  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
     634             :  * access was requested, rescan, markpos, and restorepos can also be called.)
     635             :  * Call tuplesort_end to terminate the operation and release memory/disk space.
     636             :  *
     637             :  * Each variant of tuplesort_begin has a workMem parameter specifying the
     638             :  * maximum number of kilobytes of RAM to use before spilling data to disk.
     639             :  * (The normal value of this parameter is work_mem, but some callers use
     640             :  * other values.)  Each variant also has a sortopt which is a bitmask of
     641             :  * sort options.  See TUPLESORT_* definitions in tuplesort.h
     642             :  */
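
/*
 * [Editor's sketch -- not part of tuplesort.c]  The call sequence described
 * above, for a serial in-memory heap-tuple sort.  Signatures follow
 * tuplesort.h for this version, but treat this as an illustration rather
 * than authoritative API documentation.
 */
static void
sort_slots_sketch(TupleDesc tupdesc, int nkeys, AttrNumber *attNums,
                  Oid *sortOperators, Oid *sortCollations, bool *nullsFirst,
                  TupleTableSlot *in_slot, TupleTableSlot *out_slot)
{
    Tuplesortstate *sortstate;

    sortstate = tuplesort_begin_heap(tupdesc, nkeys, attNums,
                                     sortOperators, sortCollations,
                                     nullsFirst, work_mem, NULL,
                                     TUPLESORT_NONE);

    /* one call per input tuple; a real caller loops over its input here */
    tuplesort_puttupleslot(sortstate, in_slot);

    tuplesort_performsort(sortstate);

    /* drain in sorted order until gettupleslot returns false */
    while (tuplesort_gettupleslot(sortstate, true, false, out_slot, NULL))
        ;

    tuplesort_end(sortstate);
}
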
     643             : 
     644             : Tuplesortstate *
     645      234726 : tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
     646             : {
     647             :     Tuplesortstate *state;
     648             :     MemoryContext maincontext;
     649             :     MemoryContext sortcontext;
     650             :     MemoryContext oldcontext;
     651             : 
     652             :     /* See leader_takeover_tapes() remarks on random access support */
     653      234726 :     if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
     654           0 :         elog(ERROR, "random access disallowed under parallel sort");
     655             : 
     656             :     /*
     657             :      * Memory context surviving tuplesort_reset.  This memory context holds
     658             :      * data which is useful to keep while sorting multiple similar batches.
     659             :      */
     660      234726 :     maincontext = AllocSetContextCreate(CurrentMemoryContext,
     661             :                                         "TupleSort main",
     662             :                                         ALLOCSET_DEFAULT_SIZES);
     663             : 
     664             :     /*
     665             :      * Create a working memory context for one sort operation.  The content of
     666             :      * this context is deleted by tuplesort_reset.
     667             :      */
     668      234726 :     sortcontext = AllocSetContextCreate(maincontext,
     669             :                                         "TupleSort sort",
     670             :                                         ALLOCSET_DEFAULT_SIZES);
     671             : 
     672             :     /*
      673             :      * Additionally, a working memory context for tuples is set up in
     674             :      * tuplesort_begin_batch.
     675             :      */
     676             : 
     677             :     /*
     678             :      * Make the Tuplesortstate within the per-sortstate context.  This way, we
     679             :      * don't need a separate pfree() operation for it at shutdown.
     680             :      */
     681      234726 :     oldcontext = MemoryContextSwitchTo(maincontext);
     682             : 
     683      234726 :     state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
     684             : 
     685             : #ifdef TRACE_SORT
     686      234726 :     if (trace_sort)
     687           0 :         pg_rusage_init(&state->ru_start);
     688             : #endif
     689             : 
     690      234726 :     state->base.sortopt = sortopt;
     691      234726 :     state->base.tuples = true;
     692      234726 :     state->abbrevNext = 10;
     693             : 
     694             :     /*
     695             :      * workMem is forced to be at least 64KB, the current minimum valid value
     696             :      * for the work_mem GUC.  This is a defense against parallel sort callers
     697             :      * that divide out memory among many workers in a way that leaves each
     698             :      * with very little memory.
     699             :      */
     700      234726 :     state->allowedMem = Max(workMem, 64) * (int64) 1024;
     701      234726 :     state->base.sortcontext = sortcontext;
     702      234726 :     state->base.maincontext = maincontext;
     703             : 
     704             :     /*
     705             :      * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
     706             :      * see comments in grow_memtuples().
     707             :      */
     708      234726 :     state->memtupsize = INITIAL_MEMTUPSIZE;
     709      234726 :     state->memtuples = NULL;
     710             : 
     711             :     /*
      712             :      * After all of the other non-parallel-related state, we set up all of the
     713             :      * state needed for each batch.
     714             :      */
     715      234726 :     tuplesort_begin_batch(state);
     716             : 
     717             :     /*
     718             :      * Initialize parallel-related state based on coordination information
     719             :      * from caller
     720             :      */
     721      234726 :     if (!coordinate)
     722             :     {
     723             :         /* Serial sort */
     724      234098 :         state->shared = NULL;
     725      234098 :         state->worker = -1;
     726      234098 :         state->nParticipants = -1;
     727             :     }
     728         628 :     else if (coordinate->isWorker)
     729             :     {
     730             :         /* Parallel worker produces exactly one final run from all input */
     731         420 :         state->shared = coordinate->sharedsort;
     732         420 :         state->worker = worker_get_identifier(state);
     733         420 :         state->nParticipants = -1;
     734             :     }
     735             :     else
     736             :     {
     737             :         /* Parallel leader state only used for final merge */
     738         208 :         state->shared = coordinate->sharedsort;
     739         208 :         state->worker = -1;
     740         208 :         state->nParticipants = coordinate->nParticipants;
     741             :         Assert(state->nParticipants >= 1);
     742             :     }
     743             : 
     744      234726 :     MemoryContextSwitchTo(oldcontext);
     745             : 
     746      234726 :     return state;
     747             : }
     748             : 
     749             : /*
     750             :  *      tuplesort_begin_batch
     751             :  *
      752             :  * Set up, or reset, all state needed for processing a new set of tuples with
      753             :  * this sort state.  Called both from tuplesort_begin_common (the first time
      754             :  * sorting with this sort state) and tuplesort_reset (for subsequent uses).
     755             :  */
     756             : static void
     757      236894 : tuplesort_begin_batch(Tuplesortstate *state)
     758             : {
     759             :     MemoryContext oldcontext;
     760             : 
     761      236894 :     oldcontext = MemoryContextSwitchTo(state->base.maincontext);
     762             : 
     763             :     /*
     764             :      * Caller tuple (e.g. IndexTuple) memory context.
     765             :      *
      766             :      * A dedicated child context used exclusively for caller-passed tuples
     767             :      * eases memory management.  Resetting at key points reduces
     768             :      * fragmentation. Note that the memtuples array of SortTuples is allocated
     769             :      * in the parent context, not this context, because there is no need to
     770             :      * free memtuples early.  For bounded sorts, tuples may be pfreed in any
     771             :      * order, so we use a regular aset.c context so that it can make use of
     772             :      * free'd memory.  When the sort is not bounded, we make use of a bump.c
     773             :      * context as this keeps allocations more compact with less wastage.
     774             :      * Allocations are also slightly more CPU efficient.
     775             :      */
     776      236894 :     if (TupleSortUseBumpTupleCxt(state->base.sortopt))
     777      235580 :         state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
     778             :                                                      "Caller tuples",
     779             :                                                      ALLOCSET_DEFAULT_SIZES);
     780             :     else
     781        1314 :         state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
     782             :                                                          "Caller tuples",
     783             :                                                          ALLOCSET_DEFAULT_SIZES);
     784             : 
     785             : 
     786      236894 :     state->status = TSS_INITIAL;
     787      236894 :     state->bounded = false;
     788      236894 :     state->boundUsed = false;
     789             : 
     790      236894 :     state->availMem = state->allowedMem;
     791             : 
     792      236894 :     state->tapeset = NULL;
     793             : 
     794      236894 :     state->memtupcount = 0;
     795             : 
     796             :     /*
     797             :      * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
     798             :      * see comments in grow_memtuples().
     799             :      */
     800      236894 :     state->growmemtuples = true;
     801      236894 :     state->slabAllocatorUsed = false;
     802      236894 :     if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
     803             :     {
     804           0 :         pfree(state->memtuples);
     805           0 :         state->memtuples = NULL;
     806           0 :         state->memtupsize = INITIAL_MEMTUPSIZE;
     807             :     }
     808      236894 :     if (state->memtuples == NULL)
     809             :     {
     810      234726 :         state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
     811      234726 :         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
     812             :     }
     813             : 
     814             :     /* workMem must be large enough for the minimal memtuples array */
     815      236894 :     if (LACKMEM(state))
     816           0 :         elog(ERROR, "insufficient memory allowed for sort");
     817             : 
     818      236894 :     state->currentRun = 0;
     819             : 
     820             :     /*
     821             :      * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
     822             :      * inittapes(), if needed.
     823             :      */
     824             : 
     825      236894 :     state->result_tape = NULL;   /* flag that result tape has not been formed */
     826             : 
     827      236894 :     MemoryContextSwitchTo(oldcontext);
     828      236894 : }
     829             : 
     830             : /*
     831             :  * tuplesort_set_bound
     832             :  *
     833             :  *  Advise tuplesort that at most the first N result tuples are required.
     834             :  *
     835             :  * Must be called before inserting any tuples.  (Actually, we could allow it
     836             :  * as long as the sort hasn't spilled to disk, but there seems no need for
     837             :  * delayed calls at the moment.)
     838             :  *
     839             :  * This is a hint only. The tuplesort may still return more tuples than
     840             :  * requested.  Parallel leader tuplesorts will always ignore the hint.
     841             :  */
     842             : void
     843        1180 : tuplesort_set_bound(Tuplesortstate *state, int64 bound)
     844             : {
     845             :     /* Assert we're called before loading any tuples */
     846             :     Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
     847             :     /* Assert we allow bounded sorts */
     848             :     Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
     849             :     /* Can't set the bound twice, either */
     850             :     Assert(!state->bounded);
     851             :     /* Also, this shouldn't be called in a parallel worker */
     852             :     Assert(!WORKER(state));
     853             : 
     854             :     /* Parallel leader allows but ignores hint */
     855        1180 :     if (LEADER(state))
     856           0 :         return;
     857             : 
     858             : #ifdef DEBUG_BOUNDED_SORT
     859             :     /* Honor GUC setting that disables the feature (for easy testing) */
     860             :     if (!optimize_bounded_sort)
     861             :         return;
     862             : #endif
     863             : 
     864             :     /* We want to be able to compute bound * 2, so limit the setting */
     865        1180 :     if (bound > (int64) (INT_MAX / 2))
     866           0 :         return;
     867             : 
     868        1180 :     state->bounded = true;
     869        1180 :     state->bound = (int) bound;
     870             : 
     871             :     /*
     872             :      * Bounded sorts are not an effective target for abbreviated key
     873             :      * optimization.  Disable by setting state to be consistent with no
     874             :      * abbreviation support.
     875             :      */
     876        1180 :     state->base.sortKeys->abbrev_converter = NULL;
     877        1180 :     if (state->base.sortKeys->abbrev_full_comparator)
     878          16 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
     879             : 
     880             :     /* Not strictly necessary, but be tidy */
     881        1180 :     state->base.sortKeys->abbrev_abort = NULL;
     882        1180 :     state->base.sortKeys->abbrev_full_comparator = NULL;
     883             : }
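
/*
 * [Editor's note]  Typical caller pattern (hedged; see the executor's sort
 * node for the real usage): a node running ORDER BY ... LIMIT n calls
 * tuplesort_set_bound(state, n) right after tuplesort_begin_heap() and
 * before the first tuple is inserted, letting the sort keep only a bounded
 * heap of n tuples in memory.
 */
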
     884             : 
     885             : /*
     886             :  * tuplesort_used_bound
     887             :  *
     888             :  * Allow callers to find out if the sort state was able to use a bound.
     889             :  */
     890             : bool
     891          94 : tuplesort_used_bound(Tuplesortstate *state)
     892             : {
     893          94 :     return state->boundUsed;
     894             : }
     895             : 
     896             : /*
     897             :  * tuplesort_free
     898             :  *
     899             :  *  Internal routine for freeing resources of tuplesort.
     900             :  */
     901             : static void
     902      236650 : tuplesort_free(Tuplesortstate *state)
     903             : {
     904             :     /* context swap probably not needed, but let's be safe */
     905      236650 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
     906             : 
     907             : #ifdef TRACE_SORT
     908             :     int64       spaceUsed;
     909             : 
     910      236650 :     if (state->tapeset)
     911         678 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset);
     912             :     else
     913      235972 :         spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
     914             : #endif
     915             : 
     916             :     /*
     917             :      * Delete temporary "tape" files, if any.
     918             :      *
     919             :      * Note: want to include this in reported total cost of sort, hence need
     920             :      * for two #ifdef TRACE_SORT sections.
     921             :      *
     922             :      * We don't bother to destroy the individual tapes here. They will go away
     923             :      * with the sortcontext.  (In TSS_FINALMERGE state, we have closed
     924             :      * finished tapes already.)
     925             :      */
     926      236650 :     if (state->tapeset)
     927         678 :         LogicalTapeSetClose(state->tapeset);
     928             : 
     929             : #ifdef TRACE_SORT
     930      236650 :     if (trace_sort)
     931             :     {
     932           0 :         if (state->tapeset)
     933           0 :             elog(LOG, "%s of worker %d ended, %lld disk blocks used: %s",
     934             :                  SERIAL(state) ? "external sort" : "parallel external sort",
     935             :                  state->worker, (long long) spaceUsed, pg_rusage_show(&state->ru_start));
     936             :         else
     937           0 :             elog(LOG, "%s of worker %d ended, %lld KB used: %s",
     938             :                  SERIAL(state) ? "internal sort" : "unperformed parallel sort",
     939             :                  state->worker, (long long) spaceUsed, pg_rusage_show(&state->ru_start));
     940             :     }
     941             : 
     942             :     TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
     943             : #else
     944             : 
     945             :     /*
      946             :      * If TRACE_SORT is disabled, the sort__done probe can still fire,
      947             :      * but it won't report space-used statistics.
     948             :      */
     949             :     TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
     950             : #endif
     951             : 
     952      236650 :     FREESTATE(state);
     953      236650 :     MemoryContextSwitchTo(oldcontext);
     954             : 
     955             :     /*
     956             :      * Free the per-sort memory context, thereby releasing all working memory.
     957             :      */
     958      236650 :     MemoryContextReset(state->base.sortcontext);
     959      236650 : }
     960             : 
     961             : /*
     962             :  * tuplesort_end
     963             :  *
     964             :  *  Release resources and clean up.
     965             :  *
     966             :  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
     967             :  * pointing to garbage.  Be careful not to attempt to use or free such
     968             :  * pointers afterwards!
     969             :  */
     970             : void
     971      234482 : tuplesort_end(Tuplesortstate *state)
     972             : {
     973      234482 :     tuplesort_free(state);
     974             : 
     975             :     /*
     976             :      * Free the main memory context, including the Tuplesortstate struct
     977             :      * itself.
     978             :      */
     979      234482 :     MemoryContextDelete(state->base.maincontext);
     980      234482 : }
     981             : 
     982             : /*
     983             :  * tuplesort_updatemax
     984             :  *
     985             :  *  Update maximum resource usage statistics.
     986             :  */
     987             : static void
     988        2552 : tuplesort_updatemax(Tuplesortstate *state)
     989             : {
     990             :     int64       spaceUsed;
     991             :     bool        isSpaceDisk;
     992             : 
     993             :     /*
     994             :      * Note: it might seem we should provide both memory and disk usage for a
     995             :      * disk-based sort.  However, the current code doesn't track memory space
     996             :      * accurately once we have begun to return tuples to the caller (since we
     997             :      * don't account for pfree's the caller is expected to do), so we cannot
     998             :      * rely on availMem in a disk sort.  This does not seem worth the overhead
     999             :      * to fix.  Is it worth creating an API for the memory context code to
    1000             :      * tell us how much is actually used in sortcontext?
    1001             :      */
    1002        2552 :     if (state->tapeset)
    1003             :     {
    1004           0 :         isSpaceDisk = true;
    1005           0 :         spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
    1006             :     }
    1007             :     else
    1008             :     {
    1009        2552 :         isSpaceDisk = false;
    1010        2552 :         spaceUsed = state->allowedMem - state->availMem;
    1011             :     }
    1012             : 
    1013             :     /*
     1014             :      * The sort spills data to disk only when it cannot fit that data in
     1015             :      * main memory.  This is why we treat space used on disk as more
     1016             :      * significant for tracking resource usage than space used in memory.
     1017             :      * Note that a set of tuples may occupy less space on disk than the
     1018             :      * same tuples occupy in memory, owing to the more compact on-disk
     1019             :      * representation.
    1020             :      */
    1021        2552 :     if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
    1022        2552 :         (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
    1023             :     {
    1024         382 :         state->maxSpace = spaceUsed;
    1025         382 :         state->isMaxSpaceDisk = isSpaceDisk;
    1026         382 :         state->maxSpaceStatus = state->status;
    1027             :     }
    1028        2552 : }
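                      : 
                      : /*
                      :  * Editorial note: these maximum-usage figures are what
                      :  * tuplesort_get_stats() reports to callers (e.g. for EXPLAIN ANALYZE),
                      :  * with isMaxSpaceDisk selecting whether the space is attributed to
                      :  * disk or to memory.
                      :  */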
    1029             : 
    1030             : /*
    1031             :  * tuplesort_reset
    1032             :  *
     1033             :  *  Reset the tuplesort.  This discards all data held by the tuplesort but
     1034             :  *  keeps its meta-information, so that after tuplesort_reset the tuplesort
     1035             :  *  is ready to start a new sort.  This avoids recreating tuplesort states
     1036             :  *  (and so saves resources) when sorting multiple small batches.
    1037             :  */
    1038             : void
    1039        2168 : tuplesort_reset(Tuplesortstate *state)
    1040             : {
    1041        2168 :     tuplesort_updatemax(state);
    1042        2168 :     tuplesort_free(state);
    1043             : 
    1044             :     /*
     1045             :      * After we've freed up per-batch memory, re-initialize all of the
     1046             :      * state that is common to the first batch and any subsequent batch.
    1047             :      */
    1048        2168 :     tuplesort_begin_batch(state);
    1049             : 
    1050        2168 :     state->lastReturnedTuple = NULL;
    1051        2168 :     state->slabMemoryBegin = NULL;
    1052        2168 :     state->slabMemoryEnd = NULL;
    1053        2168 :     state->slabFreeHead = NULL;
    1054        2168 : }
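                      : 
                      : /*
                      :  * Editorial sketch (not part of tuplesort.c) of the intended reuse
                      :  * pattern; fill_batch() and drain_sorted() are hypothetical stand-ins
                      :  * for the caller's puttuple/gettuple loops:
                      :  *
                      :  *    state = tuplesort_begin_heap(...);
                      :  *    foreach(lc, batches)
                      :  *    {
                      :  *        fill_batch(state, lfirst(lc));
                      :  *        tuplesort_performsort(state);
                      :  *        drain_sorted(state);
                      :  *        tuplesort_reset(state);    -- keep meta-state, drop batch data
                      :  *    }
                      :  *    tuplesort_end(state);
                      :  */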
    1055             : 
    1056             : /*
    1057             :  * Grow the memtuples[] array, if possible within our memory constraint.  We
    1058             :  * must not exceed INT_MAX tuples in memory or the caller-provided memory
    1059             :  * limit.  Return true if we were able to enlarge the array, false if not.
    1060             :  *
    1061             :  * Normally, at each increment we double the size of the array.  When doing
    1062             :  * that would exceed a limit, we attempt one last, smaller increase (and then
    1063             :  * clear the growmemtuples flag so we don't try any more).  That allows us to
    1064             :  * use memory as fully as permitted; sticking to the pure doubling rule could
    1065             :  * result in almost half going unused.  Because availMem moves around with
    1066             :  * tuple addition/removal, we need some rule to prevent making repeated small
    1067             :  * increases in memtupsize, which would just be useless thrashing.  The
    1068             :  * growmemtuples flag accomplishes that and also prevents useless
    1069             :  * recalculations in this function.
    1070             :  */
    1071             : static bool
    1072        6942 : grow_memtuples(Tuplesortstate *state)
    1073             : {
    1074             :     int         newmemtupsize;
    1075        6942 :     int         memtupsize = state->memtupsize;
    1076        6942 :     int64       memNowUsed = state->allowedMem - state->availMem;
    1077             : 
    1078             :     /* Forget it if we've already maxed out memtuples, per comment above */
    1079        6942 :     if (!state->growmemtuples)
    1080         114 :         return false;
    1081             : 
    1082             :     /* Select new value of memtupsize */
    1083        6828 :     if (memNowUsed <= state->availMem)
    1084             :     {
    1085             :         /*
    1086             :          * We've used no more than half of allowedMem; double our usage,
    1087             :          * clamping at INT_MAX tuples.
    1088             :          */
    1089        6706 :         if (memtupsize < INT_MAX / 2)
    1090        6706 :             newmemtupsize = memtupsize * 2;
    1091             :         else
    1092             :         {
    1093           0 :             newmemtupsize = INT_MAX;
    1094           0 :             state->growmemtuples = false;
    1095             :         }
    1096             :     }
    1097             :     else
    1098             :     {
    1099             :         /*
    1100             :          * This will be the last increment of memtupsize.  Abandon doubling
    1101             :          * strategy and instead increase as much as we safely can.
    1102             :          *
    1103             :          * To stay within allowedMem, we can't increase memtupsize by more
    1104             :          * than availMem / sizeof(SortTuple) elements.  In practice, we want
    1105             :          * to increase it by considerably less, because we need to leave some
    1106             :          * space for the tuples to which the new array slots will refer.  We
    1107             :          * assume the new tuples will be about the same size as the tuples
    1108             :          * we've already seen, and thus we can extrapolate from the space
    1109             :          * consumption so far to estimate an appropriate new size for the
    1110             :          * memtuples array.  The optimal value might be higher or lower than
    1111             :          * this estimate, but it's hard to know that in advance.  We again
    1112             :          * clamp at INT_MAX tuples.
    1113             :          *
    1114             :          * This calculation is safe against enlarging the array so much that
    1115             :          * LACKMEM becomes true, because the memory currently used includes
    1116             :          * the present array; thus, there would be enough allowedMem for the
    1117             :          * new array elements even if no other memory were currently used.
    1118             :          *
    1119             :          * We do the arithmetic in float8, because otherwise the product of
    1120             :          * memtupsize and allowedMem could overflow.  Any inaccuracy in the
    1121             :          * result should be insignificant; but even if we computed a
    1122             :          * completely insane result, the checks below will prevent anything
    1123             :          * really bad from happening.
    1124             :          */
    1125             :         double      grow_ratio;
    1126             : 
    1127         122 :         grow_ratio = (double) state->allowedMem / (double) memNowUsed;
    1128         122 :         if (memtupsize * grow_ratio < INT_MAX)
    1129         122 :             newmemtupsize = (int) (memtupsize * grow_ratio);
    1130             :         else
    1131           0 :             newmemtupsize = INT_MAX;
    1132             : 
    1133             :         /* We won't make any further enlargement attempts */
    1134         122 :         state->growmemtuples = false;
    1135             :     }
    1136             : 
    1137             :     /* Must enlarge array by at least one element, else report failure */
    1138        6828 :     if (newmemtupsize <= memtupsize)
    1139           0 :         goto noalloc;
    1140             : 
    1141             :     /*
    1142             :      * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
    1143             :      * to ensure our request won't be rejected.  Note that we can easily
    1144             :      * exhaust address space before facing this outcome.  (This is presently
    1145             :      * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
    1146             :      * don't rely on that at this distance.)
    1147             :      */
    1148        6828 :     if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
    1149             :     {
    1150           0 :         newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
    1151           0 :         state->growmemtuples = false;    /* can't grow any more */
    1152             :     }
    1153             : 
    1154             :     /*
    1155             :      * We need to be sure that we do not cause LACKMEM to become true, else
    1156             :      * the space management algorithm will go nuts.  The code above should
    1157             :      * never generate a dangerous request, but to be safe, check explicitly
    1158             :      * that the array growth fits within availMem.  (We could still cause
    1159             :      * LACKMEM if the memory chunk overhead associated with the memtuples
    1160             :      * array were to increase.  That shouldn't happen because we chose the
    1161             :      * initial array size large enough to ensure that palloc will be treating
    1162             :      * both old and new arrays as separate chunks.  But we'll check LACKMEM
    1163             :      * explicitly below just in case.)
    1164             :      */
    1165        6828 :     if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
    1166           0 :         goto noalloc;
    1167             : 
    1168             :     /* OK, do it */
    1169        6828 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1170        6828 :     state->memtupsize = newmemtupsize;
    1171        6828 :     state->memtuples = (SortTuple *)
    1172        6828 :         repalloc_huge(state->memtuples,
    1173        6828 :                       state->memtupsize * sizeof(SortTuple));
    1174        6828 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    1175        6828 :     if (LACKMEM(state))
    1176           0 :         elog(ERROR, "unexpected out-of-memory situation in tuplesort");
    1177        6828 :     return true;
    1178             : 
    1179           0 : noalloc:
    1180             :     /* If for any reason we didn't realloc, shut off future attempts */
    1181           0 :     state->growmemtuples = false;
    1182           0 :     return false;
    1183             : }
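                      : 
                      : /*
                      :  * Editorial example of the final, non-doubling growth step above: with
                      :  * allowedMem = 4 MB and memNowUsed = 3 MB (so doubling would overrun),
                      :  * grow_ratio = 4/3, and a memtuples array of 65536 slots grows to about
                      :  * 87381 slots; that is, we assume further tuples resemble those already
                      :  * seen and size the array to just fill allowedMem.
                      :  */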
    1184             : 
    1185             : /*
    1186             :  * Shared code for tuple and datum cases.
    1187             :  */
    1188             : void
    1189    27816690 : tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
    1190             :                           bool useAbbrev, Size tuplen)
    1191             : {
    1192    27816690 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1193             : 
    1194             :     Assert(!LEADER(state));
    1195             : 
    1196             :     /* account for the memory used for this tuple */
    1197    27816690 :     USEMEM(state, tuplen);
    1198    27816690 :     state->tupleMem += tuplen;
    1199             : 
    1200    27816690 :     if (!useAbbrev)
    1201             :     {
    1202             :         /*
    1203             :          * Leave ordinary Datum representation, or NULL value.  If there is a
     1204             :          * converter it won't expect NULL values, and the cost model need
     1205             :          * not account for NULLs, so in that case we avoid calling the
     1206             :          * converter and just set datum1 to a zeroed representation (to be
    1207             :          * consistent, and to support cheap inequality tests for NULL
    1208             :          * abbreviated keys).
    1209             :          */
    1210             :     }
    1211     4419692 :     else if (!consider_abort_common(state))
    1212             :     {
    1213             :         /* Store abbreviated key representation */
    1214     4419596 :         tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
    1215             :                                                                state->base.sortKeys);
    1216             :     }
    1217             :     else
    1218             :     {
    1219             :         /*
    1220             :          * Set state to be consistent with never trying abbreviation.
    1221             :          *
    1222             :          * Alter datum1 representation in already-copied tuples, so as to
    1223             :          * ensure a consistent representation (current tuple was just
    1224             :          * handled).  It does not matter if some dumped tuples are already
    1225             :          * sorted on tape, since serialized tuples lack abbreviated keys
    1226             :          * (TSS_BUILDRUNS state prevents control reaching here in any case).
    1227             :          */
    1228          96 :         REMOVEABBREV(state, state->memtuples, state->memtupcount);
    1229             :     }
    1230             : 
    1231    27816690 :     switch (state->status)
    1232             :     {
    1233    23021892 :         case TSS_INITIAL:
    1234             : 
    1235             :             /*
    1236             :              * Save the tuple into the unsorted array.  First, grow the array
    1237             :              * as needed.  Note that we try to grow the array when there is
    1238             :              * still one free slot remaining --- if we fail, there'll still be
    1239             :              * room to store the incoming tuple, and then we'll switch to
    1240             :              * tape-based operation.
    1241             :              */
    1242    23021892 :             if (state->memtupcount >= state->memtupsize - 1)
    1243             :             {
    1244        6942 :                 (void) grow_memtuples(state);
    1245             :                 Assert(state->memtupcount < state->memtupsize);
    1246             :             }
    1247    23021892 :             state->memtuples[state->memtupcount++] = *tuple;
    1248             : 
    1249             :             /*
    1250             :              * Check if it's time to switch over to a bounded heapsort. We do
    1251             :              * so if the input tuple count exceeds twice the desired tuple
    1252             :              * count (this is a heuristic for where heapsort becomes cheaper
    1253             :              * than a quicksort), or if we've just filled workMem and have
    1254             :              * enough tuples to meet the bound.
    1255             :              *
    1256             :              * Note that once we enter TSS_BOUNDED state we will always try to
    1257             :              * complete the sort that way.  In the worst case, if later input
    1258             :              * tuples are larger than earlier ones, this might cause us to
    1259             :              * exceed workMem significantly.
    1260             :              */
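                      :             /*
                      :              * Editorial example: with bound = 100, we switch upon seeing
                      :              * the 201st tuple, or as soon as we hold more than 100 tuples
                      :              * and workMem is exhausted.
                      :              */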
    1261    23021892 :             if (state->bounded &&
    1262       43182 :                 (state->memtupcount > state->bound * 2 ||
    1263       42784 :                  (state->memtupcount > state->bound && LACKMEM(state))))
    1264             :             {
    1265             : #ifdef TRACE_SORT
    1266         398 :                 if (trace_sort)
    1267           0 :                     elog(LOG, "switching to bounded heapsort at %d tuples: %s",
    1268             :                          state->memtupcount,
    1269             :                          pg_rusage_show(&state->ru_start));
    1270             : #endif
    1271         398 :                 make_bounded_heap(state);
    1272         398 :                 MemoryContextSwitchTo(oldcontext);
    1273         398 :                 return;
    1274             :             }
    1275             : 
    1276             :             /*
    1277             :              * Done if we still fit in available memory and have array slots.
    1278             :              */
    1279    23021494 :             if (state->memtupcount < state->memtupsize && !LACKMEM(state))
    1280             :             {
    1281    23021380 :                 MemoryContextSwitchTo(oldcontext);
    1282    23021380 :                 return;
    1283             :             }
    1284             : 
    1285             :             /*
    1286             :              * Nope; time to switch to tape-based operation.
    1287             :              */
    1288         114 :             inittapes(state, true);
    1289             : 
    1290             :             /*
    1291             :              * Dump all tuples.
    1292             :              */
    1293         114 :             dumptuples(state, false);
    1294         114 :             break;
    1295             : 
    1296     3734028 :         case TSS_BOUNDED:
    1297             : 
    1298             :             /*
    1299             :              * We don't want to grow the array here, so check whether the new
    1300             :              * tuple can be discarded before putting it in.  This should be a
    1301             :              * good speed optimization, too, since when there are many more
    1302             :              * input tuples than the bound, most input tuples can be discarded
    1303             :              * with just this one comparison.  Note that because we currently
    1304             :              * have the sort direction reversed, we must check for <= not >=.
    1305             :              */
    1306     3734028 :             if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
    1307             :             {
    1308             :                 /* new tuple <= top of the heap, so we can discard it */
    1309     3231126 :                 free_sort_tuple(state, tuple);
    1310     3231126 :                 CHECK_FOR_INTERRUPTS();
    1311             :             }
    1312             :             else
    1313             :             {
    1314             :                 /* discard top of heap, replacing it with the new tuple */
    1315      502902 :                 free_sort_tuple(state, &state->memtuples[0]);
    1316      502902 :                 tuplesort_heap_replace_top(state, tuple);
    1317             :             }
    1318     3734028 :             break;
    1319             : 
    1320     1060770 :         case TSS_BUILDRUNS:
    1321             : 
    1322             :             /*
    1323             :              * Save the tuple into the unsorted array (there must be space)
    1324             :              */
    1325     1060770 :             state->memtuples[state->memtupcount++] = *tuple;
    1326             : 
    1327             :             /*
    1328             :              * If we are over the memory limit, dump all tuples.
    1329             :              */
    1330     1060770 :             dumptuples(state, false);
    1331     1060770 :             break;
    1332             : 
    1333           0 :         default:
    1334           0 :             elog(ERROR, "invalid tuplesort state");
    1335             :             break;
    1336             :     }
    1337     4794912 :     MemoryContextSwitchTo(oldcontext);
    1338             : }
    1339             : 
    1340             : static bool
    1341     4419692 : consider_abort_common(Tuplesortstate *state)
    1342             : {
    1343             :     Assert(state->base.sortKeys[0].abbrev_converter != NULL);
    1344             :     Assert(state->base.sortKeys[0].abbrev_abort != NULL);
    1345             :     Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
    1346             : 
    1347             :     /*
    1348             :      * Check effectiveness of abbreviation optimization.  Consider aborting
    1349             :      * when still within memory limit.
    1350             :      */
    1351     4419692 :     if (state->status == TSS_INITIAL &&
    1352     3947262 :         state->memtupcount >= state->abbrevNext)
    1353             :     {
    1354        4904 :         state->abbrevNext *= 2;
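                      :         /*
                      :          * Editorial note: assuming the usual initial abbrevNext of 10,
                      :          * this re-check fires after 10, 20, 40, ... tuples, so its cost
                      :          * is amortized to O(1) per tuple.
                      :          */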
    1355             : 
    1356             :         /*
    1357             :          * Check opclass-supplied abbreviation abort routine.  It may indicate
    1358             :          * that abbreviation should not proceed.
    1359             :          */
    1360        4904 :         if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
    1361             :                                                 state->base.sortKeys))
    1362        4808 :             return false;
    1363             : 
    1364             :         /*
    1365             :          * Finally, restore authoritative comparator, and indicate that
    1366             :          * abbreviation is not in play by setting abbrev_converter to NULL
    1367             :          */
    1368          96 :         state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
    1369          96 :         state->base.sortKeys[0].abbrev_converter = NULL;
    1370             :         /* Not strictly necessary, but be tidy */
    1371          96 :         state->base.sortKeys[0].abbrev_abort = NULL;
    1372          96 :         state->base.sortKeys[0].abbrev_full_comparator = NULL;
    1373             : 
    1374             :         /* Give up - expect original pass-by-value representation */
    1375          96 :         return true;
    1376             :     }
    1377             : 
    1378     4414788 :     return false;
    1379             : }
    1380             : 
    1381             : /*
    1382             :  * All tuples have been provided; finish the sort.
    1383             :  */
    1384             : void
    1385      199386 : tuplesort_performsort(Tuplesortstate *state)
    1386             : {
    1387      199386 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1388             : 
    1389             : #ifdef TRACE_SORT
    1390      199386 :     if (trace_sort)
    1391           0 :         elog(LOG, "performsort of worker %d starting: %s",
    1392             :              state->worker, pg_rusage_show(&state->ru_start));
    1393             : #endif
    1394             : 
    1395      199386 :     switch (state->status)
    1396             :     {
    1397      198874 :         case TSS_INITIAL:
    1398             : 
    1399             :             /*
     1400             :              * Either we accumulated all the tuples within the allowed memory,
     1401             :              * or we are the leader and will take over the worker tapes.
    1402             :              */
    1403      198874 :             if (SERIAL(state))
    1404             :             {
    1405             :                 /* Just qsort 'em and we're done */
    1406      198310 :                 tuplesort_sort_memtuples(state);
    1407      198226 :                 state->status = TSS_SORTEDINMEM;
    1408             :             }
    1409         564 :             else if (WORKER(state))
    1410             :             {
    1411             :                 /*
    1412             :                  * Parallel workers must still dump out tuples to tape.  No
     1413             :                  * merge is required to produce a single output run, though.
    1414             :                  */
    1415         420 :                 inittapes(state, false);
    1416         420 :                 dumptuples(state, true);
    1417         420 :                 worker_nomergeruns(state);
    1418         420 :                 state->status = TSS_SORTEDONTAPE;
    1419             :             }
    1420             :             else
    1421             :             {
    1422             :                 /*
    1423             :                  * Leader will take over worker tapes and merge worker runs.
    1424             :                  * Note that mergeruns sets the correct state->status.
    1425             :                  */
    1426         144 :                 leader_takeover_tapes(state);
    1427         144 :                 mergeruns(state);
    1428             :             }
    1429      198790 :             state->current = 0;
    1430      198790 :             state->eof_reached = false;
    1431      198790 :             state->markpos_block = 0L;
    1432      198790 :             state->markpos_offset = 0;
    1433      198790 :             state->markpos_eof = false;
    1434      198790 :             break;
    1435             : 
    1436         398 :         case TSS_BOUNDED:
    1437             : 
    1438             :             /*
    1439             :              * We were able to accumulate all the tuples required for output
    1440             :              * in memory, using a heap to eliminate excess tuples.  Now we
    1441             :              * have to transform the heap to a properly-sorted array. Note
    1442             :              * that sort_bounded_heap sets the correct state->status.
    1443             :              */
    1444         398 :             sort_bounded_heap(state);
    1445         398 :             state->current = 0;
    1446         398 :             state->eof_reached = false;
    1447         398 :             state->markpos_offset = 0;
    1448         398 :             state->markpos_eof = false;
    1449         398 :             break;
    1450             : 
    1451         114 :         case TSS_BUILDRUNS:
    1452             : 
    1453             :             /*
    1454             :              * Finish tape-based sort.  First, flush all tuples remaining in
    1455             :              * memory out to tape; then merge until we have a single remaining
    1456             :              * run (or, if !randomAccess and !WORKER(), one run per tape).
    1457             :              * Note that mergeruns sets the correct state->status.
    1458             :              */
    1459         114 :             dumptuples(state, true);
    1460         114 :             mergeruns(state);
    1461         114 :             state->eof_reached = false;
    1462         114 :             state->markpos_block = 0L;
    1463         114 :             state->markpos_offset = 0;
    1464         114 :             state->markpos_eof = false;
    1465         114 :             break;
    1466             : 
    1467           0 :         default:
    1468           0 :             elog(ERROR, "invalid tuplesort state");
    1469             :             break;
    1470             :     }
    1471             : 
    1472             : #ifdef TRACE_SORT
    1473      199302 :     if (trace_sort)
    1474             :     {
    1475           0 :         if (state->status == TSS_FINALMERGE)
    1476           0 :             elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
    1477             :                  state->worker, state->nInputTapes,
    1478             :                  pg_rusage_show(&state->ru_start));
    1479             :         else
    1480           0 :             elog(LOG, "performsort of worker %d done: %s",
    1481             :                  state->worker, pg_rusage_show(&state->ru_start));
    1482             :     }
    1483             : #endif
    1484             : 
    1485      199302 :     MemoryContextSwitchTo(oldcontext);
    1486      199302 : }
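                      : 
                      : /*
                      :  * Editorial summary of the transitions performed above:
                      :  *
                      :  *    TSS_INITIAL   -> TSS_SORTEDINMEM          (serial in-memory qsort)
                      :  *    TSS_INITIAL   -> TSS_SORTEDONTAPE         (worker, single run on tape)
                      :  *    TSS_INITIAL   -> whatever mergeruns() set (leader tape takeover)
                      :  *    TSS_BOUNDED   -> TSS_SORTEDINMEM          (via sort_bounded_heap())
                      :  *    TSS_BUILDRUNS -> TSS_SORTEDONTAPE/TSS_FINALMERGE (via mergeruns())
                      :  */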
    1487             : 
    1488             : /*
     1489             :  * Internal routine to fetch the next tuple in either forward or backward
     1490             :  * direction into *stup.  Returns false if there are no more tuples.
     1491             :  * The returned tuple belongs to the tuplesort memory context, and must not
     1492             :  * be freed by the caller.  Note that the fetched tuple is stored in memory
     1493             :  * that may be recycled by any future fetch.
    1494             :  */
    1495             : bool
    1496    24974118 : tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
    1497             :                           SortTuple *stup)
    1498             : {
    1499             :     unsigned int tuplen;
    1500             :     size_t      nmoved;
    1501             : 
    1502             :     Assert(!WORKER(state));
    1503             : 
    1504    24974118 :     switch (state->status)
    1505             :     {
    1506    20145572 :         case TSS_SORTEDINMEM:
    1507             :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1508             :             Assert(!state->slabAllocatorUsed);
    1509    20145572 :             if (forward)
    1510             :             {
    1511    20145506 :                 if (state->current < state->memtupcount)
    1512             :                 {
    1513    19948554 :                     *stup = state->memtuples[state->current++];
    1514    19948554 :                     return true;
    1515             :                 }
    1516      196952 :                 state->eof_reached = true;
    1517             : 
    1518             :                 /*
    1519             :                  * Complain if caller tries to retrieve more tuples than
    1520             :                  * originally asked for in a bounded sort.  This is because
    1521             :                  * returning EOF here might be the wrong thing.
    1522             :                  */
    1523      196952 :                 if (state->bounded && state->current >= state->bound)
    1524           0 :                     elog(ERROR, "retrieved too many tuples in a bounded sort");
    1525             : 
    1526      196952 :                 return false;
    1527             :             }
    1528             :             else
    1529             :             {
    1530          66 :                 if (state->current <= 0)
    1531           0 :                     return false;
    1532             : 
    1533             :                 /*
     1534             :                  * If all tuples have been fetched already, return the last
     1535             :                  * tuple; otherwise, return the one before the last returned.
    1536             :                  */
    1537          66 :                 if (state->eof_reached)
    1538          12 :                     state->eof_reached = false;
    1539             :                 else
    1540             :                 {
    1541          54 :                     state->current--;    /* last returned tuple */
    1542          54 :                     if (state->current <= 0)
    1543           6 :                         return false;
    1544             :                 }
    1545          60 :                 *stup = state->memtuples[state->current - 1];
    1546          60 :                 return true;
    1547             :             }
    1548             :             break;
    1549             : 
    1550      272994 :         case TSS_SORTEDONTAPE:
    1551             :             Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
    1552             :             Assert(state->slabAllocatorUsed);
    1553             : 
    1554             :             /*
    1555             :              * The slot that held the tuple that we returned in previous
    1556             :              * gettuple call can now be reused.
    1557             :              */
    1558      272994 :             if (state->lastReturnedTuple)
    1559             :             {
    1560      152850 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1561      152850 :                 state->lastReturnedTuple = NULL;
    1562             :             }
    1563             : 
    1564      272994 :             if (forward)
    1565             :             {
    1566      272964 :                 if (state->eof_reached)
    1567           0 :                     return false;
    1568             : 
    1569      272964 :                 if ((tuplen = getlen(state->result_tape, true)) != 0)
    1570             :                 {
    1571      272940 :                     READTUP(state, stup, state->result_tape, tuplen);
    1572             : 
    1573             :                     /*
    1574             :                      * Remember the tuple we return, so that we can recycle
    1575             :                      * its memory on next call.  (This can be NULL, in the
    1576             :                      * !state->tuples case).
    1577             :                      */
    1578      272940 :                     state->lastReturnedTuple = stup->tuple;
    1579             : 
    1580      272940 :                     return true;
    1581             :                 }
    1582             :                 else
    1583             :                 {
    1584          24 :                     state->eof_reached = true;
    1585          24 :                     return false;
    1586             :                 }
    1587             :             }
    1588             : 
    1589             :             /*
    1590             :              * Backward.
    1591             :              *
     1592             :              * If all tuples have been fetched already, return the last
     1593             :              * tuple; otherwise, return the one before the last returned.
    1594             :              */
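                      :             /*
                      :              * Editorial sketch of the on-tape layout the backward walk
                      :              * below assumes (inferred from the length-word handling: with
                      :              * random access requested, each tuple's length word is written
                      :              * both before and after it, and a zero length word ends the
                      :              * run):
                      :              *
                      :              *    [len1][tuple1][len1] [len2][tuple2][len2] ... [0]
                      :              *
                      :              * which lets the file be scanned in either direction.
                      :              */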
    1595          30 :             if (state->eof_reached)
    1596             :             {
    1597             :                 /*
    1598             :                  * Seek position is pointing just past the zero tuplen at the
    1599             :                  * end of file; back up to fetch last tuple's ending length
    1600             :                  * word.  If seek fails we must have a completely empty file.
    1601             :                  */
    1602          12 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1603             :                                               2 * sizeof(unsigned int));
    1604          12 :                 if (nmoved == 0)
    1605           0 :                     return false;
    1606          12 :                 else if (nmoved != 2 * sizeof(unsigned int))
    1607           0 :                     elog(ERROR, "unexpected tape position");
    1608          12 :                 state->eof_reached = false;
    1609             :             }
    1610             :             else
    1611             :             {
    1612             :                 /*
    1613             :                  * Back up and fetch previously-returned tuple's ending length
    1614             :                  * word.  If seek fails, assume we are at start of file.
    1615             :                  */
    1616          18 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1617             :                                               sizeof(unsigned int));
    1618          18 :                 if (nmoved == 0)
    1619           0 :                     return false;
    1620          18 :                 else if (nmoved != sizeof(unsigned int))
    1621           0 :                     elog(ERROR, "unexpected tape position");
    1622          18 :                 tuplen = getlen(state->result_tape, false);
    1623             : 
    1624             :                 /*
    1625             :                  * Back up to get ending length word of tuple before it.
    1626             :                  */
    1627          18 :                 nmoved = LogicalTapeBackspace(state->result_tape,
    1628             :                                               tuplen + 2 * sizeof(unsigned int));
    1629          18 :                 if (nmoved == tuplen + sizeof(unsigned int))
    1630             :                 {
    1631             :                     /*
    1632             :                      * We backed up over the previous tuple, but there was no
    1633             :                      * ending length word before it.  That means that the prev
    1634             :                      * tuple is the first tuple in the file.  It is now the
     1635             :                      * next to read in the forward direction (not obviously
     1636             :                      * right, but that is what the in-memory case does).
    1637             :                      */
    1638           6 :                     return false;
    1639             :                 }
    1640          12 :                 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
    1641           0 :                     elog(ERROR, "bogus tuple length in backward scan");
    1642             :             }
    1643             : 
    1644          24 :             tuplen = getlen(state->result_tape, false);
    1645             : 
    1646             :             /*
    1647             :              * Now we have the length of the prior tuple, back up and read it.
    1648             :              * Note: READTUP expects we are positioned after the initial
    1649             :              * length word of the tuple, so back up to that point.
    1650             :              */
    1651          24 :             nmoved = LogicalTapeBackspace(state->result_tape,
    1652             :                                           tuplen);
    1653          24 :             if (nmoved != tuplen)
    1654           0 :                 elog(ERROR, "bogus tuple length in backward scan");
    1655          24 :             READTUP(state, stup, state->result_tape, tuplen);
    1656             : 
    1657             :             /*
    1658             :              * Remember the tuple we return, so that we can recycle its memory
    1659             :              * on next call. (This can be NULL, in the Datum case).
    1660             :              */
    1661          24 :             state->lastReturnedTuple = stup->tuple;
    1662             : 
    1663          24 :             return true;
    1664             : 
    1665     4555552 :         case TSS_FINALMERGE:
    1666             :             Assert(forward);
    1667             :             /* We are managing memory ourselves, with the slab allocator. */
    1668             :             Assert(state->slabAllocatorUsed);
    1669             : 
    1670             :             /*
    1671             :              * The slab slot holding the tuple that we returned in previous
    1672             :              * gettuple call can now be reused.
    1673             :              */
    1674     4555552 :             if (state->lastReturnedTuple)
    1675             :             {
    1676     4495282 :                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
    1677     4495282 :                 state->lastReturnedTuple = NULL;
    1678             :             }
    1679             : 
    1680             :             /*
    1681             :              * This code should match the inner loop of mergeonerun().
    1682             :              */
    1683     4555552 :             if (state->memtupcount > 0)
    1684             :             {
    1685     4555330 :                 int         srcTapeIndex = state->memtuples[0].srctape;
    1686     4555330 :                 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
    1687             :                 SortTuple   newtup;
    1688             : 
    1689     4555330 :                 *stup = state->memtuples[0];
    1690             : 
    1691             :                 /*
    1692             :                  * Remember the tuple we return, so that we can recycle its
    1693             :                  * memory on next call. (This can be NULL, in the Datum case).
    1694             :                  */
    1695     4555330 :                 state->lastReturnedTuple = stup->tuple;
    1696             : 
    1697             :                 /*
    1698             :                  * Pull next tuple from tape, and replace the returned tuple
    1699             :                  * at top of the heap with it.
    1700             :                  */
    1701     4555330 :                 if (!mergereadnext(state, srcTape, &newtup))
    1702             :                 {
    1703             :                     /*
    1704             :                      * If no more data, we've reached end of run on this tape.
    1705             :                      * Remove the top node from the heap.
    1706             :                      */
    1707         334 :                     tuplesort_heap_delete_top(state);
    1708         334 :                     state->nInputRuns--;
    1709             : 
    1710             :                     /*
    1711             :                      * Close the tape.  It'd go away at the end of the sort
    1712             :                      * anyway, but better to release the memory early.
    1713             :                      */
    1714         334 :                     LogicalTapeClose(srcTape);
    1715         334 :                     return true;
    1716             :                 }
    1717     4554996 :                 newtup.srctape = srcTapeIndex;
    1718     4554996 :                 tuplesort_heap_replace_top(state, &newtup);
    1719     4554996 :                 return true;
    1720             :             }
    1721         222 :             return false;
    1722             : 
    1723           0 :         default:
    1724           0 :             elog(ERROR, "invalid tuplesort state");
    1725             :             return false;       /* keep compiler quiet */
    1726             :     }
    1727             : }
    1728             : 
    1729             : 
    1730             : /*
     1731             :  * Advance over N tuples in either forward or backward direction,
     1732             :  * without returning any data.  N==0 is a no-op.
     1733             :  * Returns true if successful, false if we ran out of tuples.
    1734             :  */
    1735             : bool
    1736         392 : tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
    1737             : {
    1738             :     MemoryContext oldcontext;
    1739             : 
    1740             :     /*
    1741             :      * We don't actually support backwards skip yet, because no callers need
    1742             :      * it.  The API is designed to allow for that later, though.
    1743             :      */
    1744             :     Assert(forward);
    1745             :     Assert(ntuples >= 0);
    1746             :     Assert(!WORKER(state));
    1747             : 
    1748         392 :     switch (state->status)
    1749             :     {
    1750         368 :         case TSS_SORTEDINMEM:
    1751         368 :             if (state->memtupcount - state->current >= ntuples)
    1752             :             {
    1753         368 :                 state->current += ntuples;
    1754         368 :                 return true;
    1755             :             }
    1756           0 :             state->current = state->memtupcount;
    1757           0 :             state->eof_reached = true;
    1758             : 
    1759             :             /*
    1760             :              * Complain if caller tries to retrieve more tuples than
    1761             :              * originally asked for in a bounded sort.  This is because
    1762             :              * returning EOF here might be the wrong thing.
    1763             :              */
    1764           0 :             if (state->bounded && state->current >= state->bound)
    1765           0 :                 elog(ERROR, "retrieved too many tuples in a bounded sort");
    1766             : 
    1767           0 :             return false;
    1768             : 
    1769          24 :         case TSS_SORTEDONTAPE:
    1770             :         case TSS_FINALMERGE:
    1771             : 
    1772             :             /*
    1773             :              * We could probably optimize these cases better, but for now it's
    1774             :              * not worth the trouble.
    1775             :              */
    1776          24 :             oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    1777      240132 :             while (ntuples-- > 0)
    1778             :             {
    1779             :                 SortTuple   stup;
    1780             : 
    1781      240108 :                 if (!tuplesort_gettuple_common(state, forward, &stup))
    1782             :                 {
    1783           0 :                     MemoryContextSwitchTo(oldcontext);
    1784           0 :                     return false;
    1785             :                 }
    1786      240108 :                 CHECK_FOR_INTERRUPTS();
    1787             :             }
    1788          24 :             MemoryContextSwitchTo(oldcontext);
    1789          24 :             return true;
    1790             : 
    1791           0 :         default:
    1792           0 :             elog(ERROR, "invalid tuplesort state");
    1793             :             return false;       /* keep compiler quiet */
    1794             :     }
    1795             : }
    1796             : 
    1797             : /*
    1798             :  * tuplesort_merge_order - report merge order we'll use for given memory
    1799             :  * (note: "merge order" just means the number of input tapes in the merge).
    1800             :  *
    1801             :  * This is exported for use by the planner.  allowedMem is in bytes.
    1802             :  */
    1803             : int
    1804       16848 : tuplesort_merge_order(int64 allowedMem)
    1805             : {
    1806             :     int         mOrder;
    1807             : 
    1808             :     /*----------
    1809             :      * In the merge phase, we need buffer space for each input and output tape.
    1810             :      * Each pass in the balanced merge algorithm reads from M input tapes, and
    1811             :      * writes to N output tapes.  Each tape consumes TAPE_BUFFER_OVERHEAD bytes
    1812             :      * of memory.  In addition to that, we want MERGE_BUFFER_SIZE workspace per
    1813             :      * input tape.
    1814             :      *
    1815             :      * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
    1816             :      *            N * TAPE_BUFFER_OVERHEAD
    1817             :      *
    1818             :      * Except for the last and next-to-last merge passes, where there can be
    1819             :      * fewer tapes left to process, M = N.  We choose M so that we have the
    1820             :      * desired amount of memory available for the input buffers
    1821             :      * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
    1822             :      * available for the tape buffers (allowedMem).
    1823             :      *
    1824             :      * Note: you might be thinking we need to account for the memtuples[]
    1825             :      * array in this calculation, but we effectively treat that as part of the
    1826             :      * MERGE_BUFFER_SIZE workspace.
    1827             :      *----------
    1828             :      */
    1829       16848 :     mOrder = allowedMem /
    1830             :         (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
    1831             : 
    1832             :     /*
    1833             :      * Even in minimum memory, use at least a MINORDER merge.  On the other
    1834             :      * hand, even when we have lots of memory, do not use more than a MAXORDER
    1835             :      * merge.  Tapes are pretty cheap, but they're not entirely free.  Each
    1836             :      * additional tape reduces the amount of memory available to build runs,
    1837             :      * which in turn can cause the same sort to need more runs, which makes
    1838             :      * merging slower even if it can still be done in a single pass.  Also,
    1839             :      * high order merges are quite slow due to CPU cache effects; it can be
    1840             :      * faster to pay the I/O cost of a multi-pass merge than to perform a
    1841             :      * single merge pass across many hundreds of tapes.
    1842             :      */
    1843       16848 :     mOrder = Max(mOrder, MINORDER);
    1844       16848 :     mOrder = Min(mOrder, MAXORDER);
    1845             : 
    1846       16848 :     return mOrder;
    1847             : }
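                      : 
                      : /*
                      :  * Editorial example: with the usual definitions TAPE_BUFFER_OVERHEAD =
                      :  * BLCKSZ (8 kB) and MERGE_BUFFER_SIZE = 32 * BLCKSZ (256 kB), an
                      :  * allowedMem of 4 MB gives
                      :  *
                      :  *    mOrder = 4194304 / (2 * 8192 + 262144) = 15
                      :  *
                      :  * which falls between MINORDER and MAXORDER, so a 15-way merge is used.
                      :  */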
    1848             : 
    1849             : /*
    1850             :  * Helper function to calculate how much memory to allocate for the read buffer
    1851             :  * of each input tape in a merge pass.
    1852             :  *
    1853             :  * 'avail_mem' is the amount of memory available for the buffers of all the
    1854             :  *      tapes, both input and output.
    1855             :  * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
    1856             :  * 'maxOutputTapes' is the max. number of output tapes we should produce.
    1857             :  */
    1858             : static int64
    1859         288 : merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
    1860             :                        int maxOutputTapes)
    1861             : {
    1862             :     int         nOutputRuns;
    1863             :     int         nOutputTapes;
    1864             : 
    1865             :     /*
    1866             :      * How many output tapes will we produce in this pass?
    1867             :      *
    1868             :      * This is nInputRuns / nInputTapes, rounded up.
    1869             :      */
    1870         288 :     nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
    1871             : 
    1872         288 :     nOutputTapes = Min(nOutputRuns, maxOutputTapes);
    1873             : 
    1874             :     /*
    1875             :      * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory.  All
    1876             :      * remaining memory is divided evenly between the input tapes.
    1877             :      *
    1878             :      * This also follows from the formula in tuplesort_merge_order, but here
    1879             :      * we derive the input buffer size from the amount of memory available,
    1880             :      * and M and N.
    1881             :      */
    1882         288 :     return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
    1883             : }
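
A worked example of the same division, under the assumed TAPE_BUFFER_OVERHEAD of 8192 bytes used in the sketch above; the helper mirrors the arithmetic but is not the real function:

    #include <stdio.h>
    #include <stdint.h>

    #define TAPE_BUFFER_OVERHEAD 8192   /* assumed, as above */

    /* Illustration of merge_read_buffer_size's arithmetic. */
    static int64_t
    read_buffer_size(int64_t avail_mem, int nInputTapes, int nInputRuns,
                     int maxOutputTapes)
    {
        int         nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
        int         nOutputTapes = nOutputRuns < maxOutputTapes ? nOutputRuns : maxOutputTapes;
        int64_t     per_tape;

        per_tape = (avail_mem - (int64_t) TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes;
        return per_tape > 0 ? per_tape : 0;
    }

    int
    main(void)
    {
        /*
         * 30 runs on 15 tapes need ceil(30/15) = 2 output tapes, leaving
         * (1048576 - 2 * 8192) / 15 = 68812 bytes per input tape.
         */
        printf("%lld\n", (long long) read_buffer_size(1024 * 1024, 15, 30, 15));
        return 0;
    }
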
    1884             : 
    1885             : /*
    1886             :  * inittapes - initialize for tape sorting.
    1887             :  *
    1888             :  * This is called only if we have found we won't sort in memory.
    1889             :  */
    1890             : static void
    1891         534 : inittapes(Tuplesortstate *state, bool mergeruns)
    1892             : {
    1893             :     Assert(!LEADER(state));
    1894             : 
    1895         534 :     if (mergeruns)
    1896             :     {
    1897             :         /* Compute number of input tapes to use when merging */
    1898         114 :         state->maxTapes = tuplesort_merge_order(state->allowedMem);
    1899             :     }
    1900             :     else
    1901             :     {
    1902             :         /* Workers can sometimes produce single run, output without merge */
    1903             :         /* Workers can sometimes produce a single run and output it without a merge */
    1904         420 :         state->maxTapes = MINORDER;
    1905             :     }
    1906             : 
    1907             : #ifdef TRACE_SORT
    1908         534 :     if (trace_sort)
    1909           0 :         elog(LOG, "worker %d switching to external sort with %d tapes: %s",
    1910             :              state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
    1911             : #endif
    1912             : 
    1913             :     /* Create the tape set */
    1914         534 :     inittapestate(state, state->maxTapes);
    1915         534 :     state->tapeset =
    1916         534 :         LogicalTapeSetCreate(false,
    1917         534 :                              state->shared ? &state->shared->fileset : NULL,
    1918             :                              state->worker);
    1919             : 
    1920         534 :     state->currentRun = 0;
    1921             : 
    1922             :     /*
    1923             :      * Initialize logical tape arrays.
    1924             :      */
    1925         534 :     state->inputTapes = NULL;
    1926         534 :     state->nInputTapes = 0;
    1927         534 :     state->nInputRuns = 0;
    1928             : 
    1929         534 :     state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
    1930         534 :     state->nOutputTapes = 0;
    1931         534 :     state->nOutputRuns = 0;
    1932             : 
    1933         534 :     state->status = TSS_BUILDRUNS;
    1934             : 
    1935         534 :     selectnewtape(state);
    1936         534 : }
    1937             : 
    1938             : /*
    1939             :  * inittapestate - initialize generic tape management state
    1940             :  */
    1941             : static void
    1942         678 : inittapestate(Tuplesortstate *state, int maxTapes)
    1943             : {
    1944             :     int64       tapeSpace;
    1945             : 
    1946             :     /*
    1947             :      * Decrease availMem to reflect the space needed for tape buffers; but
    1948             :      * don't decrease it to the point that we have no room for tuples. (That
    1949             :      * case is only likely to occur if sorting pass-by-value Datums; in all
    1950             :      * other scenarios the memtuples[] array is unlikely to occupy more than
    1951             :      * half of allowedMem.  In the pass-by-value case it's not important to
    1952             :      * account for tuple space, so we don't care if LACKMEM becomes
    1953             :      * inaccurate.)
    1954             :      */
    1955         678 :     tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
    1956             : 
    1957         678 :     if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
    1958         582 :         USEMEM(state, tapeSpace);
    1959             : 
    1960             :     /*
    1961             :      * Make sure that the temp file(s) underlying the tape set are created in
    1962             :      * suitable temp tablespaces.  For parallel sorts, this should have been
    1963             :      * called already, but it doesn't matter if it is called a second time.
    1964             :      */
    1965         678 :     PrepareTempTablespaces();
    1966         678 : }
    1967             : 
    1968             : /*
    1969             :  * selectnewtape -- select next tape to output to.
    1970             :  *
    1971             :  * This is called after finishing a run when we know another run
    1972             :  * must be started.  This is used both when building the initial
    1973             :  * runs, and during merge passes.
    1974             :  */
    1975             : static void
    1976        1602 : selectnewtape(Tuplesortstate *state)
    1977             : {
    1978             :     /*
    1979             :      * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
    1980             :      * both zero.  On each call, we create a new output tape to hold the next
    1981             :      * run, until maxTapes is reached.  After that, we assign new runs to the
    1982             :      * existing tapes in a round robin fashion.
    1983             :      */
    1984        1602 :     if (state->nOutputTapes < state->maxTapes)
    1985             :     {
    1986             :         /* Create a new tape to hold the next run */
    1987             :         Assert(state->outputTapes[state->nOutputRuns] == NULL);
    1988             :         Assert(state->nOutputRuns == state->nOutputTapes);
    1989        1020 :         state->destTape = LogicalTapeCreate(state->tapeset);
    1990        1020 :         state->outputTapes[state->nOutputTapes] = state->destTape;
    1991        1020 :         state->nOutputTapes++;
    1992        1020 :         state->nOutputRuns++;
    1993             :     }
    1994             :     else
    1995             :     {
    1996             :         /*
    1997             :          * We have reached the max number of tapes.  Append to an existing
    1998             :          * tape.
    1999             :          */
    2000         582 :         state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
    2001         582 :         state->nOutputRuns++;
    2002             :     }
    2003        1602 : }
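
The net effect is that run r always lands on tape r % maxTapes once all tapes exist. A tiny sketch of the mapping, with a hypothetical maxTapes of 4:

    #include <stdio.h>

    int
    main(void)
    {
        int         maxTapes = 4;   /* assumed for illustration */

        /* The first maxTapes runs get fresh tapes; later runs wrap around. */
        for (int run = 0; run < 10; run++)
            printf("run %d -> tape %d\n", run, run % maxTapes);
        return 0;
    }
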
    2004             : 
    2005             : /*
    2006             :  * Initialize the slab allocation arena, for the given number of slots.
    2007             :  */
    2008             : static void
    2009         258 : init_slab_allocator(Tuplesortstate *state, int numSlots)
    2010             : {
    2011         258 :     if (numSlots > 0)
    2012             :     {
    2013             :         char       *p;
    2014             :         int         i;
    2015             : 
    2016         246 :         state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
    2017         246 :         state->slabMemoryEnd = state->slabMemoryBegin +
    2018         246 :             numSlots * SLAB_SLOT_SIZE;
    2019         246 :         state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
    2020         246 :         USEMEM(state, numSlots * SLAB_SLOT_SIZE);
    2021             : 
    2022         246 :         p = state->slabMemoryBegin;
    2023         952 :         for (i = 0; i < numSlots - 1; i++)
    2024             :         {
    2025         706 :             ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
    2026         706 :             p += SLAB_SLOT_SIZE;
    2027             :         }
    2028         246 :         ((SlabSlot *) p)->nextfree = NULL;
    2029             :     }
    2030             :     else
    2031             :     {
    2032          12 :         state->slabMemoryBegin = state->slabMemoryEnd = NULL;
    2033          12 :         state->slabFreeHead = NULL;
    2034             :     }
    2035         258 :     state->slabAllocatorUsed = true;
    2036         258 : }
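
The slab arena is one allocation carved into fixed-size slots chained into a free list through their first bytes; allocation pops the list head and release pushes the slot back. A self-contained sketch of that pattern (generic names, not the real SlabSlot definition):

    #include <stdio.h>

    #define SLOT_SIZE 1024          /* assumed slot size, for illustration */

    typedef union Slot
    {
        union Slot *nextfree;       /* valid only while the slot is free */
        char        buffer[SLOT_SIZE];
    } Slot;

    static Slot *freehead;

    static void
    init_arena(Slot *arena, int n)
    {
        /* Chain every slot onto the free list, as the loop above does. */
        for (int i = 0; i < n - 1; i++)
            arena[i].nextfree = &arena[i + 1];
        arena[n - 1].nextfree = NULL;
        freehead = arena;
    }

    static void *
    slot_alloc(void)
    {
        Slot       *s = freehead;

        if (s)
            freehead = s->nextfree; /* pop the head of the free list */
        return s;
    }

    static void
    slot_release(void *p)
    {
        Slot       *s = p;

        s->nextfree = freehead;     /* push the slot back onto the list */
        freehead = s;
    }

    int
    main(void)
    {
        Slot        arena[4];
        void       *a;

        init_arena(arena, 4);
        a = slot_alloc();
        slot_release(a);
        printf("recycled: %s\n", slot_alloc() == a ? "yes" : "no");     /* yes */
        return 0;
    }
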
    2037             : 
    2038             : /*
    2039             :  * mergeruns -- merge all the completed initial runs.
    2040             :  *
    2041             :  * This implements the Balanced k-Way Merge Algorithm.  All input data has
    2042             :  * already been written to initial runs on tape (see dumptuples).
    2043             :  */
    2044             : static void
    2045         258 : mergeruns(Tuplesortstate *state)
    2046             : {
    2047             :     int         tapenum;
    2048             : 
    2049             :     Assert(state->status == TSS_BUILDRUNS);
    2050             :     Assert(state->memtupcount == 0);
    2051             : 
    2052         258 :     if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
    2053             :     {
    2054             :         /*
    2055             :          * If there are multiple runs to be merged, when we go to read back
    2056             :          * tuples from disk, abbreviated keys will not have been stored, and
    2057             :          * we don't care to regenerate them.  Disable abbreviation from this
    2058             :          * point on.
    2059             :          */
    2060          30 :         state->base.sortKeys->abbrev_converter = NULL;
    2061          30 :         state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
    2062             : 
    2063             :         /* Not strictly necessary, but be tidy */
    2064          30 :         state->base.sortKeys->abbrev_abort = NULL;
    2065          30 :         state->base.sortKeys->abbrev_full_comparator = NULL;
    2066             :     }
    2067             : 
    2068             :     /*
    2069             :      * Reset tuple memory.  We've freed all the tuples that we previously
    2070             :      * allocated.  We will use the slab allocator from now on.
    2071             :      */
    2072         258 :     MemoryContextResetOnly(state->base.tuplecontext);
    2073             : 
    2074             :     /*
    2075             :      * We no longer need a large memtuples array.  (We will allocate a smaller
    2076             :      * one for the heap later.)
    2077             :      */
    2078         258 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    2079         258 :     pfree(state->memtuples);
    2080         258 :     state->memtuples = NULL;
    2081             : 
    2082             :     /*
    2083             :      * Initialize the slab allocator.  We need one slab slot per input tape,
    2084             :      * for the tuples in the heap, plus one to hold the tuple last returned
    2085             :      * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
    2086             :      * however, we don't need to allocate anything.)
    2087             :      *
    2088             :      * In a multi-pass merge, we could shrink this allocation for the last
    2089             :      * merge pass, if it has fewer tapes than previous passes, but we don't
    2090             :      * bother.
    2091             :      *
    2092             :      * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
    2093             :      * to track memory usage of individual tuples.
    2094             :      */
    2095         258 :     if (state->base.tuples)
    2096         246 :         init_slab_allocator(state, state->nOutputTapes + 1);
    2097             :     else
    2098          12 :         init_slab_allocator(state, 0);
    2099             : 
    2100             :     /*
    2101             :      * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
    2102             :      * from each input tape.
    2103             :      *
    2104             :      * We could shrink this, too, between passes in a multi-pass merge, but we
    2105             :      * don't bother.  (The initial input tapes are still in outputTapes.  The
    2106             :      * number of input tapes will not increase between passes.)
    2107             :      */
    2108         258 :     state->memtupsize = state->nOutputTapes;
    2109         516 :     state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
    2110         258 :                                                         state->nOutputTapes * sizeof(SortTuple));
    2111         258 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    2112             : 
    2113             :     /*
    2114             :      * Use all the remaining memory we have available for tape buffers among
    2115             :      * all the input tapes.  At the beginning of each merge pass, we will
    2116             :      * divide this memory between the input and output tapes in the pass.
    2117             :      */
    2118         258 :     state->tape_buffer_mem = state->availMem;
    2119         258 :     USEMEM(state, state->tape_buffer_mem);
    2120             : #ifdef TRACE_SORT
    2121         258 :     if (trace_sort)
    2122           0 :         elog(LOG, "worker %d using %zu KB of memory for tape buffers",
    2123             :              state->worker, state->tape_buffer_mem / 1024);
    2124             : #endif
    2125             : 
    2126             :     for (;;)
    2127             :     {
    2128             :         /*
    2129             :          * On the first iteration, or if we have read all the runs from the
    2130             :          * input tapes in a multi-pass merge, it's time to start a new pass.
    2131             :          * Rewind all the output tapes, and make them inputs for the next
    2132             :          * pass.
    2133             :          */
    2134         396 :         if (state->nInputRuns == 0)
    2135             :         {
    2136             :             int64       input_buffer_size;
    2137             : 
    2138             :             /* Close the old, emptied, input tapes */
    2139         288 :             if (state->nInputTapes > 0)
    2140             :             {
    2141         210 :                 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2142         180 :                     LogicalTapeClose(state->inputTapes[tapenum]);
    2143          30 :                 pfree(state->inputTapes);
    2144             :             }
    2145             : 
    2146             :             /* Previous pass's outputs become next pass's inputs. */
    2147         288 :             state->inputTapes = state->outputTapes;
    2148         288 :             state->nInputTapes = state->nOutputTapes;
    2149         288 :             state->nInputRuns = state->nOutputRuns;
    2150             : 
    2151             :             /*
    2152             :              * Reset output tape variables.  The actual LogicalTapes will be
    2153             :              * created as needed, here we only allocate the array to hold
    2154             :              * them.
    2155             :              */
    2156         288 :             state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
    2157         288 :             state->nOutputTapes = 0;
    2158         288 :             state->nOutputRuns = 0;
    2159             : 
    2160             :             /*
    2161             :              * Redistribute the memory allocated for tape buffers, among the
    2162             :              * new input and output tapes.
    2163             :              */
    2164         288 :             input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
    2165             :                                                        state->nInputTapes,
    2166             :                                                        state->nInputRuns,
    2167             :                                                        state->maxTapes);
    2168             : 
    2169             : #ifdef TRACE_SORT
    2170         288 :             if (trace_sort)
    2171           0 :                 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
    2172             :                      state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
    2173             :                      pg_rusage_show(&state->ru_start));
    2174             : #endif
    2175             : 
    2176             :             /* Prepare the new input tapes for merge pass. */
    2177        1162 :             for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2178         874 :                 LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
    2179             : 
    2180             :             /*
    2181             :              * If there's just one run left on each input tape, then only one
    2182             :              * merge pass remains.  If we don't have to produce a materialized
    2183             :              * sorted tape, we can stop at this point and do the final merge
    2184             :              * on-the-fly.
    2185             :              */
    2186         288 :             if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
    2187         270 :                 && state->nInputRuns <= state->nInputTapes
    2188         240 :                 && !WORKER(state))
    2189             :             {
    2190             :                 /* Tell logtape.c we won't be writing anymore */
    2191         240 :                 LogicalTapeSetForgetFreeSpace(state->tapeset);
    2192             :                 /* Initialize for the final merge pass */
    2193         240 :                 beginmerge(state);
    2194         240 :                 state->status = TSS_FINALMERGE;
    2195         240 :                 return;
    2196             :             }
    2197             :         }
    2198             : 
    2199             :         /* Select an output tape */
    2200         156 :         selectnewtape(state);
    2201             : 
    2202             :         /* Merge one run from each input tape. */
    2203         156 :         mergeonerun(state);
    2204             : 
    2205             :         /*
    2206             :          * If the input tapes are empty and we produced only one output run,
    2207             :          * we're done.  The current output tape contains the final result.
    2208             :          */
    2209         156 :         if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
    2210          18 :             break;
    2211             :     }
    2212             : 
    2213             :     /*
    2214             :      * Done.  The result is on a single run on a single tape.
    2215             :      */
    2216          18 :     state->result_tape = state->outputTapes[0];
    2217          18 :     if (!WORKER(state))
    2218          18 :         LogicalTapeFreeze(state->result_tape, NULL);
    2219             :     else
    2220           0 :         worker_freeze_result_tape(state);
    2221          18 :     state->status = TSS_SORTEDONTAPE;
    2222             : 
    2223             :     /* Close all the now-empty input tapes, to release their read buffers. */
    2224         102 :     for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
    2225          84 :         LogicalTapeClose(state->inputTapes[tapenum]);
    2226             : }
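
To see how the pass count falls, here is a sketch of the run arithmetic under an assumed 100 initial runs and a 6-way merge; each pass turns every group of up to 6 runs into one:

    #include <stdio.h>

    int
    main(void)
    {
        int         runs = 100;     /* assumed number of initial runs */
        int         order = 6;      /* assumed merge order */
        int         pass = 0;

        while (runs > 1)
        {
            /* Each pass merges up to 'order' runs into a single run. */
            runs = (runs + order - 1) / order;
            printf("after pass %d: %d run(s)\n", ++pass, runs);
        }
        /* 100 -> 17 -> 3 -> 1: three passes (the last can merge on the fly) */
        return 0;
    }
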
    2227             : 
    2228             : /*
    2229             :  * Merge one run from each input tape.
    2230             :  */
    2231             : static void
    2232         156 : mergeonerun(Tuplesortstate *state)
    2233             : {
    2234             :     int         srcTapeIndex;
    2235             :     LogicalTape *srcTape;
    2236             : 
    2237             :     /*
    2238             :      * Start the merge by loading one tuple from each active source tape into
    2239             :      * the heap.
    2240             :      */
    2241         156 :     beginmerge(state);
    2242             : 
    2243             :     Assert(state->slabAllocatorUsed);
    2244             : 
    2245             :     /*
    2246             :      * Execute merge by repeatedly extracting lowest tuple in heap, writing it
    2247             :      * out, and replacing it with next tuple from same tape (if there is
    2248             :      * another one).
    2249             :      */
    2250      855588 :     while (state->memtupcount > 0)
    2251             :     {
    2252             :         SortTuple   stup;
    2253             : 
    2254             :         /* write the tuple to destTape */
    2255      855432 :         srcTapeIndex = state->memtuples[0].srctape;
    2256      855432 :         srcTape = state->inputTapes[srcTapeIndex];
    2257      855432 :         WRITETUP(state, state->destTape, &state->memtuples[0]);
    2258             : 
    2259             :         /* recycle the slot of the tuple we just wrote out, for the next read */
    2260      855432 :         if (state->memtuples[0].tuple)
    2261      735348 :             RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
    2262             : 
    2263             :         /*
    2264             :          * pull next tuple from the tape, and replace the written-out tuple in
    2265             :          * the heap with it.
    2266             :          */
    2267      855432 :         if (mergereadnext(state, srcTape, &stup))
    2268             :         {
    2269      854586 :             stup.srctape = srcTapeIndex;
    2270      854586 :             tuplesort_heap_replace_top(state, &stup);
    2271             :         }
    2272             :         else
    2273             :         {
    2274         846 :             tuplesort_heap_delete_top(state);
    2275         846 :             state->nInputRuns--;
    2276             :         }
    2277             :     }
    2278             : 
    2279             :     /*
    2280             :      * When the heap empties, we're done.  Write an end-of-run marker on the
    2281             :      * output tape.
    2282             :      */
    2283         156 :     markrunend(state->destTape);
    2284         156 : }
    2285             : 
    2286             : /*
    2287             :  * beginmerge - initialize for a merge pass
    2288             :  *
    2289             :  * Fill the merge heap with the first tuple from each input tape.
    2290             :  */
    2291             : static void
    2292         396 : beginmerge(Tuplesortstate *state)
    2293             : {
    2294             :     int         activeTapes;
    2295             :     int         srcTapeIndex;
    2296             : 
    2297             :     /* Heap should be empty here */
    2298             :     Assert(state->memtupcount == 0);
    2299             : 
    2300         396 :     activeTapes = Min(state->nInputTapes, state->nInputRuns);
    2301             : 
    2302        1852 :     for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
    2303             :     {
    2304             :         SortTuple   tup;
    2305             : 
    2306        1456 :         if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
    2307             :         {
    2308        1228 :             tup.srctape = srcTapeIndex;
    2309        1228 :             tuplesort_heap_insert(state, &tup);
    2310             :         }
    2311             :     }
    2312         396 : }
    2313             : 
    2314             : /*
    2315             :  * mergereadnext - read next tuple from one merge input tape
    2316             :  *
    2317             :  * Returns false on EOF.
    2318             :  */
    2319             : static bool
    2320     5412218 : mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
    2321             : {
    2322             :     unsigned int tuplen;
    2323             : 
    2324             :     /* read next tuple, if any */
    2325     5412218 :     if ((tuplen = getlen(srcTape, true)) == 0)
    2326        1408 :         return false;
    2327     5410810 :     READTUP(state, stup, srcTape, tuplen);
    2328             : 
    2329     5410810 :     return true;
    2330             : }
    2331             : 
    2332             : /*
    2333             :  * dumptuples - remove tuples from memtuples and write initial run to tape
    2334             :  *
    2335             :  * When alltuples = true, dump everything currently in memory.  (This case is
    2336             :  * only used at end of input data.)
    2337             :  */
    2338             : static void
    2339     1061418 : dumptuples(Tuplesortstate *state, bool alltuples)
    2340             : {
    2341             :     int         memtupwrite;
    2342             :     int         i;
    2343             : 
    2344             :     /*
    2345             :      * Nothing to do if we still fit in available memory and have array slots,
    2346             :      * unless this is the final call during initial run generation.
    2347             :      */
    2348     1061418 :     if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
    2349     1060506 :         !alltuples)
    2350     1059972 :         return;
    2351             : 
    2352             :     /*
    2353             :      * Final call might require no sorting, in rare cases where we just so
    2354             :      * happen to have previously LACKMEM()'d at the point where exactly all
    2355             :      * remaining tuples are loaded into memory, just before input was
    2356             :      * exhausted.  In general, short final runs are quite possible, but avoid
    2357             :      * creating a completely empty run.  In a worker, though, we must produce
    2358             :      * at least one tape, even if it's empty.
    2359             :      */
    2360        1446 :     if (state->memtupcount == 0 && state->currentRun > 0)
    2361           0 :         return;
    2362             : 
    2363             :     Assert(state->status == TSS_BUILDRUNS);
    2364             : 
    2365             :     /*
    2366             :      * It seems unlikely that this limit will ever be exceeded, but take no
    2367             :      * chances
    2368             :      */
    2369        1446 :     if (state->currentRun == INT_MAX)
    2370           0 :         ereport(ERROR,
    2371             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    2372             :                  errmsg("cannot have more than %d runs for an external sort",
    2373             :                         INT_MAX)));
    2374             : 
    2375        1446 :     if (state->currentRun > 0)
    2376         912 :         selectnewtape(state);
    2377             : 
    2378        1446 :     state->currentRun++;
    2379             : 
    2380             : #ifdef TRACE_SORT
    2381        1446 :     if (trace_sort)
    2382           0 :         elog(LOG, "worker %d starting quicksort of run %d: %s",
    2383             :              state->worker, state->currentRun,
    2384             :              pg_rusage_show(&state->ru_start));
    2385             : #endif
    2386             : 
    2387             :     /*
    2388             :      * Sort all tuples accumulated within the allowed amount of memory for
    2389             :      * this run using quicksort
    2390             :      */
    2391        1446 :     tuplesort_sort_memtuples(state);
    2392             : 
    2393             : #ifdef TRACE_SORT
    2394        1446 :     if (trace_sort)
    2395           0 :         elog(LOG, "worker %d finished quicksort of run %d: %s",
    2396             :              state->worker, state->currentRun,
    2397             :              pg_rusage_show(&state->ru_start));
    2398             : #endif
    2399             : 
    2400        1446 :     memtupwrite = state->memtupcount;
    2401     5052028 :     for (i = 0; i < memtupwrite; i++)
    2402             :     {
    2403     5050582 :         SortTuple  *stup = &state->memtuples[i];
    2404             : 
    2405     5050582 :         WRITETUP(state, state->destTape, stup);
    2406             :     }
    2407             : 
    2408        1446 :     state->memtupcount = 0;
    2409             : 
    2410             :     /*
    2411             :      * Reset tuple memory.  We've freed all of the tuples that we previously
    2412             :      * allocated.  It's important to avoid fragmentation when there is a stark
    2413             :      * change in the sizes of incoming tuples.  In bounded sorts,
    2414             :      * fragmentation due to AllocSetFree's bucketing by size class might be
    2415             :      * particularly bad if this step wasn't taken.
    2416             :      */
    2417        1446 :     MemoryContextReset(state->base.tuplecontext);
    2418             : 
    2419             :     /*
    2420             :      * Now update the memory accounting to subtract the memory used by the
    2421             :      * tuple.
    2422             :      */
    2423        1446 :     FREEMEM(state, state->tupleMem);
    2424        1446 :     state->tupleMem = 0;
    2425             : 
    2426        1446 :     markrunend(state->destTape);
    2427             : 
    2428             : #ifdef TRACE_SORT
    2429        1446 :     if (trace_sort)
    2430           0 :         elog(LOG, "worker %d finished writing run %d to tape %d: %s",
    2431             :              state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
    2432             :              pg_rusage_show(&state->ru_start));
    2433             : #endif
    2434             : }
    2435             : 
    2436             : /*
    2437             :  * tuplesort_rescan     - rewind and replay the scan
    2438             :  */
    2439             : void
    2440          46 : tuplesort_rescan(Tuplesortstate *state)
    2441             : {
    2442          46 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2443             : 
    2444             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2445             : 
    2446          46 :     switch (state->status)
    2447             :     {
    2448          40 :         case TSS_SORTEDINMEM:
    2449          40 :             state->current = 0;
    2450          40 :             state->eof_reached = false;
    2451          40 :             state->markpos_offset = 0;
    2452          40 :             state->markpos_eof = false;
    2453          40 :             break;
    2454           6 :         case TSS_SORTEDONTAPE:
    2455           6 :             LogicalTapeRewindForRead(state->result_tape, 0);
    2456           6 :             state->eof_reached = false;
    2457           6 :             state->markpos_block = 0L;
    2458           6 :             state->markpos_offset = 0;
    2459           6 :             state->markpos_eof = false;
    2460           6 :             break;
    2461           0 :         default:
    2462           0 :             elog(ERROR, "invalid tuplesort state");
    2463             :             break;
    2464             :     }
    2465             : 
    2466          46 :     MemoryContextSwitchTo(oldcontext);
    2467          46 : }
    2468             : 
    2469             : /*
    2470             :  * tuplesort_markpos    - saves current position in the merged sort file
    2471             :  */
    2472             : void
    2473      569360 : tuplesort_markpos(Tuplesortstate *state)
    2474             : {
    2475      569360 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2476             : 
    2477             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2478             : 
    2479      569360 :     switch (state->status)
    2480             :     {
    2481      560552 :         case TSS_SORTEDINMEM:
    2482      560552 :             state->markpos_offset = state->current;
    2483      560552 :             state->markpos_eof = state->eof_reached;
    2484      560552 :             break;
    2485        8808 :         case TSS_SORTEDONTAPE:
    2486        8808 :             LogicalTapeTell(state->result_tape,
    2487             :                             &state->markpos_block,
    2488             :                             &state->markpos_offset);
    2489        8808 :             state->markpos_eof = state->eof_reached;
    2490        8808 :             break;
    2491           0 :         default:
    2492           0 :             elog(ERROR, "invalid tuplesort state");
    2493             :             break;
    2494             :     }
    2495             : 
    2496      569360 :     MemoryContextSwitchTo(oldcontext);
    2497      569360 : }
    2498             : 
    2499             : /*
    2500             :  * tuplesort_restorepos - restores current position in merged sort file to
    2501             :  *                        last saved position
    2502             :  */
    2503             : void
    2504       30062 : tuplesort_restorepos(Tuplesortstate *state)
    2505             : {
    2506       30062 :     MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
    2507             : 
    2508             :     Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
    2509             : 
    2510       30062 :     switch (state->status)
    2511             :     {
    2512       23870 :         case TSS_SORTEDINMEM:
    2513       23870 :             state->current = state->markpos_offset;
    2514       23870 :             state->eof_reached = state->markpos_eof;
    2515       23870 :             break;
    2516        6192 :         case TSS_SORTEDONTAPE:
    2517        6192 :             LogicalTapeSeek(state->result_tape,
    2518             :                             state->markpos_block,
    2519             :                             state->markpos_offset);
    2520        6192 :             state->eof_reached = state->markpos_eof;
    2521        6192 :             break;
    2522           0 :         default:
    2523           0 :             elog(ERROR, "invalid tuplesort state");
    2524             :             break;
    2525             :     }
    2526             : 
    2527       30062 :     MemoryContextSwitchTo(oldcontext);
    2528       30062 : }
    2529             : 
    2530             : /*
    2531             :  * tuplesort_get_stats - extract summary statistics
    2532             :  *
    2533             :  * This can be called after tuplesort_performsort() finishes to obtain
    2534             :  * printable summary information about how the sort was performed.
    2535             :  */
    2536             : void
    2537         384 : tuplesort_get_stats(Tuplesortstate *state,
    2538             :                     TuplesortInstrumentation *stats)
    2539             : {
    2540             :     /*
    2541             :      * Note: it might seem we should provide both memory and disk usage for a
    2542             :      * disk-based sort.  However, the current code doesn't track memory space
    2543             :      * don't account for the pfree's the caller is expected to do), so we cannot
    2544             :      * don't account for pfree's the caller is expected to do), so we cannot
    2545             :      * rely on availMem in a disk sort.  This does not seem worth the overhead
    2546             :      * to fix.  Is it worth creating an API for the memory context code to
    2547             :      * tell us how much is actually used in sortcontext?
    2548             :      */
    2549         384 :     tuplesort_updatemax(state);
    2550             : 
    2551         384 :     if (state->isMaxSpaceDisk)
    2552           0 :         stats->spaceType = SORT_SPACE_TYPE_DISK;
    2553             :     else
    2554         384 :         stats->spaceType = SORT_SPACE_TYPE_MEMORY;
    2555         384 :     stats->spaceUsed = (state->maxSpace + 1023) / 1024;
    2556             : 
    2557         384 :     switch (state->maxSpaceStatus)
    2558             :     {
    2559         384 :         case TSS_SORTEDINMEM:
    2560         384 :             if (state->boundUsed)
    2561          42 :                 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
    2562             :             else
    2563         342 :                 stats->sortMethod = SORT_TYPE_QUICKSORT;
    2564         384 :             break;
    2565           0 :         case TSS_SORTEDONTAPE:
    2566           0 :             stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
    2567           0 :             break;
    2568           0 :         case TSS_FINALMERGE:
    2569           0 :             stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
    2570           0 :             break;
    2571           0 :         default:
    2572           0 :             stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
    2573           0 :             break;
    2574             :     }
    2575         384 : }
    2576             : 
    2577             : /*
    2578             :  * Convert TuplesortMethod to a string.
    2579             :  */
    2580             : const char *
    2581         282 : tuplesort_method_name(TuplesortMethod m)
    2582             : {
    2583         282 :     switch (m)
    2584             :     {
    2585           0 :         case SORT_TYPE_STILL_IN_PROGRESS:
    2586           0 :             return "still in progress";
    2587          42 :         case SORT_TYPE_TOP_N_HEAPSORT:
    2588          42 :             return "top-N heapsort";
    2589         240 :         case SORT_TYPE_QUICKSORT:
    2590         240 :             return "quicksort";
    2591           0 :         case SORT_TYPE_EXTERNAL_SORT:
    2592           0 :             return "external sort";
    2593           0 :         case SORT_TYPE_EXTERNAL_MERGE:
    2594           0 :             return "external merge";
    2595             :     }
    2596             : 
    2597           0 :     return "unknown";
    2598             : }
    2599             : 
    2600             : /*
    2601             :  * Convert TuplesortSpaceType to a string.
    2602             :  */
    2603             : const char *
    2604         246 : tuplesort_space_type_name(TuplesortSpaceType t)
    2605             : {
    2606             :     Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
    2607         246 :     return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
    2608             : }
    2609             : 
    2610             : 
    2611             : /*
    2612             :  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
    2613             :  */
    2614             : 
    2615             : /*
    2616             :  * Convert the existing unordered array of SortTuples to a bounded heap,
    2617             :  * discarding all but the smallest "state->bound" tuples.
    2618             :  *
    2619             :  * When working with a bounded heap, we want to keep the largest entry
    2620             :  * at the root (array entry zero), instead of the smallest as in the normal
    2621             :  * sort case.  This allows us to discard the largest entry cheaply.
    2622             :  * Therefore, we temporarily reverse the sort direction.
    2623             :  */
    2624             : static void
    2625         398 : make_bounded_heap(Tuplesortstate *state)
    2626             : {
    2627         398 :     int         tupcount = state->memtupcount;
    2628             :     int         i;
    2629             : 
    2630             :     Assert(state->status == TSS_INITIAL);
    2631             :     Assert(state->bounded);
    2632             :     Assert(tupcount >= state->bound);
    2633             :     Assert(SERIAL(state));
    2634             : 
    2635             :     /* Reverse sort direction so largest entry will be at root */
    2636         398 :     reversedirection(state);
    2637             : 
    2638         398 :     state->memtupcount = 0;      /* make the heap empty */
    2639       37888 :     for (i = 0; i < tupcount; i++)
    2640             :     {
    2641       37490 :         if (state->memtupcount < state->bound)
    2642             :         {
    2643             :             /* Insert next tuple into heap */
    2644             :             /* Must copy source tuple to avoid possible overwrite */
    2645       18546 :             SortTuple   stup = state->memtuples[i];
    2646             : 
    2647       18546 :             tuplesort_heap_insert(state, &stup);
    2648             :         }
    2649             :         else
    2650             :         {
    2651             :             /*
    2652             :              * The heap is full.  Replace the largest entry with the new
    2653             :              * tuple, or just discard it, if it's larger than anything already
    2654             :              * in the heap.
    2655             :              */
    2656       18944 :             if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
    2657             :             {
    2658        9766 :                 free_sort_tuple(state, &state->memtuples[i]);
    2659        9766 :                 CHECK_FOR_INTERRUPTS();
    2660             :             }
    2661             :             else
    2662        9178 :                 tuplesort_heap_replace_top(state, &state->memtuples[i]);
    2663             :         }
    2664             :     }
    2665             : 
    2666             :     Assert(state->memtupcount == state->bound);
    2667         398 :     state->status = TSS_BOUNDED;
    2668         398 : }
    2669             : 
    2670             : /*
    2671             :  * Convert the bounded heap to a properly-sorted array
    2672             :  */
    2673             : static void
    2674         398 : sort_bounded_heap(Tuplesortstate *state)
    2675             : {
    2676         398 :     int         tupcount = state->memtupcount;
    2677             : 
    2678             :     Assert(state->status == TSS_BOUNDED);
    2679             :     Assert(state->bounded);
    2680             :     Assert(tupcount == state->bound);
    2681             :     Assert(SERIAL(state));
    2682             : 
    2683             :     /*
    2684             :      * We can unheapify in place because each delete-top call will remove the
    2685             :      * largest entry, which we can promptly store in the newly freed slot at
    2686             :      * the end.  Once we're down to a single-entry heap, we're done.
    2687             :      */
    2688       18546 :     while (state->memtupcount > 1)
    2689             :     {
    2690       18148 :         SortTuple   stup = state->memtuples[0];
    2691             : 
    2692             :         /* this sifts-up the next-largest entry and decreases memtupcount */
    2693       18148 :         tuplesort_heap_delete_top(state);
    2694       18148 :         state->memtuples[state->memtupcount] = stup;
    2695             :     }
    2696         398 :     state->memtupcount = tupcount;
    2697             : 
    2698             :     /*
    2699             :      * Reverse sort direction back to the original state.  This is not
    2700             :      * actually necessary but seems like a good idea for tidiness.
    2701             :      */
    2702         398 :     reversedirection(state);
    2703             : 
    2704         398 :     state->status = TSS_SORTEDINMEM;
    2705         398 :     state->boundUsed = true;
    2706         398 : }
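
In miniature, the bounded-sort technique is: keep a max-heap of the K smallest values seen so far, replace the root whenever something smaller arrives, then unheapify in place. A self-contained sketch for plain ints (K and the input are assumptions for illustration, without the direction-reversal machinery):

    #include <stdio.h>

    /* Sift heap[0] down to restore the max-heap property over n entries. */
    static void
    sift_down(int *heap, int n)
    {
        int         val = heap[0];
        int         i = 0;

        for (;;)
        {
            int         j = 2 * i + 1;

            if (j >= n)
                break;
            if (j + 1 < n && heap[j + 1] > heap[j])
                j++;
            if (val >= heap[j])
                break;
            heap[i] = heap[j];
            i = j;
        }
        heap[i] = val;
    }

    int
    main(void)
    {
        int         input[] = {9, 4, 7, 1, 8, 2, 6};
        int         k = 3;
        int         heap[3];
        int         n = 0;

        for (int t = 0; t < 7; t++)
        {
            if (n < k)
            {
                /* filling phase: sift-up insert into the max-heap */
                int         j = n++;

                while (j > 0 && heap[(j - 1) / 2] < input[t])
                {
                    heap[j] = heap[(j - 1) / 2];
                    j = (j - 1) / 2;
                }
                heap[j] = input[t];
            }
            else if (input[t] < heap[0])
            {
                /* replace the largest of the current best k, re-heapify */
                heap[0] = input[t];
                sift_down(heap, k);
            }
        }

        /* Unheapify in place: move each root into the freed tail slot. */
        for (int m = k; m > 1; m--)
        {
            int         top = heap[0];

            heap[0] = heap[m - 1];
            sift_down(heap, m - 1);
            heap[m - 1] = top;
        }
        for (int t = 0; t < k; t++)
            printf("%d ", heap[t]);     /* -> 1 2 4 */
        printf("\n");
        return 0;
    }
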
    2707             : 
    2708             : /*
    2709             :  * Sort all memtuples using specialized qsort() routines.
    2710             :  *
    2711             :  * Quicksort is used for small in-memory sorts, and external sort runs.
    2712             :  */
    2713             : static void
    2714      199756 : tuplesort_sort_memtuples(Tuplesortstate *state)
    2715             : {
    2716             :     Assert(!LEADER(state));
    2717             : 
    2718      199756 :     if (state->memtupcount > 1)
    2719             :     {
    2720             :         /*
    2721             :          * Do we have the leading column's value or abbreviation in datum1,
    2722             :          * and is there a specialization for its comparator?
    2723             :          */
    2724       60896 :         if (state->base.haveDatum1 && state->base.sortKeys)
    2725             :         {
    2726       60862 :             if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
    2727             :             {
    2728        2922 :                 qsort_tuple_unsigned(state->memtuples,
    2729        2922 :                                      state->memtupcount,
    2730             :                                      state);
    2731        2906 :                 return;
    2732             :             }
    2733             : #if SIZEOF_DATUM >= 8
    2734       57940 :             else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
    2735             :             {
    2736        1042 :                 qsort_tuple_signed(state->memtuples,
    2737        1042 :                                    state->memtupcount,
    2738             :                                    state);
    2739        1042 :                 return;
    2740             :             }
    2741             : #endif
    2742       56898 :             else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
    2743             :             {
    2744       38092 :                 qsort_tuple_int32(state->memtuples,
    2745       38092 :                                   state->memtupcount,
    2746             :                                   state);
    2747       38032 :                 return;
    2748             :             }
    2749             :         }
    2750             : 
    2751             :         /* Can we use the single-key sort function? */
    2752       18840 :         if (state->base.onlyKey != NULL)
    2753             :         {
    2754        8048 :             qsort_ssup(state->memtuples, state->memtupcount,
    2755        8048 :                        state->base.onlyKey);
    2756             :         }
    2757             :         else
    2758             :         {
    2759       10792 :             qsort_tuple(state->memtuples,
    2760       10792 :                         state->memtupcount,
    2761             :                         state->base.comparetup,
    2762             :                         state);
    2763             :         }
    2764             :     }
    2765             : }
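
The pattern here is comparator-identity dispatch: compare the stored function pointer against known comparators and branch to a sort body specialized for that comparator. A generic sketch with hypothetical names (the real specializations are generated elsewhere):

    #include <stdio.h>
    #include <stdlib.h>

    typedef int (*cmp_fn) (const void *, const void *);

    static int
    cmp_int(const void *a, const void *b)
    {
        int         x = *(const int *) a;
        int         y = *(const int *) b;

        return (x > y) - (x < y);
    }

    /* Stand-in for a sort body compiled specially for this comparator. */
    static void
    sort_int_specialized(int *a, size_t n)
    {
        qsort(a, n, sizeof(int), cmp_int);
    }

    static void
    sort_dispatch(int *a, size_t n, cmp_fn cmp)
    {
        if (cmp == cmp_int)
            sort_int_specialized(a, n);     /* known comparator: fast path */
        else
            qsort(a, n, sizeof(int), cmp);  /* generic fallback */
    }

    int
    main(void)
    {
        int         v[] = {3, 1, 2};

        sort_dispatch(v, 3, cmp_int);
        printf("%d %d %d\n", v[0], v[1], v[2]);     /* -> 1 2 3 */
        return 0;
    }
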
    2766             : 
    2767             : /*
    2768             :  * Insert a new tuple into an empty or existing heap, maintaining the
    2769             :  * heap invariant.  Caller is responsible for ensuring there's room.
    2770             :  *
    2771             :  * Note: For some callers, tuple points to a memtuples[] entry above the
    2772             :  * end of the heap.  This is safe as long as it's not immediately adjacent
    2773             :  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
    2774             :  * is, it might get overwritten before being moved into the heap!
    2775             :  */
    2776             : static void
    2777       19774 : tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
    2778             : {
    2779             :     SortTuple  *memtuples;
    2780             :     int         j;
    2781             : 
    2782       19774 :     memtuples = state->memtuples;
    2783             :     Assert(state->memtupcount < state->memtupsize);
    2784             : 
    2785       19774 :     CHECK_FOR_INTERRUPTS();
    2786             : 
    2787             :     /*
    2788             :      * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
    2789             :      * using 1-based array indexes, not 0-based.
    2790             :      */
    2791       19774 :     j = state->memtupcount++;
    2792       57602 :     while (j > 0)
    2793             :     {
    2794       50856 :         int         i = (j - 1) >> 1;
    2795             : 
    2796       50856 :         if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
    2797       13028 :             break;
    2798       37828 :         memtuples[j] = memtuples[i];
    2799       37828 :         j = i;
    2800             :     }
    2801       19774 :     memtuples[j] = *tuple;
    2802       19774 : }
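
The same sift-up, reduced to a standalone int min-heap (0-based indexes; the parent of slot j is (j - 1) / 2), as a sketch:

    #include <stdio.h>

    /* Sift-up insert into an int min-heap; caller guarantees room. */
    static void
    heap_insert(int *heap, int *count, int val)
    {
        int         j = (*count)++;

        while (j > 0)
        {
            int         i = (j - 1) >> 1;   /* parent index */

            if (val >= heap[i])
                break;
            heap[j] = heap[i];              /* pull the parent down */
            j = i;
        }
        heap[j] = val;
    }

    int
    main(void)
    {
        int         heap[8];
        int         n = 0;
        int         input[] = {5, 3, 8, 1};

        for (int k = 0; k < 4; k++)
            heap_insert(heap, &n, input[k]);
        printf("min = %d\n", heap[0]);      /* -> 1 */
        return 0;
    }
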
    2803             : 
    2804             : /*
    2805             :  * Remove the tuple at state->memtuples[0] from the heap.  Decrement
    2806             :  * memtupcount, and sift up to maintain the heap invariant.
    2807             :  *
    2808             :  * The caller has already free'd the tuple the top node points to,
    2809             :  * if necessary.
    2810             :  */
    2811             : static void
    2812       19328 : tuplesort_heap_delete_top(Tuplesortstate *state)
    2813             : {
    2814       19328 :     SortTuple  *memtuples = state->memtuples;
    2815             :     SortTuple  *tuple;
    2816             : 
    2817       19328 :     if (--state->memtupcount <= 0)
    2818         276 :         return;
    2819             : 
    2820             :     /*
    2821             :      * Remove the last tuple in the heap, and re-insert it, by replacing the
    2822             :      * current top node with it.
    2823             :      */
    2824       19052 :     tuple = &memtuples[state->memtupcount];
    2825       19052 :     tuplesort_heap_replace_top(state, tuple);
    2826             : }
    2827             : 
    2828             : /*
    2829             :  * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
    2830             :  * maintain the heap invariant.
    2831             :  *
    2832             :  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
    2833             :  * Heapsort, steps H3-H8).
    2834             :  */
    2835             : static void
    2836     5940714 : tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
    2837             : {
    2838     5940714 :     SortTuple  *memtuples = state->memtuples;
    2839             :     unsigned int i,
    2840             :                 n;
    2841             : 
    2842             :     Assert(state->memtupcount >= 1);
    2843             : 
    2844     5940714 :     CHECK_FOR_INTERRUPTS();
    2845             : 
    2846             :     /*
    2847             :      * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
    2848             :      * This prevents overflow in the "2 * i + 1" calculation, since at the top
    2849             :      * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
    2850             :      */
    2851     5940714 :     n = state->memtupcount;
    2852     5940714 :     i = 0;                      /* i is where the "hole" is */
    2853             :     for (;;)
    2854     1679886 :     {
    2855     7620600 :         unsigned int j = 2 * i + 1;
    2856             : 
    2857     7620600 :         if (j >= n)
    2858     1063442 :             break;
    2859     8990398 :         if (j + 1 < n &&
    2860     2433240 :             COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
    2861      960578 :             j++;
    2862     6557158 :         if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
    2863     4877272 :             break;
    2864     1679886 :         memtuples[i] = memtuples[j];
    2865     1679886 :         i = j;
    2866             :     }
    2867     5940714 :     memtuples[i] = *tuple;
    2868     5940714 : }
    2869             : 
    2870             : /*
    2871             :  * Function to reverse the sort direction from its current state
    2872             :  *
    2873             :  * It is not safe to call this when performing hash tuplesorts
    2874             :  */
    2875             : static void
    2876         796 : reversedirection(Tuplesortstate *state)
    2877             : {
    2878         796 :     SortSupport sortKey = state->base.sortKeys;
    2879             :     int         nkey;
    2880             : 
    2881        1952 :     for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
    2882             :     {
    2883        1156 :         sortKey->ssup_reverse = !sortKey->ssup_reverse;
    2884        1156 :         sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
    2885             :     }
    2886         796 : }
    2887             : 
    2888             : 
    2889             : /*
    2890             :  * Tape interface routines
    2891             :  */
    2892             : 
    2893             : static unsigned int
    2894     5685224 : getlen(LogicalTape *tape, bool eofOK)
    2895             : {
    2896             :     unsigned int len;
    2897             : 
    2898     5685224 :     if (LogicalTapeRead(tape,
    2899             :                         &len, sizeof(len)) != sizeof(len))
    2900           0 :         elog(ERROR, "unexpected end of tape");
    2901     5685224 :     if (len == 0 && !eofOK)
    2902           0 :         elog(ERROR, "unexpected end of data");
    2903     5685224 :     return len;
    2904             : }
    2905             : 
    2906             : static void
    2907        1602 : markrunend(LogicalTape *tape)
    2908             : {
    2909        1602 :     unsigned int len = 0;
    2910             : 
    2911        1602 :     LogicalTapeWrite(tape, &len, sizeof(len));
    2912        1602 : }
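
On tape, a run is a stream of length-prefixed tuples terminated by a zero length word: getlen() reads the prefix and markrunend() writes the terminator. A sketch of the same framing over a plain memory buffer (put_record is a hypothetical helper, not the logtape API):

    #include <stdio.h>
    #include <string.h>

    /* Hypothetical helper: append one length-prefixed record at 'off'. */
    static size_t
    put_record(char *buf, size_t off, const void *data, unsigned int len)
    {
        memcpy(buf + off, &len, sizeof(len));
        if (len > 0)
            memcpy(buf + off + sizeof(len), data, len);
        return off + sizeof(len) + len;
    }

    int
    main(void)
    {
        char        tape[64];
        size_t      off = 0;

        off = put_record(tape, off, "abc", 3);
        off = put_record(tape, off, "de", 2);
        off = put_record(tape, off, NULL, 0);   /* end-of-run marker */

        /* Read back until the zero length word, as mergereadnext() does. */
        for (size_t pos = 0;;)
        {
            unsigned int len;

            memcpy(&len, tape + pos, sizeof(len));
            pos += sizeof(len);
            if (len == 0)
                break;                          /* end of run */
            printf("record of %u bytes\n", len);
            pos += len;
        }
        return 0;
    }
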
    2913             : 
    2914             : /*
    2915             :  * Get memory for tuple from within READTUP() routine.
    2916             :  *
    2917             :  * We use next free slot from the slab allocator, or palloc() if the tuple
    2918             :  * is too large for that.
    2919             :  */
    2920             : void *
    2921     5383546 : tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
    2922             : {
    2923             :     SlabSlot   *buf;
    2924             : 
    2925             :     /*
    2926             :      * We pre-allocate enough slots in the slab arena that we should never run
    2927             :      * out.
    2928             :      */
    2929             :     Assert(state->slabFreeHead);
    2930             : 
    2931     5383546 :     if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
    2932           0 :         return MemoryContextAlloc(state->base.sortcontext, tuplen);
    2933             :     else
    2934             :     {
    2935     5383546 :         buf = state->slabFreeHead;
    2936             :         /* Reuse this slot */
    2937     5383546 :         state->slabFreeHead = buf->nextfree;
    2938             : 
    2939     5383546 :         return buf;
    2940             :     }
    2941             : }
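
/*
 * A sketch of the matching release side for the slab free list above (a
 * simplified assumption, not this file's actual release path): a slot that
 * came from the arena is pushed back onto the singly linked list headed by
 * slabFreeHead; an oversized tuple that was allocated with
 * MemoryContextAlloc() would be pfree'd instead.
 */
static void
demo_release_slot(Tuplesortstate *state, SlabSlot *buf)
{
    buf->nextfree = state->slabFreeHead;
    state->slabFreeHead = buf;
}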
    2942             : 
    2943             : 
    2944             : /*
    2945             :  * Parallel sort routines
    2946             :  */
    2947             : 
    2948             : /*
    2949             :  * tuplesort_estimate_shared - estimate required shared memory allocation
    2950             :  *
    2951             :  * nWorkers is an estimate of the number of workers (it's the number that
    2952             :  * will be requested).
    2953             :  */
    2954             : Size
    2955         146 : tuplesort_estimate_shared(int nWorkers)
    2956             : {
    2957             :     Size        tapesSize;
    2958             : 
    2959             :     Assert(nWorkers > 0);
    2960             : 
    2961             :     /* Make sure that BufFile shared state is MAXALIGN'd */
    2962         146 :     tapesSize = mul_size(sizeof(TapeShare), nWorkers);
    2963         146 :     tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
    2964             : 
    2965         146 :     return tapesSize;
    2966             : }
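
/*
 * Worked example of the sizing arithmetic above, under illustrative
 * assumptions of 8-byte MAXALIGN and sizeof(TapeShare) == 8: a leader
 * planning on 4 workers gets
 *
 *     tuplesort_estimate_shared(4)
 *         == MAXALIGN(offsetof(Sharedsort, tapes) + 4 * 8)
 *
 * i.e. the Sharedsort header plus one TapeShare slot per worker, rounded
 * up so whatever follows in the segment stays maximally aligned.
 */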
    2967             : 
    2968             : /*
    2969             :  * tuplesort_initialize_shared - initialize shared tuplesort state
    2970             :  *
    2971             :  * Must be called from leader process before workers are launched, to
    2972             :  * establish state needed up-front for worker tuplesortstates.  nWorkers
    2973             :  * should match the argument passed to tuplesort_estimate_shared().
    2974             :  */
    2975             : void
    2976         210 : tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
    2977             : {
    2978             :     int         i;
    2979             : 
    2980             :     Assert(nWorkers > 0);
    2981             : 
    2982         210 :     SpinLockInit(&shared->mutex);
    2983         210 :     shared->currentWorker = 0;
    2984         210 :     shared->workersFinished = 0;
    2985         210 :     SharedFileSetInit(&shared->fileset, seg);
    2986         210 :     shared->nTapes = nWorkers;
    2987         638 :     for (i = 0; i < nWorkers; i++)
    2988             :     {
    2989         428 :         shared->tapes[i].firstblocknumber = 0L;
    2990             :     }
    2991         210 : }
    2992             : 
    2993             : /*
    2994             :  * tuplesort_attach_shared - attach to shared tuplesort state
    2995             :  *
    2996             :  * Must be called by all worker processes.
    2997             :  */
    2998             : void
    2999         212 : tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
    3000             : {
    3001             :     /* Attach to SharedFileSet */
    3002         212 :     SharedFileSetAttach(&shared->fileset, seg);
    3003         212 : }
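
/*
 * Sketch of the intended call sequence (illustrative; the real callers
 * live in the parallel index-build code, and shm_toc_allocate() here is
 * just one plausible way to carve the space out of a DSM segment):
 *
 * In the leader, before launching workers:
 *
 *     Size  size   = tuplesort_estimate_shared(nWorkers);
 *     void *shared = shm_toc_allocate(toc, size);
 *
 *     tuplesort_initialize_shared(shared, nWorkers, seg);
 *
 * In each worker, after attaching to the segment:
 *
 *     tuplesort_attach_shared(shared, seg);
 */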
    3004             : 
    3005             : /*
    3006             :  * worker_get_identifier - Assign and return ordinal identifier for worker
    3007             :  *
    3008             :  * The order in which these are assigned is not well defined, and should not
    3009             :  * matter; worker numbers across parallel sort participants need only be
    3010             :  * distinct and gapless.  logtape.c requires this.
    3011             :  *
    3012             :  * Note that the identifiers assigned from here have no relation to
    3013             :  * ParallelWorkerNumber, to avoid making any assumption about the
    3014             :  * caller's requirements.  However, we do follow the ParallelWorkerNumber
    3015             :  * convention of representing a non-worker with worker number -1.  This
    3016             :  * includes the leader, as well as serial Tuplesort processes.
    3017             :  */
    3018             : static int
    3019         420 : worker_get_identifier(Tuplesortstate *state)
    3020             : {
    3021         420 :     Sharedsort *shared = state->shared;
    3022             :     int         worker;
    3023             : 
    3024             :     Assert(WORKER(state));
    3025             : 
    3026         420 :     SpinLockAcquire(&shared->mutex);
    3027         420 :     worker = shared->currentWorker++;
    3028         420 :     SpinLockRelease(&shared->mutex);
    3029             : 
    3030         420 :     return worker;
    3031             : }
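
/*
 * The identifier handout above is a fetch-and-increment guarded by a
 * spinlock.  A standalone sketch of the same idea with C11 atomics (an
 * illustration only; this file uses the spinlock form):
 */
#include <stdatomic.h>

static atomic_int demo_next_worker;

static int
demo_get_identifier(void)
{
    /* yields 0, 1, 2, ...: distinct and gapless, order unspecified */
    return atomic_fetch_add(&demo_next_worker, 1);
}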
    3032             : 
    3033             : /*
    3034             :  * worker_freeze_result_tape - freeze worker's result tape for leader
    3035             :  *
    3036             :  * This is called by workers just after the result tape has been determined,
    3037             :  * instead of calling LogicalTapeFreeze() directly.  Workers require a few
    3038             :  * additional steps beyond the similar serial TSS_SORTEDONTAPE external
    3039             :  * sort case, and those steps also happen here: freeing now-unneeded
    3040             :  * resources, and reporting to the leader that the worker's run is
    3041             :  * available as input to its merge.
    3042             :  *
    3043             :  * There should only be one final output run for each worker, which consists
    3044             :  * of all tuples that were originally input into the worker.
    3045             :  */
    3046             : static void
    3047         420 : worker_freeze_result_tape(Tuplesortstate *state)
    3048             : {
    3049         420 :     Sharedsort *shared = state->shared;
    3050             :     TapeShare   output;
    3051             : 
    3052             :     Assert(WORKER(state));
    3053             :     Assert(state->result_tape != NULL);
    3054             :     Assert(state->memtupcount == 0);
    3055             : 
    3056             :     /*
    3057             :      * Free most remaining memory, in case caller is sensitive to our holding
    3058             :      * on to it.  memtuples may not be a tiny merge heap at this point.
    3059             :      */
    3060         420 :     pfree(state->memtuples);
    3061             :     /* Be tidy */
    3062         420 :     state->memtuples = NULL;
    3063         420 :     state->memtupsize = 0;
    3064             : 
    3065             :     /*
    3066             :  * The parallel worker's result tape metadata must be stored in shared
    3067             :  * memory, where the leader can find it.
    3068             :      */
    3069         420 :     LogicalTapeFreeze(state->result_tape, &output);
    3070             : 
    3071             :     /* Store properties of output tape, and update finished worker count */
    3072         420 :     SpinLockAcquire(&shared->mutex);
    3073         420 :     shared->tapes[state->worker] = output;
    3074         420 :     shared->workersFinished++;
    3075         420 :     SpinLockRelease(&shared->mutex);
    3076         420 : }
    3077             : 
    3078             : /*
    3079             :  * worker_nomergeruns - dump memtuples in worker, without merging
    3080             :  *
    3081             :  * This is called as an alternative to mergeruns() in a worker when no
    3082             :  * merging is required.
    3083             :  */
    3084             : static void
    3085         420 : worker_nomergeruns(Tuplesortstate *state)
    3086             : {
    3087             :     Assert(WORKER(state));
    3088             :     Assert(state->result_tape == NULL);
    3089             :     Assert(state->nOutputRuns == 1);
    3090             : 
    3091         420 :     state->result_tape = state->destTape;
    3092         420 :     worker_freeze_result_tape(state);
    3093         420 : }
    3094             : 
    3095             : /*
    3096             :  * leader_takeover_tapes - create tapeset for leader from worker tapes
    3097             :  *
    3098             :  * So far, the leader Tuplesortstate has performed no actual sorting.  By
    3099             :  * now, all sorting has occurred in workers, all of which must have already
    3100             :  * returned from tuplesort_performsort().
    3101             :  *
    3102             :  * When this returns, the leader process is left in a state that is
    3103             :  * virtually indistinguishable from having generated the runs itself, as a
    3104             :  * serial external sort would have.
    3105             :  */
    3106             : static void
    3107         144 : leader_takeover_tapes(Tuplesortstate *state)
    3108             : {
    3109         144 :     Sharedsort *shared = state->shared;
    3110         144 :     int         nParticipants = state->nParticipants;
    3111             :     int         workersFinished;
    3112             :     int         j;
    3113             : 
    3114             :     Assert(LEADER(state));
    3115             :     Assert(nParticipants >= 1);
    3116             : 
    3117         144 :     SpinLockAcquire(&shared->mutex);
    3118         144 :     workersFinished = shared->workersFinished;
    3119         144 :     SpinLockRelease(&shared->mutex);
    3120             : 
    3121         144 :     if (nParticipants != workersFinished)
    3122           0 :         elog(ERROR, "cannot take over tapes before all workers finish");
    3123             : 
    3124             :     /*
    3125             :      * Create the tapeset from worker tapes, including a leader-owned tape at
    3126             :      * the end.  Parallel workers are far more expensive than logical tapes,
    3127             :      * so the number of tapes allocated here should never be excessive.
    3128             :      */
    3129         144 :     inittapestate(state, nParticipants);
    3130         144 :     state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
    3131             : 
    3132             :     /*
    3133             :      * Set currentRun to reflect the number of runs we will merge (it's not
    3134             :      * used for anything, this is just pro forma)
    3135             :      */
    3136         144 :     state->currentRun = nParticipants;
    3137             : 
    3138             :     /*
    3139             :      * Initialize the state to look the same as after building the initial
    3140             :      * runs.
    3141             :      *
    3142             :      * There will always be exactly 1 run per worker, and exactly one input
    3143             :      * tape per run, because workers always output exactly 1 run, even when
    3144             :      * there were no input tuples for workers to sort.
    3145             :      */
    3146         144 :     state->inputTapes = NULL;
    3147         144 :     state->nInputTapes = 0;
    3148         144 :     state->nInputRuns = 0;
    3149             : 
    3150         144 :     state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
    3151         144 :     state->nOutputTapes = nParticipants;
    3152         144 :     state->nOutputRuns = nParticipants;
    3153             : 
    3154         436 :     for (j = 0; j < nParticipants; j++)
    3155             :     {
    3156         292 :         state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
    3157             :     }
    3158             : 
    3159         144 :     state->status = TSS_BUILDRUNS;
    3160         144 : }
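
/*
 * Worked example: with nParticipants == 3, the takeover above leaves the
 * leader with
 *
 *     inputTapes == NULL, nInputTapes == nInputRuns == 0,
 *     nOutputTapes == nOutputRuns == currentRun == 3,
 *
 * i.e. three frozen tapes holding one run each: exactly the TSS_BUILDRUNS
 * shape a serial sort would present after writing three initial runs, so
 * the ordinary merge code can proceed unmodified.
 */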
    3161             : 
    3162             : /*
    3163             :  * Convenience routine to free a tuple previously loaded into sort memory
    3164             :  */
    3165             : static void
    3166     3743794 : free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
    3167             : {
    3168     3743794 :     if (stup->tuple)
    3169             :     {
    3170     3583596 :         FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
    3171     3583596 :         pfree(stup->tuple);
    3172     3583596 :         stup->tuple = NULL;
    3173             :     }
    3174     3743794 : }
    3175             : 
    3176             : int
    3177           0 : ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
    3178             : {
    3179           0 :     if (x < y)
    3180           0 :         return -1;
    3181           0 :     else if (x > y)
    3182           0 :         return 1;
    3183             :     else
    3184           0 :         return 0;
    3185             : }
    3186             : 
    3187             : #if SIZEOF_DATUM >= 8
    3188             : int
    3189     1045694 : ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
    3190             : {
    3191     1045694 :     int64       xx = DatumGetInt64(x);
    3192     1045694 :     int64       yy = DatumGetInt64(y);
    3193             : 
    3194     1045694 :     if (xx < yy)
    3195      371156 :         return -1;
    3196      674538 :     else if (xx > yy)
    3197      347122 :         return 1;
    3198             :     else
    3199      327416 :         return 0;
    3200             : }
    3201             : #endif
    3202             : 
    3203             : int
    3204   156924778 : ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
    3205             : {
    3206   156924778 :     int32       xx = DatumGetInt32(x);
    3207   156924778 :     int32       yy = DatumGetInt32(y);
    3208             : 
    3209   156924778 :     if (xx < yy)
    3210    36851862 :         return -1;
    3211   120072916 :     else if (xx > yy)
    3212    35642168 :         return 1;
    3213             :     else
    3214    84430748 :         return 0;
    3215             : }
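
/*
 * Why the comparators above spell out the three-way test rather than
 * returning a subtraction: "return xx - yy;" overflows for full-range
 * inputs (e.g. the most negative int32 minus 1), which is undefined
 * behavior in C.  A branchless equivalent that is safe for all inputs
 * (a sketch, not what this file uses):
 */
static inline int
demo_cmp_int32(int32 xx, int32 yy)
{
    /* (xx > yy) and (xx < yy) are each 0 or 1, so this yields -1, 0, or 1 */
    return (xx > yy) - (xx < yy);
}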

Generated by: LCOV version 1.14