LCOV - code coverage report
Current view: top level - src/backend/utils/sort - sharedtuplestore.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 94.1 % 188 177
Test Date: 2026-03-04 06:14:56 Functions: 91.7 % 12 11
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * sharedtuplestore.c
       4              :  *    Simple mechanism for sharing tuples between backends.
       5              :  *
       6              :  * This module contains a shared temporary tuple storage mechanism providing
       7              :  * a parallel-aware subset of the features of tuplestore.c.  Multiple backends
       8              :  * can write to a SharedTuplestore, and then multiple backends can later scan
       9              :  * the stored tuples.  Currently, the only scan type supported is a parallel
      10              :  * scan where each backend reads an arbitrary subset of the tuples that were
      11              :  * written.
      12              :  *
      13              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      14              :  * Portions Copyright (c) 1994, Regents of the University of California
      15              :  *
      16              :  * IDENTIFICATION
      17              :  *    src/backend/utils/sort/sharedtuplestore.c
      18              :  *
      19              :  *-------------------------------------------------------------------------
      20              :  */
      21              : 
      22              : #include "postgres.h"
      23              : 
      24              : #include "access/htup.h"
      25              : #include "access/htup_details.h"
      26              : #include "storage/buffile.h"
      27              : #include "storage/lwlock.h"
      28              : #include "storage/sharedfileset.h"
      29              : #include "utils/sharedtuplestore.h"
      30              : 
      31              : /*
      32              :  * The size of chunks, in pages.  This is somewhat arbitrarily set to match
      33              :  * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
      34              :  * at approximately the same rate as it allocates new chunks of memory to
      35              :  * insert them into.
      36              :  */
      37              : #define STS_CHUNK_PAGES 4
      38              : #define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data) /* bytes before data[] */
      39              : #define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE) /* payload bytes per chunk */
      40              : 
      41              : /* One chunk (STS_CHUNK_PAGES * BLCKSZ bytes: header + data) written to disk. */
      42              : typedef struct SharedTuplestoreChunk
      43              : {
      44              :     int         ntuples;        /* Number of tuples that start in this chunk. */
      45              :     int         overflow;       /* 0 if regular; else overflow chunks left, including this one. */
      46              :     char        data[FLEXIBLE_ARRAY_MEMBER];    /* Tuple (or overflow) bytes. */
      47              : } SharedTuplestoreChunk;
      48              : 
      49              : /* Per-participant shared state (one entry per participant's file). */
      50              : typedef struct SharedTuplestoreParticipant
      51              : {
      52              :     LWLock      lock;           /* Held while a reader claims the next chunk. */
      53              :     BlockNumber read_page;      /* Page number for next read. */
      54              :     BlockNumber npages;         /* Number of pages written. */
      55              :     bool        writing;        /* Used only for assertions. */
      56              : } SharedTuplestoreParticipant;
      57              : 
      58              : /* The control object that lives in shared memory. */
      59              : struct SharedTuplestore
      60              : {
      61              :     int         nparticipants;  /* Number of participants that can write. */
      62              :     int         flags;          /* Flag bits from SHARED_TUPLESTORE_XXX */
      63              :     size_t      meta_data_size; /* Size of fixed per-tuple meta-data. */
      64              :     char        name[NAMEDATALEN];  /* A name for this tuplestore. */
      65              : 
      66              :     /* Followed by per-participant shared state, nparticipants entries. */
      67              :     SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
      68              : };
      69              : 
      70              : /* Per-participant state that lives in backend-local memory. */
      71              : struct SharedTuplestoreAccessor
      72              : {
      73              :     int         participant;    /* My participant number. */
      74              :     SharedTuplestore *sts;      /* The shared state. */
      75              :     SharedFileSet *fileset;     /* The SharedFileSet holding files. */
      76              :     MemoryContext context;      /* Memory context for buffers. */
      77              : 
      78              :     /* State for reading. */
      79              :     int         read_participant;   /* The current participant to read from. */
      80              :     BufFile    *read_file;      /* The current file to read from. */
      81              :     int         read_ntuples_available; /* The number of tuples in chunk. */
      82              :     int         read_ntuples;   /* How many tuples have we read from chunk? */
      83              :     size_t      read_bytes;     /* How many bytes have we read from chunk? */
      84              :     char       *read_buffer;    /* A buffer for loading tuples. */
      85              :     size_t      read_buffer_size;   /* Allocated size of read_buffer. */
      86              :     BlockNumber read_next_page; /* Lowest block we'll consider reading. */
      87              : 
      88              :     /* State for writing. */
      89              :     SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
      90              :     BufFile    *write_file;     /* The current file to write to. */
      91              :     char       *write_pointer;  /* Current write pointer within chunk. */
      92              :     char       *write_end;      /* One past the end of the current chunk. */
      93              : };
      94              : 
      95              : static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
      96              :                          int participant);
      97              : 
      98              : /*
      99              :  * Return the number of bytes of shared memory required to hold a
     100              :  * SharedTuplestore with per-participant state for 'participants' members.
     101              :  */
     102              : size_t
     103         2200 : sts_estimate(int participants)
     104              : {
     105         4400 :     return offsetof(SharedTuplestore, participants) +
     106         2200 :         sizeof(SharedTuplestoreParticipant) * participants;
     107              : }
     108              : 
     109              : /*
     110              :  * Initialize a SharedTuplestore in existing shared memory.  There must be
     111              :  * space for sts_estimate(participants) bytes.  If flags includes the value
     112              :  * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
     113              :  * eagerly (but this isn't yet implemented).
     114              :  *
     115              :  * Tuples that are stored may optionally carry a piece of fixed-size
     116              :  * meta-data which will be retrieved along with the tuple.  This is useful for
     117              :  * the hash values used in multi-batch hash joins, but could have other
     118              :  * applications.
     119              :  *
     120              :  * The caller must supply a SharedFileSet, which is essentially a directory
     121              :  * that will be cleaned up automatically, and a name (shorter than NAMEDATALEN)
     122              :  * that is unique across all SharedTuplestores created in that SharedFileSet.
     123              :  */
     124              : SharedTuplestoreAccessor *
     125          840 : sts_initialize(SharedTuplestore *sts, int participants,
     126              :                int my_participant_number,
     127              :                size_t meta_data_size,
     128              :                int flags,
     129              :                SharedFileSet *fileset,
     130              :                const char *name)
     131              : {
     132              :     SharedTuplestoreAccessor *accessor;
     133              :     int         i;
     134              : 
     135              :     Assert(my_participant_number < participants);
     136              : 
     137          840 :     sts->nparticipants = participants;
     138          840 :     sts->meta_data_size = meta_data_size;
     139          840 :     sts->flags = flags;
     140              :     /* The length is checked first, so the strcpy() below cannot overflow. */
     141          840 :     if (strlen(name) > sizeof(sts->name) - 1)
     142            0 :         elog(ERROR, "SharedTuplestore name too long");
     143          840 :     strcpy(sts->name, name);
     144              : 
     145              :     /*
     146              :      * Limit meta-data so that it plus the tuple's length word always fits into
     147              :      * a single chunk.  sts_puttuple() and sts_read_tuple() could be made to
     148              :      * support scenarios where that's not the case, but it's not currently
     149              :      * required. If so, meta-data size probably should be made variable, too.
     150              :      */
     151          840 :     if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
     152            0 :         elog(ERROR, "meta-data too long");
     153              :     /* Every participant starts with no pages written and its read head at 0. */
     154         3138 :     for (i = 0; i < participants; ++i)
     155              :     {
     156         2298 :         LWLockInitialize(&sts->participants[i].lock,
     157              :                          LWTRANCHE_SHARED_TUPLESTORE);
     158         2298 :         sts->participants[i].read_page = 0;
     159         2298 :         sts->participants[i].npages = 0;
     160         2298 :         sts->participants[i].writing = false;
     161              :     }
     162              :     /* Build this backend's accessor; its buffers will use the current context. */
     163          840 :     accessor = palloc0_object(SharedTuplestoreAccessor);
     164          840 :     accessor->participant = my_participant_number;
     165          840 :     accessor->sts = sts;
     166          840 :     accessor->fileset = fileset;
     167          840 :     accessor->context = CurrentMemoryContext;
     168              : 
     169          840 :     return accessor;
     170              : }
     171              : 
     172              : /*
     173              :  * Attach to a SharedTuplestore that has been initialized by another backend,
     174              :  * so that this backend can read and write tuples.  Returns a new accessor.
     175              :  */
     176              : SharedTuplestoreAccessor *
     177         1105 : sts_attach(SharedTuplestore *sts,
     178              :            int my_participant_number,
     179              :            SharedFileSet *fileset)
     180              : {
     181              :     SharedTuplestoreAccessor *accessor;
     182              : 
     183              :     Assert(my_participant_number < sts->nparticipants);
     184              :     /* Only backend-local state is set up here; shared state is not modified. */
     185         1105 :     accessor = palloc0_object(SharedTuplestoreAccessor);
     186         1105 :     accessor->participant = my_participant_number;
     187         1105 :     accessor->sts = sts;
     188         1105 :     accessor->fileset = fileset;
     189         1105 :     accessor->context = CurrentMemoryContext;
     190              : 
     191         1105 :     return accessor;
     192              : }
     193              : 
     194              : static void
     195         1882 : sts_flush_chunk(SharedTuplestoreAccessor *accessor)
     196              : {
     197              :     size_t      size;
     198              :     /* Write out the whole chunk (header included), then zero it for reuse. */
     199         1882 :     size = STS_CHUNK_PAGES * BLCKSZ;
     200         1882 :     BufFileWrite(accessor->write_file, accessor->write_chunk, size);
     201         1882 :     memset(accessor->write_chunk, 0, size); /* also resets ntuples and overflow */
     202         1882 :     accessor->write_pointer = &accessor->write_chunk->data[0];
     203         1882 :     accessor->sts->participants[accessor->participant].npages +=
     204              :         STS_CHUNK_PAGES;
     205         1882 : }
     206              : 
     207              : /*
     208              :  * Finish writing tuples (flush the last chunk, close the file).  This must be
     209              :  * called by all backends that have written data before any backend reads it.
     210              :  */
     211              : void
     212         3349 : sts_end_write(SharedTuplestoreAccessor *accessor)
     213              : {
     214         3349 :     if (accessor->write_file != NULL)   /* nothing to do if we never wrote */
     215              :     {
     216         1174 :         sts_flush_chunk(accessor);
     217         1174 :         BufFileClose(accessor->write_file);
     218         1174 :         pfree(accessor->write_chunk);
     219         1174 :         accessor->write_chunk = NULL;   /* NOTE(review): write_pointer/write_end are left stale */
     220         1174 :         accessor->write_file = NULL;
     221         1174 :         accessor->sts->participants[accessor->participant].writing = false;
     222              :     }
     223         3349 : }
     224              : 
     225              : /*
     226              :  * Prepare to rescan.  Exactly one participant must call this.  After it
     227              :  * returns, all participants may call sts_begin_parallel_scan() and then loop
     228              :  * over sts_parallel_scan_next().  This function must not be called
     229              :  * concurrently with a scan, and synchronization to avoid that is the caller's
     230              :  * responsibility.
     231              :  */
     232              : void
     233            0 : sts_reinitialize(SharedTuplestoreAccessor *accessor)
     234              : {
     235              :     int         i;
     236              : 
     237              :     /*
     238              :      * Reset the shared read head for all participants' files, so that the
     239              :      * next scan starts from the first chunk of each file.  Backend-local scan
     240              :      * state is reset separately, by sts_begin_parallel_scan().
     241              :      */
     242            0 :     for (i = 0; i < accessor->sts->nparticipants; ++i)
     243              :     {
     244            0 :         accessor->sts->participants[i].read_page = 0;
     245              :     }
     246            0 : }
     247              : 
     248              : /*
     249              :  * Begin scanning the contents in parallel, ending any scan already in progress.
     250              :  */
     251              : void
     252          817 : sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
     253              : {
     254              :     int         i PG_USED_FOR_ASSERTS_ONLY;
     255              : 
     256              :     /* End any existing scan that was in progress. */
     257          817 :     sts_end_parallel_scan(accessor);
     258              : 
     259              :     /*
     260              :      * Any backend that might have written into this shared tuplestore must
     261              :      * have called sts_end_write(), so that all buffers are flushed and the
     262              :      * files have stopped growing.
     263              :      */
     264         3082 :     for (i = 0; i < accessor->sts->nparticipants; ++i)
     265              :         Assert(!accessor->sts->participants[i].writing);
     266              :     /* NOTE(review): read_ntuples/read_ntuples_available are not reset here -- confirm. */
     267              :     /*
     268              :      * We will start out reading the file that THIS backend wrote.  There may
     269              :      * be some caching locality advantage to that.
     270              :      */
     271          817 :     accessor->read_participant = accessor->participant;
     272          817 :     accessor->read_file = NULL;
     273          817 :     accessor->read_next_page = 0;
     274          817 : }
     275              : 
     276              : /*
     277              :  * Finish a parallel scan, closing this backend's read file if one is open.
     278              :  */
     279              : void
     280         4023 : sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
     281              : {
     282              :     /*
     283              :      * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
     284              :      * we'd probably need a reference count of current parallel scanners so we
     285              :      * could safely do it only when the reference count reaches zero.
     286              :      */
     287         4023 :     if (accessor->read_file != NULL)
     288              :     {
     289            0 :         BufFileClose(accessor->read_file);
     290            0 :         accessor->read_file = NULL;
     291              :     }
     292         4023 : }
     293              : 
     294              : /*
     295              :  * Write a tuple.  If a meta-data size was provided to sts_initialize(), then a
     296              :  * pointer to meta-data of that size must be provided; it precedes the tuple.
     297              :  */
     298              : void
     299      1217929 : sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
     300              :              MinimalTuple tuple)
     301              : {
     302              :     size_t      size;
     303              : 
     304              :     /* Do we have our own file yet? */
     305      1217929 :     if (accessor->write_file == NULL)
     306              :     {
     307              :         SharedTuplestoreParticipant *participant;
     308              :         char        name[MAXPGPATH];
     309              :         MemoryContext oldcxt;
     310              : 
     311              :         /* Create one.  Only this backend will write into it. */
     312         1174 :         sts_filename(name, accessor, accessor->participant);
     313              :         /* Switch to the accessor's memory context while creating the file. */
     314         1174 :         oldcxt = MemoryContextSwitchTo(accessor->context);
     315         1174 :         accessor->write_file =
     316         1174 :             BufFileCreateFileSet(&accessor->fileset->fs, name);
     317         1174 :         MemoryContextSwitchTo(oldcxt);
     318              : 
     319              :         /* Set up the shared state for this backend's file. */
     320         1174 :         participant = &accessor->sts->participants[accessor->participant];
     321         1174 :         participant->writing = true; /* for assertions only */
     322              :     }
     323              : 
     324              :     /* Do we have space?  Each record is the meta-data followed by the tuple. */
     325      1217929 :     size = accessor->sts->meta_data_size + tuple->t_len;
     326      1217929 :     if (accessor->write_pointer == NULL ||
     327      1216755 :         accessor->write_pointer + size > accessor->write_end)
     328              :     {
     329         1774 :         if (accessor->write_chunk == NULL)
     330              :         {
     331              :             /* First time through.  Allocate chunk. */
     332         1174 :             accessor->write_chunk = (SharedTuplestoreChunk *)
     333         1174 :                 MemoryContextAllocZero(accessor->context,
     334              :                                        STS_CHUNK_PAGES * BLCKSZ);
     335         1174 :             accessor->write_chunk->ntuples = 0;
     336         1174 :             accessor->write_pointer = &accessor->write_chunk->data[0];
     337         1174 :             accessor->write_end = (char *)
     338         1174 :                 accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
     339              :         }
     340              :         else
     341              :         {
     342              :             /* See if flushing helps. */
     343          600 :             sts_flush_chunk(accessor);
     344              :         }
     345              : 
     346              :         /* It may still not be enough in the case of a gigantic tuple. */
     347         1774 :         if (accessor->write_pointer + size > accessor->write_end)
     348              :         {
     349              :             size_t      written;
     350              : 
     351              :             /*
     352              :              * We'll write the beginning of the oversized tuple, and then
     353              :              * write the rest in some number of 'overflow' chunks.
     354              :              *
     355              :              * sts_initialize() verifies that the meta-data plus the tuple's
     356              :              * length word always fits into a chunk. Because the chunk has been
     357              :              * flushed above, we can be sure to have all of a chunk's usable
     358              :              * space available.
     359              :              */
     360              :             Assert(accessor->write_pointer + accessor->sts->meta_data_size +
     361              :                    sizeof(uint32) < accessor->write_end);
     362              : 
     363              :             /* Write the meta-data in one piece; it is never split across chunks. */
     364           12 :             if (accessor->sts->meta_data_size > 0)
     365           12 :                 memcpy(accessor->write_pointer, meta_data,
     366           12 :                        accessor->sts->meta_data_size);
     367              : 
     368              :             /*
     369              :              * Write as much of the tuple as we can fit. This includes the
     370              :              * tuple's size (its t_len word) at the start.
     371              :              */
     372           12 :             written = accessor->write_end - accessor->write_pointer -
     373           12 :                 accessor->sts->meta_data_size;
     374           12 :             memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
     375              :                    tuple, written);
     376           12 :             ++accessor->write_chunk->ntuples;
     377           12 :             size -= accessor->sts->meta_data_size;
     378           12 :             size -= written;
     379              :             /* Now write as many overflow chunks as we need for the rest. */
     380          120 :             while (size > 0)
     381              :             {
     382              :                 size_t      written_this_chunk;
     383              : 
     384          108 :                 sts_flush_chunk(accessor);
     385              : 
     386              :                 /*
     387              :                  * How many overflow chunks to go?  This will allow readers to
     388              :                  * skip all of them at once instead of reading each one.
     389              :                  */
     390          108 :                 accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
     391              :                     STS_CHUNK_DATA_SIZE;
     392          108 :                 written_this_chunk =
     393          108 :                     Min(accessor->write_end - accessor->write_pointer, size);
     394          108 :                 memcpy(accessor->write_pointer, (char *) tuple + written,
     395              :                        written_this_chunk);
     396          108 :                 accessor->write_pointer += written_this_chunk;
     397          108 :                 size -= written_this_chunk;
     398          108 :                 written += written_this_chunk;
     399              :             }
     400           12 :             return;
     401              :         }
     402              :     }
     403              : 
     404              :     /* Common case: copy meta-data and the whole tuple into the chunk buffer. */
     405      1217917 :     if (accessor->sts->meta_data_size > 0)
     406      1217917 :         memcpy(accessor->write_pointer, meta_data,
     407      1217917 :                accessor->sts->meta_data_size);
     408      1217917 :     memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
     409      1217917 :            tuple->t_len);
     410      1217917 :     accessor->write_pointer += size;
     411      1217917 :     ++accessor->write_chunk->ntuples;
     412              : }
     413              : 
     414              : static MinimalTuple
     415      1217929 : sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
     416              : {
     417              :     MinimalTuple tuple;
     418              :     uint32      size;
     419              :     size_t      remaining_size;
     420              :     size_t      this_chunk_size;
     421              :     char       *destination;
     422              :     /*
     423              :      * Read the next tuple (and its meta-data, if any) from the current file
     424              :      * position.  We keep track of bytes read from this chunk so that we can
     425              :      * detect an overflowing tuple and switch to reading overflow chunks.
     426              :      */
     427      1217929 :     if (accessor->sts->meta_data_size > 0)
     428              :     {
     429      1217929 :         BufFileReadExact(accessor->read_file, meta_data, accessor->sts->meta_data_size);
     430      1217929 :         accessor->read_bytes += accessor->sts->meta_data_size;
     431              :     }
     432      1217929 :     BufFileReadExact(accessor->read_file, &size, sizeof(size));
     433      1217929 :     accessor->read_bytes += sizeof(size);
     434      1217929 :     if (size > accessor->read_buffer_size)
     435              :     {
     436              :         size_t      new_read_buffer_size;
     437              :         /* Grow the buffer; old contents are not needed, so free and reallocate. */
     438          620 :         if (accessor->read_buffer != NULL)
     439            0 :             pfree(accessor->read_buffer);
     440          620 :         new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
     441          620 :         accessor->read_buffer =
     442          620 :             MemoryContextAlloc(accessor->context, new_read_buffer_size);
     443          620 :         accessor->read_buffer_size = new_read_buffer_size;
     444              :     }
     445      1217929 :     remaining_size = size - sizeof(uint32); /* the length word was already read */
     446      1217929 :     this_chunk_size = Min(remaining_size,
     447              :                           BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
     448      1217929 :     destination = accessor->read_buffer + sizeof(uint32);
     449      1217929 :     BufFileReadExact(accessor->read_file, destination, this_chunk_size);
     450      1217929 :     accessor->read_bytes += this_chunk_size;
     451      1217929 :     remaining_size -= this_chunk_size;
     452      1217929 :     destination += this_chunk_size;
     453      1217929 :     ++accessor->read_ntuples;
     454              : 
     455              :     /* Check if we need to read any overflow chunks. */
     456      1218037 :     while (remaining_size > 0)
     457              :     {
     458              :         /* We are now positioned at the start of an overflow chunk. */
     459              :         SharedTuplestoreChunk chunk_header;
     460              : 
     461          108 :         BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
     462          108 :         accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
     463          108 :         if (chunk_header.overflow == 0)
     464            0 :             ereport(ERROR,
     465              :                     (errcode_for_file_access(),
     466              :                      errmsg("unexpected chunk in shared tuplestore temporary file"),
     467              :                      errdetail_internal("Expected overflow chunk.")));
     468          108 :         accessor->read_next_page += STS_CHUNK_PAGES;
     469          108 :         this_chunk_size = Min(remaining_size,
     470              :                               BLCKSZ * STS_CHUNK_PAGES -
     471              :                               STS_CHUNK_HEADER_SIZE);
     472          108 :         BufFileReadExact(accessor->read_file, destination, this_chunk_size);
     473          108 :         accessor->read_bytes += this_chunk_size;
     474          108 :         remaining_size -= this_chunk_size;
     475          108 :         destination += this_chunk_size;
     476              : 
     477              :         /*
     478              :          * These will be used to count regular tuples following the oversized
     479              :          * tuple that spilled into this overflow chunk.
     480              :          */
     481          108 :         accessor->read_ntuples = 0;
     482          108 :         accessor->read_ntuples_available = chunk_header.ntuples;
     483              :     }
     484              :     /* The result lives in read_buffer, which the next call may overwrite or free. */
     485      1217929 :     tuple = (MinimalTuple) accessor->read_buffer;
     486      1217929 :     tuple->t_len = size;    /* fill in the length word that was read separately */
     487              : 
     488      1217929 :     return tuple;
     489              : }
     490              : 
     491              : /*
     492              :  * Get the next tuple in the current parallel scan.
     493              :  */
     494              : MinimalTuple
     495      1218660 : sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
     496              : {
     497              :     SharedTuplestoreParticipant *p;
     498              :     BlockNumber read_page;
     499              :     bool        eof;
     500              : 
     501              :     for (;;)
     502              :     {
     503              :         /* Can we read more tuples from the current chunk? */
     504      1221714 :         if (accessor->read_ntuples < accessor->read_ntuples_available)
     505      1217929 :             return sts_read_tuple(accessor, meta_data);
     506              : 
     507              :         /* Find the location of a new chunk to read. */
     508         3785 :         p = &accessor->sts->participants[accessor->read_participant];
     509              : 
     510         3785 :         LWLockAcquire(&p->lock, LW_EXCLUSIVE);
     511              :         /* We can skip directly past overflow pages we know about. */
     512         3785 :         if (p->read_page < accessor->read_next_page)
     513           12 :             p->read_page = accessor->read_next_page;
     514         3785 :         eof = p->read_page >= p->npages;
     515         3785 :         if (!eof)
     516              :         {
     517              :             /* Claim the next chunk. */
     518         1775 :             read_page = p->read_page;
     519              :             /* Advance the read head for the next reader. */
     520         1775 :             p->read_page += STS_CHUNK_PAGES;
     521         1775 :             accessor->read_next_page = p->read_page;
     522              :         }
     523         3785 :         LWLockRelease(&p->lock);
     524              : 
     525         3785 :         if (!eof)
     526              :         {
     527              :             SharedTuplestoreChunk chunk_header;
     528              : 
     529              :             /* Make sure we have the file open. */
     530         1775 :             if (accessor->read_file == NULL)
     531              :             {
     532              :                 char        name[MAXPGPATH];
     533              :                 MemoryContext oldcxt;
     534              : 
     535         1206 :                 sts_filename(name, accessor, accessor->read_participant);
     536              : 
     537         1206 :                 oldcxt = MemoryContextSwitchTo(accessor->context);
     538         1206 :                 accessor->read_file =
     539         1206 :                     BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
     540              :                                        false);
     541         1206 :                 MemoryContextSwitchTo(oldcxt);
     542              :             }
     543              : 
     544              :             /* Seek and load the chunk header. */
     545         1775 :             if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
     546            0 :                 ereport(ERROR,
     547              :                         (errcode_for_file_access(),
     548              :                          errmsg("could not seek to block %u in shared tuplestore temporary file",
     549              :                                 read_page)));
     550         1775 :             BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
     551              : 
     552              :             /*
     553              :              * If this is an overflow chunk, we skip it and any following
     554              :              * overflow chunks all at once.
     555              :              */
     556         1775 :             if (chunk_header.overflow > 0)
     557              :             {
     558            1 :                 accessor->read_next_page = read_page +
     559            1 :                     chunk_header.overflow * STS_CHUNK_PAGES;
     560            1 :                 continue;
     561              :             }
     562              : 
     563         1774 :             accessor->read_ntuples = 0;
     564         1774 :             accessor->read_ntuples_available = chunk_header.ntuples;
     565         1774 :             accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
     566              : 
     567              :             /* Go around again, so we can get a tuple from this chunk. */
     568              :         }
     569              :         else
     570              :         {
     571         2010 :             if (accessor->read_file != NULL)
     572              :             {
     573         1206 :                 BufFileClose(accessor->read_file);
     574         1206 :                 accessor->read_file = NULL;
     575              :             }
     576              : 
     577              :             /*
     578              :              * Try the next participant's file.  If we've gone full circle,
     579              :              * we're done.
     580              :              */
     581         2010 :             accessor->read_participant = (accessor->read_participant + 1) %
     582         2010 :                 accessor->sts->nparticipants;
     583         2010 :             if (accessor->read_participant == accessor->participant)
     584          731 :                 break;
     585         1279 :             accessor->read_next_page = 0;
     586              : 
     587              :             /* Go around again, so we can get a chunk from this file. */
     588              :         }
     589              :     }
     590              : 
     591          731 :     return NULL;
     592              : }
     593              : 
     594              : /*
     595              :  * Create the name used for the BufFile that a given participant will write.
                      :  *
                      :  * The result is formatted as "<tuplestore name>.p<participant>", taking the
                      :  * tuplestore name from accessor->sts->name, and is stored in 'name'.
                      :  *
                      :  * 'name' must point to a buffer of at least MAXPGPATH bytes; snprintf is
                      :  * given MAXPGPATH as the size limit, so a longer result is truncated rather
                      :  * than overrunning the buffer.
                      :  *
                      :  * The scan code in this file calls this with the number of the participant
                      :  * whose file it is about to open for reading, so the same naming scheme
                      :  * is shared by writers and readers.
     596              :  */
     597              : static void
     598         2380 : sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
     599              : {
     600         2380 :     snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
     601         2380 : }
        

Generated by: LCOV version 2.0-1