LCOV - code coverage report
Current view: top level - src/backend/utils/sort - tuplestore.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 348 442 78.7 %
Date: 2024-04-27 00:11:45 Functions: 27 27 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * tuplestore.c
       4             :  *    Generalized routines for temporary tuple storage.
       5             :  *
       6             :  * This module handles temporary storage of tuples for purposes such
       7             :  * as Materialize nodes, hashjoin batch files, etc.  It is essentially
       8             :  * a dumbed-down version of tuplesort.c; it does no sorting of tuples
       9             :  * but can only store and regurgitate a sequence of tuples.  However,
      10             :  * because no sort is required, it is allowed to start reading the sequence
      11             :  * before it has all been written.  This is particularly useful for cursors,
      12             :  * because it allows random access within the already-scanned portion of
      13             :  * a query without having to process the underlying scan to completion.
      14             :  * Also, it is possible to support multiple independent read pointers.
      15             :  *
      16             :  * A temporary file is used to handle the data if it exceeds the
      17             :  * space limit specified by the caller.
      18             :  *
      19             :  * The (approximate) amount of memory allowed to the tuplestore is specified
      20             :  * in kilobytes by the caller.  We absorb tuples and simply store them in an
      21             :  * in-memory array as long as we haven't exceeded maxKBytes.  If we do exceed
      22             :  * maxKBytes, we dump all the tuples into a temp file and then read from that
      23             :  * when needed.
      24             :  *
      25             :  * Upon creation, a tuplestore supports a single read pointer, numbered 0.
      26             :  * Additional read pointers can be created using tuplestore_alloc_read_pointer.
      27             :  * Mark/restore behavior is supported by copying read pointers.
      28             :  *
      29             :  * When the caller requests backward-scan capability, we write the temp file
      30             :  * in a format that allows either forward or backward scan.  Otherwise, only
      31             :  * forward scan is allowed.  A request for backward scan must be made before
      32             :  * putting any tuples into the tuplestore.  Rewind is normally allowed but
      33             :  * can be turned off via tuplestore_set_eflags; turning off rewind for all
      34             :  * read pointers enables truncation of the tuplestore at the oldest read point
      35             :  * for minimal memory usage.  (The caller must explicitly call tuplestore_trim
      36             :  * at appropriate times for truncation to actually happen.)
      37             :  *
      38             :  * Note: in TSS_WRITEFILE state, the temp file's seek position is the
      39             :  * current write position, and the write-position variables in the tuplestore
      40             :  * aren't kept up to date.  Similarly, in TSS_READFILE state the temp file's
      41             :  * seek position is the active read pointer's position, and that read pointer
      42             :  * isn't kept up to date.  We update the appropriate variables using BufFileTell()
      43             :  * before switching to the other state or activating a different read pointer.
      44             :  *
      45             :  *
      46             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      47             :  * Portions Copyright (c) 1994, Regents of the University of California
      48             :  *
      49             :  * IDENTIFICATION
      50             :  *    src/backend/utils/sort/tuplestore.c
      51             :  *
      52             :  *-------------------------------------------------------------------------
      53             :  */
      54             : 
      55             : #include "postgres.h"
      56             : 
      57             : #include <limits.h>
      58             : 
      59             : #include "access/htup_details.h"
      60             : #include "commands/tablespace.h"
      61             : #include "executor/executor.h"
      62             : #include "miscadmin.h"
      63             : #include "storage/buffile.h"
      64             : #include "utils/memutils.h"
      65             : #include "utils/resowner.h"
      66             : 
      67             : 
      68             : /*
      69             :  * Possible states of a Tuplestore object.  These denote the states that
      70             :  * persist between calls of Tuplestore routines.
      71             :  */
      72             : typedef enum
      73             : {
      74             :     TSS_INMEM,                  /* Tuples still fit in memory (held in memtuples[]) */
      75             :     TSS_WRITEFILE,              /* Writing to temp file; file seek position is the write point */
      76             :     TSS_READFILE,               /* Reading from temp file; seek position is the active read pointer's */
      77             : } TupStoreStatus;
      78             : 
      79             : /*
      80             :  * State for a single read pointer.  If we are in state INMEM then all the
      81             :  * read pointers' "current" fields denote the read positions.  In state
      82             :  * WRITEFILE, the file/offset fields denote the read positions.  In state
      83             :  * READFILE, inactive read pointers have valid file/offset, but the active
      84             :  * read pointer implicitly has position equal to the temp file's seek position.
      85             :  *
      86             :  * Special case: if eof_reached is true, then the pointer's read position is
      87             :  * implicitly equal to the write position, and current/file/offset aren't
      88             :  * maintained.  This way we need not update all the read pointers each time
      89             :  * we write.
      90             :  */
      91             : typedef struct
      92             : {
      93             :     int         eflags;         /* capability flags (EXEC_FLAG_xxx bits) of this pointer */
      94             :     bool        eof_reached;    /* read has reached EOF; position is then the write position */
      95             :     int         current;        /* next array index to read (meaningful in INMEM state) */
      96             :     int         file;           /* temp file# (meaningful in WRITEFILE/READFILE states) */
      97             :     off_t       offset;         /* byte offset in file (meaningful in WRITEFILE/READFILE states) */
      98             : } TSReadPointer;
      99             : 
     100             : /*
     101             :  * Private state of a Tuplestore operation.
     102             :  */
     103             : struct Tuplestorestate
     104             : {
     105             :     TupStoreStatus status;      /* enumerated value as shown above */
     106             :     int         eflags;         /* capability flags (OR of pointers' flags) */
     107             :     bool        backward;       /* store extra length words in file? */
     108             :     bool        interXact;      /* keep open through transactions? */
     109             :     bool        truncated;      /* tuplestore_trim has removed tuples? */
     110             :     int64       availMem;       /* remaining memory available, in bytes (negative = over budget) */
     111             :     int64       allowedMem;     /* total memory allowed, in bytes */
     112             :     int64       tuples;         /* number of tuples added since creation or last tuplestore_clear */
     113             :     BufFile    *myfile;         /* underlying file, or NULL if none */
     114             :     MemoryContext context;      /* memory context for holding tuples */
     115             :     ResourceOwner resowner;     /* resowner for holding temp files */
     116             : 
     117             :     /*
     118             :      * These function pointers decouple the routines that must know what kind
     119             :      * of tuple we are handling from the routines that don't need to know it.
     120             :      * They are set up by the tuplestore_begin_xxx routines.
     121             :      *
     122             :      * (Although tuplestore.c currently only supports heap tuples, I've copied
     123             :      * this part of tuplesort.c so that extension to other kinds of objects
     124             :      * will be easy if it's ever needed.)
     125             :      *
     126             :      * Function to copy a supplied input tuple into palloc'd space. (NB: we
     127             :      * assume that a single pfree() is enough to release the tuple later, so
     128             :      * the representation must be "flat" in one palloc chunk.) state->availMem
     129             :      * must be decreased by the amount of space used.
     130             :      */
     131             :     void       *(*copytup) (Tuplestorestate *state, void *tup);
     132             : 
     133             :     /*
     134             :      * Function to write a stored tuple onto tape.  The representation of the
     135             :      * tuple on tape need not be the same as it is in memory; requirements on
     136             :      * the tape representation are given below.  After writing the tuple,
     137             :      * pfree() it, and increase state->availMem by the amount of memory space
     138             :      * thereby released.
     139             :      */
     140             :     void        (*writetup) (Tuplestorestate *state, void *tup);
     141             : 
     142             :     /*
     143             :      * Function to read a stored tuple from tape back into memory. 'len' is
     144             :      * the already-read length of the stored tuple.  Create and return a
     145             :      * palloc'd copy, and decrease state->availMem by the amount of memory
     146             :      * space consumed.
     147             :      */
     148             :     void       *(*readtup) (Tuplestorestate *state, unsigned int len);
     149             : 
     150             :     /*
     151             :      * This array holds pointers to tuples in memory if we are in state INMEM.
     152             :      * In states WRITEFILE and READFILE it's not used.
     153             :      *
     154             :      * When memtupdeleted > 0, the first memtupdeleted pointers are already
     155             :      * released due to a tuplestore_trim() operation, but we haven't expended
     156             :      * the effort to slide the remaining pointers down.  These unused pointers
     157             :      * are set to NULL to catch any invalid accesses.  Note that memtupcount
     158             :      * includes the deleted pointers.
     159             :      */
     160             :     void      **memtuples;      /* array of pointers to palloc'd tuples */
     161             :     int         memtupdeleted;  /* the first N slots are currently unused */
     162             :     int         memtupcount;    /* number of slots in use, including the deleted ones */
     163             :     int         memtupsize;     /* allocated length of memtuples array */
     164             :     bool        growmemtuples;  /* memtuples' growth still underway? */
     165             : 
     166             :     /*
     167             :      * These variables are used to keep track of the current positions.
     168             :      *
     169             :      * In state WRITEFILE, the current file seek position is the write point;
     170             :      * in state READFILE, the write position is remembered in writepos_xxx.
     171             :      * (The write position is the same as EOF, but since BufFileSeek doesn't
     172             :      * currently implement SEEK_END, we have to remember it explicitly.)
     173             :      */
     174             :     TSReadPointer *readptrs;    /* array of read pointers */
     175             :     int         activeptr;      /* index of the active read pointer */
     176             :     int         readptrcount;   /* number of pointers currently valid */
     177             :     int         readptrsize;    /* allocated length of readptrs array */
     178             : 
     179             :     int         writepos_file;  /* file# (valid if READFILE state) */
     180             :     off_t       writepos_offset;    /* offset (valid if READFILE state) */
     181             : };
     182             : 
     183             : #define COPYTUP(state,tup)  ((*(state)->copytup) (state, tup))	/* dispatch through state->copytup */
     184             : #define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))	/* dispatch through state->writetup */
     185             : #define READTUP(state,len)  ((*(state)->readtup) (state, len))	/* dispatch through state->readtup */
     186             : #define LACKMEM(state)      ((state)->availMem < 0)	/* true once the memory budget is overdrawn */
     187             : #define USEMEM(state,amt)   ((state)->availMem -= (amt))	/* charge amt bytes against the budget */
     188             : #define FREEMEM(state,amt)  ((state)->availMem += (amt))	/* give amt bytes back to the budget */
     189             : 
     190             : /*--------------------
     191             :  *
     192             :  * NOTES about on-tape representation of tuples:
     193             :  *
     194             :  * We require the first "unsigned int" of a stored tuple to be the total size
     195             :  * on-tape of the tuple, including itself (so it is never zero).
     196             :  * The remainder of the stored tuple
     197             :  * may or may not match the in-memory representation of the tuple ---
     198             :  * any conversion needed is the job of the writetup and readtup routines.
     199             :  *
     200             :  * If state->backward is true, then the stored representation of
     201             :  * the tuple must be followed by another "unsigned int" that is a copy of the
     202             :  * length --- so the total tape space used is actually sizeof(unsigned int)
     203             :  * more than the stored length value.  This allows read-backwards.  When
     204             :  * state->backward is not set, the write/read routines may omit the extra
     205             :  * length word.
     206             :  *
     207             :  * writetup is expected to write both length words as well as the tuple
     208             :  * data.  When readtup is called, the tape is positioned just after the
     209             :  * front length word; readtup must read the tuple data and advance past
     210             :  * the back length word (if present).
     211             :  *
     212             :  * The write/read routines can make use of the tuple description data
     213             :  * stored in the Tuplestorestate record, if needed. They are also expected
     214             :  * to adjust state->availMem by the amount of memory space (not tape space!)
     215             :  * released or consumed.  There is no error return from either writetup
     216             :  * or readtup; they should ereport() on failure.
     217             :  *
     218             :  *
     219             :  * NOTES about memory consumption calculations:
     220             :  *
     221             :  * We count space allocated for tuples against the maxKBytes limit,
     222             :  * plus the space used by the variable-size array memtuples.
     223             :  * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
     224             :  * We don't worry about the size of the read pointer array, either.
     225             :  *
     226             :  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
     227             :  * rather than the originally-requested size.  This is important since
     228             :  * palloc can add substantial overhead.  It's not a complete answer since
     229             :  * we won't count any wasted space in palloc allocation blocks, but it's
     230             :  * a lot better than what we were doing before 7.3.
     231             :  *
     232             :  *--------------------
     233             :  */
     234             : 
     235             : 
     236             : static Tuplestorestate *tuplestore_begin_common(int eflags,
     237             :                                                 bool interXact,
     238             :                                                 int maxKBytes);	/* shared constructor */
     239             : static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
     240             : static void dumptuples(Tuplestorestate *state);
     241             : static unsigned int getlen(Tuplestorestate *state, bool eofOK);
     242             : static void *copytup_heap(Tuplestorestate *state, void *tup);	/* installed as state->copytup */
     243             : static void writetup_heap(Tuplestorestate *state, void *tup);	/* installed as state->writetup */
     244             : static void *readtup_heap(Tuplestorestate *state, unsigned int len);	/* installed as state->readtup */
     245             : 
     246             : 
     247             : /*
     248             :  *      tuplestore_begin_common - shared setup behind tuplestore_begin_xxx
     249             :  *
     250             :  * Initialize for a tuple store operation: build a state in TSS_INMEM status with read pointer 0 at position 0.
     251             :  */
     252             : static Tuplestorestate *
     253      246998 : tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
     254             : {
     255             :     Tuplestorestate *state;
     256             : 
     257      246998 :     state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
     258             : 
     259      246998 :     state->status = TSS_INMEM;
     260      246998 :     state->eflags = eflags;
     261      246998 :     state->interXact = interXact;
     262      246998 :     state->truncated = false;
     263      246998 :     state->allowedMem = maxKBytes * 1024L;	/* NOTE(review): product is computed in "long" before widening to int64; where long is 32 bits a large maxKBytes could overflow -- confirm callers cap it */
     264      246998 :     state->availMem = state->allowedMem;
     265      246998 :     state->myfile = NULL;
     266      246998 :     state->context = CurrentMemoryContext;	/* remember the caller's context at creation time */
     267      246998 :     state->resowner = CurrentResourceOwner;	/* likewise the caller's resource owner */
     268             : 
     269      246998 :     state->memtupdeleted = 0;
     270      246998 :     state->memtupcount = 0;
     271      246998 :     state->tuples = 0;
     272             : 
     273             :     /*
     274             :      * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
     275             :      * see comments in grow_memtuples().
     276             :      */
     277      246998 :     state->memtupsize = Max(16384 / sizeof(void *),
     278             :                             ALLOCSET_SEPARATE_THRESHOLD / sizeof(void *) + 1);
     279             : 
     280      246998 :     state->growmemtuples = true;
     281      246998 :     state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
     282             : 
     283      246998 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));	/* the pointer array itself counts against the budget */
     284             : 
     285      246998 :     state->activeptr = 0;
     286      246998 :     state->readptrcount = 1;
     287      246998 :     state->readptrsize = 8;      /* arbitrary */
     288      246998 :     state->readptrs = (TSReadPointer *)
     289      246998 :         palloc(state->readptrsize * sizeof(TSReadPointer));
     290             : 
     291      246998 :     state->readptrs[0].eflags = eflags;
     292      246998 :     state->readptrs[0].eof_reached = false;
     293      246998 :     state->readptrs[0].current = 0;	/* file/offset are not assigned here; INMEM state reads only "current" */
     294             : 
     295      246998 :     return state;
     296             : }
     297             : 
     298             : /*
     299             :  * tuplestore_begin_heap - create and return a tuplestore for heap tuples
     300             :  *
     301             :  * Create a new tuplestore; other types of tuple stores (other than
     302             :  * "heap" tuple stores, for heap tuples) are possible, but not presently
     303             :  * implemented.
     304             :  *
     305             :  * randomAccess: if true, both forward and backward accesses to the
     306             :  * tuple store are allowed (BACKWARD and REWIND flags); if false, only REWIND.
     307             :  *
     308             :  * interXact: if true, the files used for on-disk storage persist beyond the
     309             :  * end of the current transaction.  NOTE: It's the caller's responsibility to
     310             :  * create such a tuplestore in a memory context and resource owner that will
     311             :  * also survive transaction boundaries, and to ensure the tuplestore is closed
     312             :  * when it's no longer wanted.
     313             :  *
     314             :  * maxKBytes: how much data to store in memory, in kilobytes (any data beyond
     315             :  * this amount is paged to disk).  When in doubt, use work_mem.
     316             :  */
     317             : Tuplestorestate *
     318      246998 : tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
     319             : {
     320             :     Tuplestorestate *state;
     321             :     int         eflags;
     322             : 
     323             :     /*
     324             :      * This interpretation of the meaning of randomAccess is compatible with
     325             :      * the pre-8.3 behavior of tuplestores.
     326             :      */
     327      246998 :     eflags = randomAccess ?
     328      246998 :         (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
     329             :         (EXEC_FLAG_REWIND);
     330             : 
     331      246998 :     state = tuplestore_begin_common(eflags, interXact, maxKBytes);
     332             : 
     333      246998 :     state->copytup = copytup_heap;	/* install the heap-tuple hooks */
     334      246998 :     state->writetup = writetup_heap;
     335      246998 :     state->readtup = readtup_heap;
     336             : 
     337      246998 :     return state;
     338             : }
     339             : 
     340             : /*
     341             :  * tuplestore_set_eflags
     342             :  *
     343             :  * Set the capability flags for read pointer 0 at a finer grain than is
     344             :  * allowed by tuplestore_begin_xxx.  This must be called before inserting
     345             :  * any data into the tuplestore; otherwise an error is raised.
     346             :  *
     347             :  * eflags is a bitmask following the meanings used for executor node
     348             :  * startup flags (see executor.h).  tuplestore pays attention to these bits:
     349             :  *      EXEC_FLAG_REWIND        need rewind to start
     350             :  *      EXEC_FLAG_BACKWARD      need backward fetch
     351             :  * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
     352             :  * is set per "randomAccess" in the tuplestore_begin_xxx call.
     353             :  *
     354             :  * NOTE: setting BACKWARD without REWIND means the pointer can read backwards,
     355             :  * but not further than the truncation point (the furthest-back read pointer
     356             :  * position at the time of the last tuplestore_trim call).
     357             :  */
     358             : void
     359        8212 : tuplestore_set_eflags(Tuplestorestate *state, int eflags)
     360             : {
     361             :     int         i;
     362             : 
     363        8212 :     if (state->status != TSS_INMEM || state->memtupcount != 0)
     364           0 :         elog(ERROR, "too late to call tuplestore_set_eflags");
     365             : 
     366        8212 :     state->readptrs[0].eflags = eflags;
     367        8212 :     for (i = 1; i < state->readptrcount; i++)	/* fold in the other pointers' flags */
     368           0 :         eflags |= state->readptrs[i].eflags;
     369        8212 :     state->eflags = eflags;	/* store-wide flags remain the OR of all pointers' flags */
     370        8212 : }
     371             : 
     372             : /*
     373             :  * tuplestore_alloc_read_pointer - allocate another read pointer.
     374             :  *
     375             :  * Returns the pointer's index.
     376             :  *
     377             :  * The new pointer initially copies the position of read pointer 0.
     378             :  * It can have its own eflags, but if any data has been inserted into
     379             :  * the tuplestore, these eflags must not represent an increase in
     380             :  * requirements (an error is raised if they do).
     381             :  */
     382             : int
     383       12250 : tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
     384             : {
     385             :     /* Check for possible increase of requirements */
     386       12250 :     if (state->status != TSS_INMEM || state->memtupcount != 0)
     387             :     {
     388         614 :         if ((state->eflags | eflags) != state->eflags)
     389           0 :             elog(ERROR, "too late to require new tuplestore eflags");
     390             :     }
     391             : 
     392             :     /* Make room for another read pointer if needed, by doubling the array */
     393       12250 :     if (state->readptrcount >= state->readptrsize)
     394             :     {
     395          12 :         int         newcnt = state->readptrsize * 2;
     396             : 
     397          12 :         state->readptrs = (TSReadPointer *)
     398          12 :             repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
     399          12 :         state->readptrsize = newcnt;
     400             :     }
     401             : 
     402             :     /* And set it up: clone pointer 0 wholesale, then override its eflags */
     403       12250 :     state->readptrs[state->readptrcount] = state->readptrs[0];
     404       12250 :     state->readptrs[state->readptrcount].eflags = eflags;
     405             : 
     406       12250 :     state->eflags |= eflags;
     407             : 
     408       12250 :     return state->readptrcount++;	/* yields the new index, then bumps the count */
     409             : }
     410             : 
     411             : /*
     412             :  * tuplestore_clear
     413             :  *
     414             :  *  Delete all the contents of a tuplestore, and reset its read pointers
     415             :  *  to the start.  The temp file (if any) is closed; the memtuples array is kept.
     416             :  */
     417             : void
     418      122842 : tuplestore_clear(Tuplestorestate *state)
     419             : {
     420             :     int         i;
     421             :     TSReadPointer *readptr;
     422             : 
     423      122842 :     if (state->myfile)
     424           0 :         BufFileClose(state->myfile);
     425      122842 :     state->myfile = NULL;
     426      122842 :     if (state->memtuples)
     427             :     {
     428      257178 :         for (i = state->memtupdeleted; i < state->memtupcount; i++)	/* skip slots already released */
     429             :         {
     430      134336 :             FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));	/* credit the budget before freeing */
     431      134336 :             pfree(state->memtuples[i]);
     432             :         }
     433             :     }
     434      122842 :     state->status = TSS_INMEM;
     435      122842 :     state->truncated = false;
     436      122842 :     state->memtupdeleted = 0;
     437      122842 :     state->memtupcount = 0;
     438      122842 :     state->tuples = 0;
     439      122842 :     readptr = state->readptrs;
     440      246368 :     for (i = 0; i < state->readptrcount; readptr++, i++)
     441             :     {
     442      123526 :         readptr->eof_reached = false;
     443      123526 :         readptr->current = 0;	/* only the in-memory position is reset; file/offset are left as they were */
     444             :     }
     445      122842 : }
     446             : 
     447             : /*
     448             :  * tuplestore_end
     449             :  *
     450             :  *  Release resources and clean up.  The state itself is freed, so it must not be used afterwards.
     451             :  */
     452             : void
     453      209098 : tuplestore_end(Tuplestorestate *state)
     454             : {
     455             :     int         i;
     456             : 
     457      209098 :     if (state->myfile)
     458         104 :         BufFileClose(state->myfile);
     459      209098 :     if (state->memtuples)
     460             :     {
     461    10150194 :         for (i = state->memtupdeleted; i < state->memtupcount; i++)	/* skip slots already released */
     462     9941096 :             pfree(state->memtuples[i]);	/* no availMem bookkeeping: the state is freed below */
     463      209098 :         pfree(state->memtuples);
     464             :     }
     465      209098 :     pfree(state->readptrs);
     466      209098 :     pfree(state);
     467      209098 : }
     468             : 
     469             : /*
     470             :  * tuplestore_select_read_pointer - make the specified read pointer active; ptr must index a valid pointer
     471             :  */
     472             : void
     473     3813358 : tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
     474             : {
     475             :     TSReadPointer *readptr;
     476             :     TSReadPointer *oldptr;
     477             : 
     478             :     Assert(ptr >= 0 && ptr < state->readptrcount);
     479             : 
     480             :     /* No work if already active */
     481     3813358 :     if (ptr == state->activeptr)
     482      852080 :         return;
     483             : 
     484     2961278 :     readptr = &state->readptrs[ptr];
     485     2961278 :     oldptr = &state->readptrs[state->activeptr];
     486             : 
     487     2961278 :     switch (state->status)
     488             :     {
     489     2961278 :         case TSS_INMEM:
     490             :         case TSS_WRITEFILE:
     491             :             /* no work: in these states the file seek position does not stand for a read pointer */
     492     2961278 :             break;
     493           0 :         case TSS_READFILE:
     494             : 
     495             :             /*
     496             :              * First, save the current read position in the pointer about to
     497             :              * become inactive.  (Not needed at EOF, where the position is
     498             :              * implicitly the write position.)
     499           0 :             if (!oldptr->eof_reached)
     500           0 :                 BufFileTell(state->myfile,
     501             :                             &oldptr->file,
     502             :                             &oldptr->offset);
     503             : 
     504             :             /*
     505             :              * We have to make the temp file's seek position equal to the
     506             :              * logical position of the new read pointer.  In eof_reached
     507             :              * state, that's the EOF, which we have available from the saved
     508             :              * write position.
     509             :              */
     510           0 :             if (readptr->eof_reached)
     511             :             {
     512           0 :                 if (BufFileSeek(state->myfile,
     513             :                                 state->writepos_file,
     514             :                                 state->writepos_offset,
     515             :                                 SEEK_SET) != 0)
     516           0 :                     ereport(ERROR,
     517             :                             (errcode_for_file_access(),
     518             :                              errmsg("could not seek in tuplestore temporary file")));
     519             :             }
     520             :             else
     521             :             {
     522           0 :                 if (BufFileSeek(state->myfile,
     523             :                                 readptr->file,
     524             :                                 readptr->offset,
     525             :                                 SEEK_SET) != 0)
     526           0 :                     ereport(ERROR,
     527             :                             (errcode_for_file_access(),
     528             :                              errmsg("could not seek in tuplestore temporary file")));
     529             :             }
     530           0 :             break;
     531           0 :         default:
     532           0 :             elog(ERROR, "invalid tuplestore state");
     533             :             break;
     534             :     }
     535             : 
     536     2961278 :     state->activeptr = ptr;
     537             : }
     538             : 
     539             : /*
     540             :  * tuplestore_tuple_count
     541             :  *
     542             :  * Returns the number of tuples added since creation or the last
     543             :  * tuplestore_clear().  This is the state->tuples counter, which tuplestore_puttuple_common() increments once per stored tuple.
     544             :  */
     545             : int64
     546        6344 : tuplestore_tuple_count(Tuplestorestate *state)
     547             : {
     548        6344 :     return state->tuples;
     549             : }
     550             : 
     551             : /*
     552             :  * tuplestore_ateof
     553             :  *
     554             :  * Returns the active read pointer's eof_reached state (set, for example, when a forward fetch in tuplestore_gettuple() finds no further tuple).
     555             :  */
     556             : bool
     557     2435872 : tuplestore_ateof(Tuplestorestate *state)
     558             : {
     559     2435872 :     return state->readptrs[state->activeptr].eof_reached;
     560             : }
     561             : 
     562             : /*
     563             :  * Grow the memtuples[] array, if possible within our memory constraint.  We
     564             :  * must not exceed INT_MAX tuples in memory or the caller-provided memory
     565             :  * limit.  Return true if we were able to enlarge the array, false if not.
     566             :  *
     567             :  * Normally, at each increment we double the size of the array.  When doing
     568             :  * that would exceed a limit, we attempt one last, smaller increase (and then
     569             :  * clear the growmemtuples flag so we don't try any more).  That allows us to
     570             :  * use memory as fully as permitted; sticking to the pure doubling rule could
     571             :  * result in almost half going unused.  Because availMem moves around with
     572             :  * tuple addition/removal, we need some rule to prevent making repeated small
     573             :  * increases in memtupsize, which would just be useless thrashing.  The
     574             :  * growmemtuples flag accomplishes that and also prevents useless
     575             :  * recalculations in this function.  Every failure exit (the noalloc label) clears the flag as well.
     576             :  */
     577             : static bool
     578        1888 : grow_memtuples(Tuplestorestate *state)
     579             : {
     580             :     int         newmemtupsize;
     581        1888 :     int         memtupsize = state->memtupsize;
     582        1888 :     int64       memNowUsed = state->allowedMem - state->availMem; /* allowedMem minus what is still available */
     583             : 
     584             :     /* Forget it if we've already maxed out memtuples, per comment above */
     585        1888 :     if (!state->growmemtuples)
     586          42 :         return false;
     587             : 
     588             :     /* Select new value of memtupsize */
     589        1846 :     if (memNowUsed <= state->availMem)
     590             :     {
     591             :         /*
     592             :          * We've used no more than half of allowedMem; double our usage,
     593             :          * clamping at INT_MAX tuples.
     594             :          */
     595        1780 :         if (memtupsize < INT_MAX / 2)
     596        1780 :             newmemtupsize = memtupsize * 2;
     597             :         else
     598             :         {
     599           0 :             newmemtupsize = INT_MAX;
     600           0 :             state->growmemtuples = false;
     601             :         }
     602             :     }
     603             :     else
     604             :     {
     605             :         /*
     606             :          * This will be the last increment of memtupsize.  Abandon doubling
     607             :          * strategy and instead increase as much as we safely can.
     608             :          *
     609             :          * To stay within allowedMem, we can't increase memtupsize by more
     610             :          * than availMem / sizeof(void *) elements. In practice, we want to
     611             :          * increase it by considerably less, because we need to leave some
     612             :          * space for the tuples to which the new array slots will refer.  We
     613             :          * assume the new tuples will be about the same size as the tuples
     614             :          * we've already seen, and thus we can extrapolate from the space
     615             :          * consumption so far to estimate an appropriate new size for the
     616             :          * memtuples array.  The optimal value might be higher or lower than
     617             :          * this estimate, but it's hard to know that in advance.  We again
     618             :          * clamp at INT_MAX tuples.
     619             :          *
     620             :          * This calculation is safe against enlarging the array so much that
     621             :          * LACKMEM becomes true, because the memory currently used includes
     622             :          * the present array; thus, there would be enough allowedMem for the
     623             :          * new array elements even if no other memory were currently used.
     624             :          *
     625             :          * We do the arithmetic in floating point (C double, a.k.a. float8), because otherwise the product of
     626             :          * memtupsize and allowedMem could overflow.  Any inaccuracy in the
     627             :          * result should be insignificant; but even if we computed a
     628             :          * completely insane result, the checks below will prevent anything
     629             :          * really bad from happening.
     630             :          */
     631             :         double      grow_ratio;
     632             : 
     633          66 :         grow_ratio = (double) state->allowedMem / (double) memNowUsed;
     634          66 :         if (memtupsize * grow_ratio < INT_MAX)
     635          66 :             newmemtupsize = (int) (memtupsize * grow_ratio);
     636             :         else
     637           0 :             newmemtupsize = INT_MAX;
     638             : 
     639             :         /* We won't make any further enlargement attempts */
     640          66 :         state->growmemtuples = false;
     641             :     }
     642             : 
     643             :     /* Must enlarge array by at least one element, else report failure */
     644        1846 :     if (newmemtupsize <= memtupsize)
     645           0 :         goto noalloc;
     646             : 
     647             :     /*
     648             :      * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
     649             :      * to ensure our request won't be rejected.  Note that we can easily
     650             :      * exhaust address space before facing this outcome.  (This is presently
     651             :      * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
     652             :      * don't rely on that at this distance.)
     653             :      */
     654        1846 :     if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(void *))
     655             :     {
     656           0 :         newmemtupsize = (int) (MaxAllocHugeSize / sizeof(void *));
     657           0 :         state->growmemtuples = false;    /* can't grow any more */
     658             :     }
     659             : 
     660             :     /*
     661             :      * We need to be sure that we do not cause LACKMEM to become true, else
     662             :      * the space management algorithm will go nuts.  The code above should
     663             :      * never generate a dangerous request, but to be safe, check explicitly
     664             :      * that the array growth fits within availMem.  (We could still cause
     665             :      * LACKMEM if the memory chunk overhead associated with the memtuples
     666             :      * array were to increase.  That shouldn't happen because we chose the
     667             :      * initial array size large enough to ensure that palloc will be treating
     668             :      * both old and new arrays as separate chunks.  But we'll check LACKMEM
     669             :      * explicitly below just in case.)
     670             :      */
     671        1846 :     if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(void *)))
     672           0 :         goto noalloc;
     673             : 
     674             :     /* OK, do it: swap the old array's chunk size for the new one in the accounting (FREEMEM/USEMEM are macros defined elsewhere in this file) */
     675        1846 :     FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
     676        1846 :     state->memtupsize = newmemtupsize;
     677        1846 :     state->memtuples = (void **)
     678        1846 :         repalloc_huge(state->memtuples,
     679        1846 :                       state->memtupsize * sizeof(void *));
     680        1846 :     USEMEM(state, GetMemoryChunkSpace(state->memtuples));
     681        1846 :     if (LACKMEM(state))
     682           0 :         elog(ERROR, "unexpected out-of-memory situation in tuplestore");
     683        1846 :     return true;
     684             : 
     685           0 : noalloc:
     686             :     /* If for any reason we didn't realloc, shut off future attempts */
     687           0 :     state->growmemtuples = false;
     688           0 :     return false;
     689             : }
     690             : 
     691             : /*
     692             :  * Accept one tuple and append it to the tuplestore.
     693             :  *
     694             :  * Note that the input tuple is always copied; the caller need not save it.
     695             :  *
     696             :  * If the active read pointer is currently "at EOF", it remains so (the read
     697             :  * pointer implicitly advances along with the write pointer); otherwise the
     698             :  * read pointer is unchanged.  Non-active read pointers do not move, which
     699             :  * means they are certain to not be "at EOF" immediately after puttuple.
     700             :  * This curious-seeming behavior is for the convenience of nodeMaterial.c and
     701             :  * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
     702             :  * steps.
     703             :  *
     704             :  * tuplestore_puttupleslot() is a convenience routine to collect data from
     705             :  * a TupleTableSlot without an extra copy operation.  The copy is made while state->context is the current memory context.
     706             :  */
     707             : void
     708     1960336 : tuplestore_puttupleslot(Tuplestorestate *state,
     709             :                         TupleTableSlot *slot)
     710             : {
     711             :     MinimalTuple tuple;
     712     1960336 :     MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
     713             : 
     714             :     /*
     715             :      * Form a MinimalTuple in working memory, and account for the space its chunk occupies
     716             :      */
     717     1960336 :     tuple = ExecCopySlotMinimalTuple(slot);
     718     1960336 :     USEMEM(state, GetMemoryChunkSpace(tuple));
     719             : 
     720     1960336 :     tuplestore_puttuple_common(state, (void *) tuple);
     721             : 
     722     1960336 :     MemoryContextSwitchTo(oldcxt); /* restore the caller's memory context */
     723     1960336 : }
     724             : 
     725             : /*
     726             :  * "Standard" case to copy from a HeapTuple.  This is actually now somewhat
     727             :  * deprecated, but not worth getting rid of in view of the number of callers.
     728             :  */
     729             : void
     730     1607768 : tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
     731             : {
     732     1607768 :     MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
     733             : 
     734             :     /*
     735             :      * Copy the tuple.  (Must do this even in WRITEFILE case.  Note that
     736             :      * COPYTUP includes USEMEM, so we needn't do that here.)
     737             :      */
     738     1607768 :     tuple = COPYTUP(state, tuple); /* from here on 'tuple' is the value COPYTUP returned, not the caller's tuple */
     739             : 
     740     1607768 :     tuplestore_puttuple_common(state, (void *) tuple);
     741             : 
     742     1607768 :     MemoryContextSwitchTo(oldcxt); /* restore the caller's memory context */
     743     1607768 : }
     744             : 
     745             : /*
     746             :  * Similar to tuplestore_puttuple(), but work from values + nulls arrays.
     747             :  * This avoids an extra tuple-construction operation.  The MinimalTuple is built by heap_form_minimal_tuple() from tdesc/values/isnull while state->context is current.
     748             :  */
     749             : void
     750    16381224 : tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
     751             :                      const Datum *values, const bool *isnull)
     752             : {
     753             :     MinimalTuple tuple;
     754    16381224 :     MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
     755             : 
     756    16381224 :     tuple = heap_form_minimal_tuple(tdesc, values, isnull);
     757    16381224 :     USEMEM(state, GetMemoryChunkSpace(tuple)); /* account for the new tuple's chunk */
     758             : 
     759    16381224 :     tuplestore_puttuple_common(state, (void *) tuple);
     760             : 
     761    16381224 :     MemoryContextSwitchTo(oldcxt); /* restore the caller's memory context */
     762    16381224 : }
     763             : 
     764             : static void /* common tail of the tuplestore_put* routines; they pass in a tuple they have already copied or formed */
     765    19949328 : tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
     766             : {
     767             :     TSReadPointer *readptr;
     768             :     int         i;
     769             :     ResourceOwner oldowner;
     770             : 
     771    19949328 :     state->tuples++; /* counted before the switch, so in every storage mode */
     772             : 
     773    19949328 :     switch (state->status)
     774             :     {
     775    13801174 :         case TSS_INMEM:
     776             : 
     777             :             /*
     778             :              * Update read pointers as needed; see API spec above.  Inactive pointers that were at EOF are repositioned onto the slot the new tuple is about to occupy.
     779             :              */
     780    13801174 :             readptr = state->readptrs;
     781    29867718 :             for (i = 0; i < state->readptrcount; readptr++, i++)
     782             :             {
     783    16066544 :                 if (readptr->eof_reached && i != state->activeptr)
     784             :                 {
     785         454 :                     readptr->eof_reached = false;
     786         454 :                     readptr->current = state->memtupcount;
     787             :                 }
     788             :             }
     789             : 
     790             :             /*
     791             :              * Grow the array as needed.  Note that we try to grow the array
     792             :              * when there is still one free slot remaining --- if we fail,
     793             :              * there'll still be room to store the incoming tuple, and then
     794             :              * we'll switch to temp-file-based operation.
     795             :              */
     796    13801174 :             if (state->memtupcount >= state->memtupsize - 1)
     797             :             {
     798        1888 :                 (void) grow_memtuples(state);
     799             :                 Assert(state->memtupcount < state->memtupsize);
     800             :             }
     801             : 
     802             :             /* Stash the tuple in the in-memory array */
     803    13801174 :             state->memtuples[state->memtupcount++] = tuple;
     804             : 
     805             :             /*
     806             :              * Done if we still fit in available memory and have array slots.
     807             :              */
     808    13801174 :             if (state->memtupcount < state->memtupsize && !LACKMEM(state))
     809    13801066 :                 return;
     810             : 
     811             :             /*
     812             :              * Nope; time to switch to temp-file-based operation.  Make sure that
     813             :              * the temp file(s) are created in suitable temp tablespaces.
     814             :              */
     815         108 :             PrepareTempTablespaces();
     816             : 
     817             :             /* associate the file with the store's resource owner */
     818         108 :             oldowner = CurrentResourceOwner;
     819         108 :             CurrentResourceOwner = state->resowner;
     820             : 
     821         108 :             state->myfile = BufFileCreateTemp(state->interXact);
     822             : 
     823         108 :             CurrentResourceOwner = oldowner;
     824             : 
     825             :             /*
     826             :              * Freeze the decision about whether trailing length words will be
     827             :              * used.  We can't change this choice once data is in the file, even
     828             :              * though callers might drop the requirement.
     829             :              */
     830         108 :             state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
     831         108 :             state->status = TSS_WRITEFILE;
     832         108 :             dumptuples(state); /* defined elsewhere in this file; presumably writes the in-memory tuples out to myfile */
     833         108 :             break;
     834     6148154 :         case TSS_WRITEFILE:
     835             : 
     836             :             /*
     837             :              * Update read pointers as needed; see API spec above. Note:
     838             :              * BufFileTell is quite cheap, so not worth trying to avoid
     839             :              * multiple calls.
     840             :              */
     841     6148154 :             readptr = state->readptrs;
     842    12296308 :             for (i = 0; i < state->readptrcount; readptr++, i++)
     843             :             {
     844     6148154 :                 if (readptr->eof_reached && i != state->activeptr)
     845             :                 {
     846           0 :                     readptr->eof_reached = false;
     847           0 :                     BufFileTell(state->myfile,
     848             :                                 &readptr->file,
     849             :                                 &readptr->offset);
     850             :                 }
     851             :             }
     852             : 
     853     6148154 :             WRITETUP(state, tuple);
     854     6148154 :             break;
     855           0 :         case TSS_READFILE:
     856             : 
     857             :             /*
     858             :              * Switch from reading to writing: save the active pointer's file position (unless it is at EOF), then seek to the saved write position.
     859             :              */
     860           0 :             if (!state->readptrs[state->activeptr].eof_reached)
     861           0 :                 BufFileTell(state->myfile,
     862           0 :                             &state->readptrs[state->activeptr].file,
     863           0 :                             &state->readptrs[state->activeptr].offset);
     864           0 :             if (BufFileSeek(state->myfile,
     865             :                             state->writepos_file, state->writepos_offset,
     866             :                             SEEK_SET) != 0)
     867           0 :                 ereport(ERROR,
     868             :                         (errcode_for_file_access(),
     869             :                          errmsg("could not seek in tuplestore temporary file")));
     870           0 :             state->status = TSS_WRITEFILE;
     871             : 
     872             :             /*
     873             :              * Update read pointers as needed; see API spec above.  Inactive pointers that were at EOF take the saved write position, where the new tuple goes.
     874             :              */
     875           0 :             readptr = state->readptrs;
     876           0 :             for (i = 0; i < state->readptrcount; readptr++, i++)
     877             :             {
     878           0 :                 if (readptr->eof_reached && i != state->activeptr)
     879             :                 {
     880           0 :                     readptr->eof_reached = false;
     881           0 :                     readptr->file = state->writepos_file;
     882           0 :                     readptr->offset = state->writepos_offset;
     883             :                 }
     884             :             }
     885             : 
     886           0 :             WRITETUP(state, tuple);
     887           0 :             break;
     888           0 :         default:
     889           0 :             elog(ERROR, "invalid tuplestore state");
     890             :             break;
     891             :     }
     892             : }
     893             : 
     894             : /*
     895             :  * Fetch the next tuple in either forward or back direction.
     896             :  * Returns NULL if no more tuples.  If should_free is set, the
     897             :  * caller must pfree the returned tuple when done with it; it is set false for in-memory tuples and true for tuples read back from the temp file.
     898             :  *
     899             :  * Backward scan is only allowed if randomAccess was set true or
     900             :  * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags(); here that is only checked by an Assert on the active read pointer's eflags.
     901             :  */
     902             : static void *
     903    20437344 : tuplestore_gettuple(Tuplestorestate *state, bool forward,
     904             :                     bool *should_free)
     905             : {
     906    20437344 :     TSReadPointer *readptr = &state->readptrs[state->activeptr];
     907             :     unsigned int tuplen;
     908             :     void       *tup;
     909             : 
     910             :     Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
     911             : 
     912    20437344 :     switch (state->status)
     913             :     {
     914    14263448 :         case TSS_INMEM:
     915    14263448 :             *should_free = false;
     916    14263448 :             if (forward)
     917             :             {
     918    14088196 :                 if (readptr->eof_reached)
     919         180 :                     return NULL;
     920    14088016 :                 if (readptr->current < state->memtupcount)
     921             :                 {
     922             :                     /* We have another tuple, so return it */
     923    13737684 :                     return state->memtuples[readptr->current++];
     924             :                 }
     925      350332 :                 readptr->eof_reached = true;
     926      350332 :                 return NULL;
     927             :             }
     928             :             else
     929             :             {
     930             :                 /*
     931             :                  * If we are at EOF, return the last tuple; otherwise
     932             :                  * return the tuple before the one most recently returned.
     933             :                  */
     934      175252 :                 if (readptr->eof_reached)
     935             :                 {
     936        2596 :                     readptr->current = state->memtupcount;
     937        2596 :                     readptr->eof_reached = false;
     938             :                 }
     939             :                 else
     940             :                 {
     941      172656 :                     if (readptr->current <= state->memtupdeleted)
     942             :                     {
     943             :                         Assert(!state->truncated);
     944           0 :                         return NULL;
     945             :                     }
     946      172656 :                     readptr->current--; /* last returned tuple */
     947             :                 }
     948      175252 :                 if (readptr->current <= state->memtupdeleted)
     949             :                 {
     950             :                     Assert(!state->truncated);
     951          30 :                     return NULL;
     952             :                 }
     953      175222 :                 return state->memtuples[readptr->current - 1]; /* current stays one past the tuple handed back */
     954             :             }
     955             :             break; /* not reached: both branches above return */
     956             : 
     957         106 :         case TSS_WRITEFILE:
     958             :             /* Skip state change if we'll just return NULL (note: *should_free is not assigned on this path) */
     959         106 :             if (readptr->eof_reached && forward)
     960           0 :                 return NULL;
     961             : 
     962             :             /*
     963             :              * Switch from writing to reading: save the write position, then (unless at EOF) seek to the read pointer's position.
     964             :              */
     965         106 :             BufFileTell(state->myfile,
     966             :                         &state->writepos_file, &state->writepos_offset);
     967         106 :             if (!readptr->eof_reached)
     968         106 :                 if (BufFileSeek(state->myfile,
     969             :                                 readptr->file, readptr->offset,
     970             :                                 SEEK_SET) != 0)
     971           0 :                     ereport(ERROR,
     972             :                             (errcode_for_file_access(),
     973             :                              errmsg("could not seek in tuplestore temporary file")));
     974         106 :             state->status = TSS_READFILE;
     975             :             /* FALLTHROUGH */
     976             : 
     977     6173896 :         case TSS_READFILE:
     978     6173896 :             *should_free = true;
     979     6173896 :             if (forward)
     980             :             {
     981     6173884 :                 if ((tuplen = getlen(state, true)) != 0) /* a zero length is treated as end of data */
     982             :                 {
     983     6173790 :                     tup = READTUP(state, tuplen);
     984     6173790 :                     return tup;
     985             :                 }
     986             :                 else
     987             :                 {
     988          94 :                     readptr->eof_reached = true;
     989          94 :                     return NULL;
     990             :                 }
     991             :             }
     992             : 
     993             :             /*
     994             :              * Backward.
     995             :              *
     996             :              * If we are at EOF, return the last tuple; otherwise return
     997             :              * the tuple before the one most recently returned.
     998             :              *
     999             :              * Back up to fetch previously-returned tuple's ending length
    1000             :              * word. If seek fails, assume we are at start of file.
    1001             :              */
    1002          12 :             if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
    1003             :                             SEEK_CUR) != 0)
    1004             :             {
    1005             :                 /* even a failed backwards fetch gets you out of eof state */
    1006           0 :                 readptr->eof_reached = false;
    1007             :                 Assert(!state->truncated);
    1008           0 :                 return NULL;
    1009             :             }
    1010          12 :             tuplen = getlen(state, false);
    1011             : 
    1012          12 :             if (readptr->eof_reached)
    1013             :             {
    1014           6 :                 readptr->eof_reached = false;
    1015             :                 /* At EOF nothing is skipped: tuplen already belongs to the last tuple, which is the one to return */
    1016             :             }
    1017             :             else
    1018             :             {
    1019             :                 /*
    1020             :                  * Back up to get ending length word of tuple before it.
    1021             :                  */
    1022           6 :                 if (BufFileSeek(state->myfile, 0,
    1023           6 :                                 -(long) (tuplen + 2 * sizeof(unsigned int)),
    1024             :                                 SEEK_CUR) != 0)
    1025             :                 {
    1026             :                     /*
    1027             :                      * If that fails, presumably the prev tuple is the first
    1028             :                      * in the file.  Back up so that it becomes next to read
    1029             :                      * in forward direction (not obviously right, but that is
    1030             :                      * what in-memory case does).
    1031             :                      */
    1032           0 :                     if (BufFileSeek(state->myfile, 0,
    1033           0 :                                     -(long) (tuplen + sizeof(unsigned int)),
    1034             :                                     SEEK_CUR) != 0)
    1035           0 :                         ereport(ERROR,
    1036             :                                 (errcode_for_file_access(),
    1037             :                                  errmsg("could not seek in tuplestore temporary file")));
    1038             :                     Assert(!state->truncated);
    1039           0 :                     return NULL;
    1040             :                 }
    1041           6 :                 tuplen = getlen(state, false);
    1042             :             }
    1043             : 
    1044             :             /*
    1045             :              * Now we have the length of the prior tuple, back up and read it.
    1046             :              * Note: READTUP expects we are positioned after the initial
    1047             :              * length word of the tuple, so back up to that point.
    1048             :              */
    1049          12 :             if (BufFileSeek(state->myfile, 0,
    1050          12 :                             -(long) tuplen,
    1051             :                             SEEK_CUR) != 0)
    1052           0 :                 ereport(ERROR,
    1053             :                         (errcode_for_file_access(),
    1054             :                          errmsg("could not seek in tuplestore temporary file")));
    1055          12 :             tup = READTUP(state, tuplen);
    1056          12 :             return tup;
    1057             : 
    1058           0 :         default:
    1059           0 :             elog(ERROR, "invalid tuplestore state");
    1060             :             return NULL;        /* keep compiler quiet */
    1061             :     }
    1062             : }
    1063             : 
    1064             : /*
    1065             :  * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
    1066             :  *
    1067             :  * If successful, put tuple in slot and return true; else, clear the slot
    1068             :  * and return false.  "forward" selects the fetch direction and is passed straight to tuplestore_gettuple().
    1069             :  *
    1070             :  * If copy is true, the slot receives a copied tuple (allocated in current
    1071             :  * memory context) that will stay valid regardless of future manipulations of
    1072             :  * the tuplestore's state.  If copy is false, the slot may just receive a
    1073             :  * pointer to a tuple held within the tuplestore.  The latter is more
    1074             :  * efficient but the slot contents may be corrupted if additional writes to
    1075             :  * the tuplestore occur.  (If using tuplestore_trim, see comments therein.)
    1076             :  */
    1077             : bool
    1078    20265080 : tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
    1079             :                         bool copy, TupleTableSlot *slot)
    1080             : {
    1081             :     MinimalTuple tuple;
    1082             :     bool        should_free;
    1083             : 
    1084    20265080 :     tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
    1085             : 
    1086    20265080 :     if (tuple)
    1087             :     {
    1088    19916992 :         if (copy && !should_free) /* a copy was requested but gettuple reported a tuple the caller need not free */
    1089             :         {
    1090     1752320 :             tuple = heap_copy_minimal_tuple(tuple);
    1091     1752320 :             should_free = true; /* the fresh copy is passed on as freeable */
    1092             :         }
    1093    19916992 :         ExecStoreMinimalTuple(tuple, slot, should_free);
    1094    19916992 :         return true;
    1095             :     }
    1096             :     else
    1097             :     {
    1098      348088 :         ExecClearTuple(slot);
    1099      348088 :         return false;
    1100             :     }
    1101             : }
    1102             : 
    1103             : /*
    1104             :  * tuplestore_advance - exported function to adjust position without fetching
    1105             :  *
    1106             :  * We could optimize this case to avoid palloc/pfree overhead, but for the
    1107             :  * moment it doesn't seem worthwhile.
                        *
                        * The move is done by calling tuplestore_gettuple in the requested
                        * direction and discarding the result.  Returns true if a tuple was
                        * obtained, false if tuplestore_gettuple returned NULL.
    1108             :  */
    1109             : bool
    1110      172252 : tuplestore_advance(Tuplestorestate *state, bool forward)
    1111             : {
    1112             :     void       *tuple;
    1113             :     bool        should_free;
    1114             : 
    1115      172252 :     tuple = tuplestore_gettuple(state, forward, &should_free);
    1116             : 
    1117      172252 :     if (tuple)
    1118             :     {
                               /* the tuple is not wanted; release it if it is ours to free */
    1119      169704 :         if (should_free)
    1120           0 :             pfree(tuple);
    1121      169704 :         return true;
    1122             :     }
    1123             :     else
    1124             :     {
    1125        2548 :         return false;
    1126             :     }
    1127             : }
    1128             : 
    1129             : /*
    1130             :  * Advance over N tuples in either forward or back direction,
    1131             :  * without returning any data.  N<=0 is a no-op.
    1132             :  * Returns true if successful, false if ran out of tuples.
                        *
                        * The active read pointer is the one that moves.  In the TSS_INMEM
                        * state the move is pure arithmetic on readptr->current; in any other
                        * state the tuples are fetched one at a time with tuplestore_gettuple
                        * and thrown away.
                        *
                        * When false is returned in the in-memory case, the pointer is left at
                        * the end it ran into: memtupcount with eof_reached set (forward), or
                        * memtupdeleted (backward).
    1133             :  */
    1134             : bool
    1135     1281186 : tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
    1136             : {
    1137     1281186 :     TSReadPointer *readptr = &state->readptrs[state->activeptr];
    1138             : 
    1139             :     Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
    1140             : 
    1141     1281186 :     if (ntuples <= 0)
    1142          18 :         return true;
    1143             : 
    1144     1281168 :     switch (state->status)
    1145             :     {
    1146     1281162 :         case TSS_INMEM:
    1147     1281162 :             if (forward)
    1148             :             {
    1149     1278454 :                 if (readptr->eof_reached)
    1150           0 :                     return false;
                                       /* enough tuples remain ahead of "current"? */
    1151     1278454 :                 if (state->memtupcount - readptr->current >= ntuples)
    1152             :                 {
    1153     1278364 :                     readptr->current += ntuples;
    1154     1278364 :                     return true;
    1155             :                 }
                                       /* not enough: park the pointer at the end and flag EOF */
    1156          90 :                 readptr->current = state->memtupcount;
    1157          90 :                 readptr->eof_reached = true;
    1158          90 :                 return false;
    1159             :             }
    1160             :             else
    1161             :             {
                                       /*
                                        * Leaving the EOF state repositions the pointer at
                                        * memtupcount and uses up one of the requested steps.
                                        */
    1162        2708 :                 if (readptr->eof_reached)
    1163             :                 {
    1164           0 :                     readptr->current = state->memtupcount;
    1165           0 :                     readptr->eof_reached = false;
    1166           0 :                     ntuples--;
    1167             :                 }
                                       /* strictly more than ntuples must lie behind "current" */
    1168        2708 :                 if (readptr->current - state->memtupdeleted > ntuples)
    1169             :                 {
    1170        2708 :                     readptr->current -= ntuples;
    1171        2708 :                     return true;
    1172             :                 }
    1173             :                 Assert(!state->truncated);
    1174           0 :                 readptr->current = state->memtupdeleted;
    1175           0 :                 return false;
    1176             :             }
    1177             :             break;              /* not reached: both branches above return */
    1178             : 
    1179           6 :         default:
    1180             :             /* We don't currently try hard to optimize other cases */
    1181          18 :             while (ntuples-- > 0)
    1182             :             {
    1183             :                 void       *tuple;
    1184             :                 bool        should_free;
    1185             : 
    1186          12 :                 tuple = tuplestore_gettuple(state, forward, &should_free);
    1187             : 
    1188          12 :                 if (tuple == NULL)
    1189           0 :                     return false;
    1190          12 :                 if (should_free)
    1191          12 :                     pfree(tuple);
                                       /* the skip count can be large, so stay cancellable */
    1192          12 :                 CHECK_FOR_INTERRUPTS();
    1193             :             }
    1194           6 :             return true;
    1195             :     }
    1196             : }
    1197             : 
    1198             : /*
    1199             :  * dumptuples - remove tuples from memory and write to tape
    1200             :  *
    1201             :  * As a side effect, we must convert each read pointer's position from
    1202             :  * "current" to file/offset format.  But eof_reached pointers don't
    1203             :  * need to change state.
                        *
                        * The loop index runs from memtupdeleted up to and including
                        * memtupcount.  On every iteration the read pointers are examined
                        * before the exit test, so a pointer whose "current" equals memtupcount
                        * still gets a file/offset assigned; that last iteration writes no
                        * tuple.  On return the in-memory array is marked empty (memtupdeleted
                        * and memtupcount both zero).
    1204             :  */
    1205             : static void
    1206         108 : dumptuples(Tuplestorestate *state)
    1207             : {
    1208             :     int         i;
    1209             : 
    1210         108 :     for (i = state->memtupdeleted;; i++)
    1211     2835724 :     {
    1212     2835832 :         TSReadPointer *readptr = state->readptrs;
    1213             :         int         j;
    1214             : 
                               /*
                                * A pointer that is about to read tuple i takes the file
                                * position obtained from BufFileTell before tuple i is written.
                                */
    1215     5671664 :         for (j = 0; j < state->readptrcount; readptr++, j++)
    1216             :         {
    1217     2835832 :             if (i == readptr->current && !readptr->eof_reached)
    1218         108 :                 BufFileTell(state->myfile,
    1219             :                             &readptr->file, &readptr->offset);
    1220             :         }
    1221     2835832 :         if (i >= state->memtupcount)
    1222         108 :             break;
    1223     2835724 :         WRITETUP(state, state->memtuples[i]);
    1224             :     }
    1225         108 :     state->memtupdeleted = 0;
    1226         108 :     state->memtupcount = 0;
    1227         108 : }
    1228             : 
    1229             : /*
    1230             :  * tuplestore_rescan        - rewind the active read pointer to start
                        *
                        * The active pointer must have been created with EXEC_FLAG_REWIND and
                        * the store must not have been truncated (both are Assert-checked).
                        * In every state eof_reached is cleared; how the position itself is
                        * reset depends on the state, as noted below.
    1231             :  */
    1232             : void
    1233      287076 : tuplestore_rescan(Tuplestorestate *state)
    1234             : {
    1235      287076 :     TSReadPointer *readptr = &state->readptrs[state->activeptr];
    1236             : 
    1237             :     Assert(readptr->eflags & EXEC_FLAG_REWIND);
    1238             :     Assert(!state->truncated);
    1239             : 
    1240      287076 :     switch (state->status)
    1241             :     {
    1242      286970 :         case TSS_INMEM:
                                   /* position is an array index: go back to slot 0 */
    1243      286970 :             readptr->eof_reached = false;
    1244      286970 :             readptr->current = 0;
    1245      286970 :             break;
    1246         106 :         case TSS_WRITEFILE:
                                   /* only the saved file/offset is reset; no seek is issued */
    1247         106 :             readptr->eof_reached = false;
    1248         106 :             readptr->file = 0;
    1249         106 :             readptr->offset = 0;
    1250         106 :             break;
    1251           0 :         case TSS_READFILE:
                                   /* the temp file itself is repositioned to its start */
    1252           0 :             readptr->eof_reached = false;
    1253           0 :             if (BufFileSeek(state->myfile, 0, 0, SEEK_SET) != 0)
    1254           0 :                 ereport(ERROR,
    1255             :                         (errcode_for_file_access(),
    1256             :                          errmsg("could not seek in tuplestore temporary file")));
    1257           0 :             break;
    1258           0 :         default:
    1259           0 :             elog(ERROR, "invalid tuplestore state");
    1260             :             break;
    1261             :     }
    1262      287076 : }
    1263             : 
    1264             : /*
    1265             :  * tuplestore_copy_read_pointer - copy a read pointer's state to another
                        *
                        * srcptr and destptr are indexes into state->readptrs; both must be in
                        * the range 0 .. readptrcount-1 (Assert-checked).  Copying a pointer
                        * onto itself does nothing.
                        *
                        * The whole TSReadPointer struct is copied, eflags included.  If the
                        * destination's eflags differed from the source's, state->eflags is
                        * rebuilt as the OR of the eflags of all read pointers.
    1266             :  */
    1267             : void
    1268       60932 : tuplestore_copy_read_pointer(Tuplestorestate *state,
    1269             :                              int srcptr, int destptr)
    1270             : {
                           /* only addresses are formed here; nothing is dereferenced yet */
    1271       60932 :     TSReadPointer *sptr = &state->readptrs[srcptr];
    1272       60932 :     TSReadPointer *dptr = &state->readptrs[destptr];
    1273             : 
    1274             :     Assert(srcptr >= 0 && srcptr < state->readptrcount);
    1275             :     Assert(destptr >= 0 && destptr < state->readptrcount);
    1276             : 
    1277             :     /* Assigning to self is a no-op */
    1278       60932 :     if (srcptr == destptr)
    1279           0 :         return;
    1280             : 
    1281       60932 :     if (dptr->eflags != sptr->eflags)
    1282             :     {
    1283             :         /* Possible change of overall eflags, so copy and then recompute */
    1284             :         int         eflags;
    1285             :         int         i;
    1286             : 
    1287           0 :         *dptr = *sptr;
    1288           0 :         eflags = state->readptrs[0].eflags;
    1289           0 :         for (i = 1; i < state->readptrcount; i++)
    1290           0 :             eflags |= state->readptrs[i].eflags;
    1291           0 :         state->eflags = eflags;
    1292             :     }
    1293             :     else
    1294       60932 :         *dptr = *sptr;
    1295             : 
    1296       60932 :     switch (state->status)
    1297             :     {
    1298       60932 :         case TSS_INMEM:
    1299             :         case TSS_WRITEFILE:
    1300             :             /* no work */
    1301       60932 :             break;
    1302           0 :         case TSS_READFILE:
    1303             : 
    1304             :             /*
    1305             :              * This case is a bit tricky since the active read pointer's
    1306             :              * position corresponds to the seek point, not what is in its
    1307             :              * variables.  Assigning to the active requires a seek, and
    1308             :              * assigning from the active requires a tell, except when
    1309             :              * eof_reached.
    1310             :              */
    1311           0 :             if (destptr == state->activeptr)
    1312             :             {
                                       /* an EOF pointer is placed at the saved write position */
    1313           0 :                 if (dptr->eof_reached)
    1314             :                 {
    1315           0 :                     if (BufFileSeek(state->myfile,
    1316             :                                     state->writepos_file,
    1317             :                                     state->writepos_offset,
    1318             :                                     SEEK_SET) != 0)
    1319           0 :                         ereport(ERROR,
    1320             :                                 (errcode_for_file_access(),
    1321             :                                  errmsg("could not seek in tuplestore temporary file")));
    1322             :                 }
    1323             :                 else
    1324             :                 {
    1325           0 :                     if (BufFileSeek(state->myfile,
    1326             :                                     dptr->file, dptr->offset,
    1327             :                                     SEEK_SET) != 0)
    1328           0 :                         ereport(ERROR,
    1329             :                                 (errcode_for_file_access(),
    1330             :                                  errmsg("could not seek in tuplestore temporary file")));
    1331             :                 }
    1332             :             }
    1333           0 :             else if (srcptr == state->activeptr)
    1334             :             {
                                       /* overwrite the copied file/offset with the seek point */
    1335           0 :                 if (!dptr->eof_reached)
    1336           0 :                     BufFileTell(state->myfile,
    1337             :                                 &dptr->file,
    1338             :                                 &dptr->offset);
    1339             :             }
    1340           0 :             break;
    1341           0 :         default:
    1342           0 :             elog(ERROR, "invalid tuplestore state");
    1343             :             break;
    1344             :     }
    1345             : }
    1346             : 
    1347             : /*
    1348             :  * tuplestore_trim  - remove all no-longer-needed tuples
    1349             :  *
    1350             :  * Calling this function authorizes the tuplestore to delete all tuples
    1351             :  * before the oldest read pointer, if no read pointer is marked as requiring
    1352             :  * REWIND capability.
    1353             :  *
    1354             :  * Note: this is obviously safe if no pointer has BACKWARD capability either.
    1355             :  * If a pointer is marked as BACKWARD but not REWIND capable, it means that
    1356             :  * the pointer can be moved backward but not before the oldest other read
    1357             :  * pointer.
                        *
                        * Nothing is done unless the store is in the TSS_INMEM state.  Freed
                        * tuples have their array slots set to NULL and are accounted for in
                        * memtupdeleted; the array is only compacted (and the read pointers'
                        * "current" values shifted down) when enough slots have been freed.
    1358             :  */
    1359             : void
    1360      826800 : tuplestore_trim(Tuplestorestate *state)
    1361             : {
    1362             :     int         oldest;
    1363             :     int         nremove;
    1364             :     int         i;
    1365             : 
    1366             :     /*
    1367             :      * Truncation is disallowed if any read pointer requires rewind
    1368             :      * capability.
    1369             :      */
    1370      826800 :     if (state->eflags & EXEC_FLAG_REWIND)
    1371           0 :         return;
    1372             : 
    1373             :     /*
    1374             :      * We don't bother trimming temp files since it usually would mean more
    1375             :      * work than just letting them sit in kernel buffers until they age out.
    1376             :      */
    1377      826800 :     if (state->status != TSS_INMEM)
    1378           0 :         return;
    1379             : 
    1380             :     /* Find the oldest read pointer */
                           /* (starting from memtupcount; eof_reached pointers are ignored) */
    1381      826800 :     oldest = state->memtupcount;
    1382     3564342 :     for (i = 0; i < state->readptrcount; i++)
    1383             :     {
    1384     2737542 :         if (!state->readptrs[i].eof_reached)
    1385     2708326 :             oldest = Min(oldest, state->readptrs[i].current);
    1386             :     }
    1387             : 
    1388             :     /*
    1389             :      * Note: you might think we could remove all the tuples before the oldest
    1390             :      * "current", since that one is the next to be returned.  However, since
    1391             :      * tuplestore_gettuple returns a direct pointer to our internal copy of
    1392             :      * the tuple, it's likely that the caller has still got the tuple just
    1393             :      * before "current" referenced in a slot. So we keep one extra tuple
    1394             :      * before the oldest "current".  (Strictly speaking, we could require such
    1395             :      * callers to use the "copy" flag to tuplestore_gettupleslot, but for
    1396             :      * efficiency we allow this one case to not use "copy".)
    1397             :      */
    1398      826800 :     nremove = oldest - 1;
    1399      826800 :     if (nremove <= 0)
    1400        7102 :         return;                 /* nothing to do */
    1401             : 
    1402             :     Assert(nremove >= state->memtupdeleted);
    1403             :     Assert(nremove <= state->memtupcount);
    1404             : 
    1405             :     /* Release no-longer-needed tuples */
                           /* (slots below memtupdeleted were already released earlier) */
    1406     1640450 :     for (i = state->memtupdeleted; i < nremove; i++)
    1407             :     {
    1408      820752 :         FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
    1409      820752 :         pfree(state->memtuples[i]);
    1410      820752 :         state->memtuples[i] = NULL;
    1411             :     }
    1412      819698 :     state->memtupdeleted = nremove;
    1413             : 
    1414             :     /* mark tuplestore as truncated (used for Assert crosschecks only) */
    1415      819698 :     state->truncated = true;
    1416             : 
    1417             :     /*
    1418             :      * If nremove is less than 1/8th memtupcount, just stop here, leaving the
    1419             :      * "deleted" slots as NULL.  This prevents us from expending O(N^2) time
    1420             :      * repeatedly memmove-ing a large pointer array.  The worst case space
    1421             :      * wastage is pretty small, since it's just pointers and not whole tuples.
    1422             :      */
    1423      819698 :     if (nremove < state->memtupcount / 8)
    1424      112680 :         return;
    1425             : 
    1426             :     /*
    1427             :      * Slide the array down and readjust pointers.
    1428             :      *
    1429             :      * In mergejoin's current usage, it's demonstrable that there will always
    1430             :      * be exactly one non-removed tuple; so optimize that case.
    1431             :      */
    1432      707018 :     if (nremove + 1 == state->memtupcount)
    1433      630254 :         state->memtuples[0] = state->memtuples[nremove];
    1434             :     else
    1435       76764 :         memmove(state->memtuples, state->memtuples + nremove,
    1436       76764 :                 (state->memtupcount - nremove) * sizeof(void *));
    1437             : 
                           /* array now starts at the first kept tuple; shift indexes to match */
    1438      707018 :     state->memtupdeleted = 0;
    1439      707018 :     state->memtupcount -= nremove;
    1440     3071712 :     for (i = 0; i < state->readptrcount; i++)
    1441             :     {
    1442     2364694 :         if (!state->readptrs[i].eof_reached)
    1443     2360444 :             state->readptrs[i].current -= nremove;
    1444             :     }
    1445             : }
    1446             : 
    1447             : /*
    1448             :  * tuplestore_in_memory
    1449             :  *
    1450             :  * Returns true if the tuplestore has not spilled to disk.
                        * Concretely, this tests whether state->status is still TSS_INMEM.
    1451             :  *
    1452             :  * XXX exposing this is a violation of modularity ... should get rid of it.
    1453             :  */
    1454             : bool
    1455     1553382 : tuplestore_in_memory(Tuplestorestate *state)
    1456             : {
    1457     1553382 :     return (state->status == TSS_INMEM);
    1458             : }
    1459             : 
    1460             : 
    1461             : /*
    1462             :  * Tape interface routines
    1463             :  */
    1464             : 
    1465             : static unsigned int
    1466     6173902 : getlen(Tuplestorestate *state, bool eofOK)
    1467             : {
                           /*
                            * Read one unsigned-int length word from state->myfile at the
                            * current file position.
                            *
                            * Returns the word that was read, or 0 when BufFileReadMaybeEOF
                            * reports that zero bytes were read.  eofOK is passed straight
                            * through to BufFileReadMaybeEOF; presumably it decides whether
                            * running into end-of-file is tolerated or treated as an error
                            * (NOTE(review): confirm against buffile.c).
                            */
    1468             :     unsigned int len;
    1469             :     size_t      nbytes;
    1470             : 
    1471     6173902 :     nbytes = BufFileReadMaybeEOF(state->myfile, &len, sizeof(len), eofOK);
    1472     6173902 :     if (nbytes == 0)
    1473          94 :         return 0;
    1474             :     else
    1475     6173808 :         return len;
    1476             : }
    1477             : 
    1478             : 
    1479             : /*
    1480             :  * Routines specialized for HeapTuple case
    1481             :  *
    1482             :  * The stored form is actually a MinimalTuple, but for largely historical
    1483             :  * reasons we allow COPYTUP to work from a HeapTuple.
    1484             :  *
    1485             :  * Since MinimalTuple already has length in its first word, we don't need
    1486             :  * to write that separately.
    1487             :  */
    1488             : 
    1489             : static void *
    1490     1607768 : copytup_heap(Tuplestorestate *state, void *tup)
    1491             : {
                           /*
                            * Build a MinimalTuple from the HeapTuple passed as "tup" and
                            * return it.  The space of the new tuple, as reported by
                            * GetMemoryChunkSpace, is charged to the tuplestore via USEMEM.
                            * The input tuple is not freed here.
                            */
    1492             :     MinimalTuple tuple;
    1493             : 
    1494     1607768 :     tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup);
    1495     1607768 :     USEMEM(state, GetMemoryChunkSpace(tuple));
    1496     1607768 :     return (void *) tuple;
    1497             : }
    1498             : 
    1499             : static void
    1500     8983878 : writetup_heap(Tuplestorestate *state, void *tup)
    1501             : {
                           /*
                            * Write the MinimalTuple "tup" to state->myfile and release it.
                            *
                            * What gets written, in order:
                            *   1. a length word holding tupbodylen + sizeof(int)
                            *   2. the tuple minus its first MINIMAL_TUPLE_DATA_OFFSET bytes
                            *   3. the same length word again, only if state->backward is set
                            *
                            * Afterwards the tuple's space is given back with FREEMEM and the
                            * tuple is freed, so the caller must not touch it again.
                            */
    1502     8983878 :     MinimalTuple tuple = (MinimalTuple) tup;
    1503             : 
    1504             :     /* the part of the MinimalTuple we'll write: */
    1505     8983878 :     char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
    1506     8983878 :     unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
    1507             : 
    1508             :     /* total on-disk footprint: */
    1509     8983878 :     unsigned int tuplen = tupbodylen + sizeof(int);
    1510             : 
    1511     8983878 :     BufFileWrite(state->myfile, &tuplen, sizeof(tuplen));
    1512     8983878 :     BufFileWrite(state->myfile, tupbody, tupbodylen);
    1513     8983878 :     if (state->backward)     /* need trailing length word? */
    1514       60000 :         BufFileWrite(state->myfile, &tuplen, sizeof(tuplen));
    1515             : 
    1516     8983878 :     FREEMEM(state, GetMemoryChunkSpace(tuple));
    1517     8983878 :     heap_free_minimal_tuple(tuple);
    1518     8983878 : }
    1519             : 
    1520             : static void *
    1521     6173802 : readtup_heap(Tuplestorestate *state, unsigned int len)
    1522             : {
                           /*
                            * Read one tuple body from state->myfile and return it as a
                            * freshly palloc'd MinimalTuple.
                            *
                            * "len" is the leading length word, which the caller has already
                            * consumed; as written by writetup_heap it equals the body length
                            * plus sizeof(int).  The result is sized to the body plus
                            * MINIMAL_TUPLE_DATA_OFFSET bytes, and only t_len and the body are
                            * filled in here.  Its space is charged to the tuplestore via
                            * USEMEM.
                            */
    1523     6173802 :     unsigned int tupbodylen = len - sizeof(int);
    1524     6173802 :     unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
    1525     6173802 :     MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
    1526     6173802 :     char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
    1527             : 
    1528     6173802 :     USEMEM(state, GetMemoryChunkSpace(tuple));
    1529             :     /* read in the tuple proper */
    1530     6173802 :     tuple->t_len = tuplen;
    1531     6173802 :     BufFileReadExact(state->myfile, tupbody, tupbodylen);
                           /* the trailing word is read only to step past it; tuplen is unused after */
    1532     6173802 :     if (state->backward)     /* need trailing length word? */
    1533       60018 :         BufFileReadExact(state->myfile, &tuplen, sizeof(tuplen));
    1534     6173802 :     return (void *) tuple;
    1535             : }

Generated by: LCOV version 1.14