LCOV - code coverage report
Current view: top level - src/backend/utils/sort - sharedtuplestore.c
Test: PostgreSQL 17devel
Date: 2024-03-29 14:11:41

                Hit    Total    Coverage
Lines:          173      187      92.5 %
Functions:       11       12      91.7 %

Legend: each executable line below is prefixed with its execution count; a count of 0 means the line was never executed by the test suite.

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * sharedtuplestore.c
       4             :  *    Simple mechanism for sharing tuples between backends.
       5             :  *
       6             :  * This module contains a shared temporary tuple storage mechanism providing
       7             :  * a parallel-aware subset of the features of tuplestore.c.  Multiple backends
       8             :  * can write to a SharedTuplestore, and then multiple backends can later scan
       9             :  * the stored tuples.  Currently, the only scan type supported is a parallel
      10             :  * scan where each backend reads an arbitrary subset of the tuples that were
      11             :  * written.
      12             :  *
      13             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      14             :  * Portions Copyright (c) 1994, Regents of the University of California
      15             :  *
      16             :  * IDENTIFICATION
      17             :  *    src/backend/utils/sort/sharedtuplestore.c
      18             :  *
      19             :  *-------------------------------------------------------------------------
      20             :  */
      21             : 
      22             : #include "postgres.h"
      23             : 
      24             : #include "access/htup.h"
      25             : #include "access/htup_details.h"
      26             : #include "storage/buffile.h"
      27             : #include "storage/lwlock.h"
      28             : #include "storage/sharedfileset.h"
      29             : #include "utils/sharedtuplestore.h"
      30             : 
      31             : /*
      32             :  * The size of chunks, in pages.  This is somewhat arbitrarily set to match
      33             :  * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
      34             :  * at approximately the same rate as it allocates new chunks of memory to
      35             :  * insert them into.
      36             :  */
      37             : #define STS_CHUNK_PAGES 4
      38             : #define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
      39             : #define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
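
With the default BLCKSZ of 8192, these definitions make each chunk STS_CHUNK_PAGES * BLCKSZ = 32768 bytes.  The header holds two ints, so on typical platforms STS_CHUNK_HEADER_SIZE is 8 and STS_CHUNK_DATA_SIZE is 32760 usable bytes per chunk.  A tuple too large for the space left in its chunk spills over, occupying ceil(remaining_size / 32760) dedicated overflow chunks; that is exactly the computation sts_puttuple() performs below.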
      40             : 
      41             : /* Chunk written to disk. */
      42             : typedef struct SharedTuplestoreChunk
      43             : {
      44             :     int         ntuples;        /* Number of tuples in this chunk. */
       45             :     int         overflow;       /* If an overflow chunk, how many remain, including this one; else 0. */
      46             :     char        data[FLEXIBLE_ARRAY_MEMBER];
      47             : } SharedTuplestoreChunk;
      48             : 
      49             : /* Per-participant shared state. */
      50             : typedef struct SharedTuplestoreParticipant
      51             : {
      52             :     LWLock      lock;
      53             :     BlockNumber read_page;      /* Page number for next read. */
      54             :     BlockNumber npages;         /* Number of pages written. */
      55             :     bool        writing;        /* Used only for assertions. */
      56             : } SharedTuplestoreParticipant;
      57             : 
      58             : /* The control object that lives in shared memory. */
      59             : struct SharedTuplestore
      60             : {
      61             :     int         nparticipants;  /* Number of participants that can write. */
      62             :     int         flags;          /* Flag bits from SHARED_TUPLESTORE_XXX */
      63             :     size_t      meta_data_size; /* Size of per-tuple header. */
      64             :     char        name[NAMEDATALEN];  /* A name for this tuplestore. */
      65             : 
      66             :     /* Followed by per-participant shared state. */
      67             :     SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
      68             : };
      69             : 
      70             : /* Per-participant state that lives in backend-local memory. */
      71             : struct SharedTuplestoreAccessor
      72             : {
      73             :     int         participant;    /* My participant number. */
      74             :     SharedTuplestore *sts;      /* The shared state. */
      75             :     SharedFileSet *fileset;     /* The SharedFileSet holding files. */
      76             :     MemoryContext context;      /* Memory context for buffers. */
      77             : 
      78             :     /* State for reading. */
      79             :     int         read_participant;   /* The current participant to read from. */
      80             :     BufFile    *read_file;      /* The current file to read from. */
      81             :     int         read_ntuples_available; /* The number of tuples in chunk. */
      82             :     int         read_ntuples;   /* How many tuples have we read from chunk? */
      83             :     size_t      read_bytes;     /* How many bytes have we read from chunk? */
      84             :     char       *read_buffer;    /* A buffer for loading tuples. */
      85             :     size_t      read_buffer_size;
      86             :     BlockNumber read_next_page; /* Lowest block we'll consider reading. */
      87             : 
      88             :     /* State for writing. */
      89             :     SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
      90             :     BufFile    *write_file;     /* The current file to write to. */
      91             :     BlockNumber write_page;     /* The next page to write to. */
      92             :     char       *write_pointer;  /* Current write pointer within chunk. */
      93             :     char       *write_end;      /* One past the end of the current chunk. */
      94             : };
      95             : 
      96             : static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
      97             :                          int participant);
      98             : 
      99             : /*
     100             :  * Return the amount of shared memory required to hold SharedTuplestore for a
     101             :  * given number of participants.
     102             :  */
     103             : size_t
     104        3960 : sts_estimate(int participants)
     105             : {
     106        7920 :     return offsetof(SharedTuplestore, participants) +
     107        3960 :         sizeof(SharedTuplestoreParticipant) * participants;
     108             : }
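
For orientation, here is a hedged sketch of how a parallel executor node might use sts_estimate() while sizing its dynamic shared memory segment.  The ParallelContext (pcxt) plumbing is assumed to exist in the caller, and PH_KEY_TUPLESTORE is an invented table-of-contents key, not something this module defines:

    Size        size;
    SharedTuplestore *sts;

    /* Estimate phase: reserve room for the control object. */
    size = sts_estimate(pcxt->nworkers + 1);        /* workers plus leader */
    shm_toc_estimate_chunk(&pcxt->estimator, size);
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Initialize phase: carve out the space and publish it. */
    sts = (SharedTuplestore *) shm_toc_allocate(pcxt->toc, size);
    shm_toc_insert(pcxt->toc, PH_KEY_TUPLESTORE, sts);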
     109             : 
     110             : /*
     111             :  * Initialize a SharedTuplestore in existing shared memory.  There must be
     112             :  * space for sts_estimate(participants) bytes.  If flags includes the value
     113             :  * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
     114             :  * eagerly (but this isn't yet implemented).
     115             :  *
     116             :  * Tuples that are stored may optionally carry a piece of fixed sized
     117             :  * meta-data which will be retrieved along with the tuple.  This is useful for
     118             :  * the hash values used in multi-batch hash joins, but could have other
     119             :  * applications.
     120             :  *
     121             :  * The caller must supply a SharedFileSet, which is essentially a directory
     122             :  * that will be cleaned up automatically, and a name which must be unique
     123             :  * across all SharedTuplestores created in the same SharedFileSet.
     124             :  */
     125             : SharedTuplestoreAccessor *
     126        1668 : sts_initialize(SharedTuplestore *sts, int participants,
     127             :                int my_participant_number,
     128             :                size_t meta_data_size,
     129             :                int flags,
     130             :                SharedFileSet *fileset,
     131             :                const char *name)
     132             : {
     133             :     SharedTuplestoreAccessor *accessor;
     134             :     int         i;
     135             : 
     136             :     Assert(my_participant_number < participants);
     137             : 
     138        1668 :     sts->nparticipants = participants;
     139        1668 :     sts->meta_data_size = meta_data_size;
     140        1668 :     sts->flags = flags;
     141             : 
     142        1668 :     if (strlen(name) > sizeof(sts->name) - 1)
     143           0 :         elog(ERROR, "SharedTuplestore name too long");
     144        1668 :     strcpy(sts->name, name);
     145             : 
     146             :     /*
      147             :      * Limit meta-data so that it plus the tuple's size field always fits
      148             :      * into a single chunk.  sts_puttuple() and sts_read_tuple() could be
      149             :      * made to support cases where it doesn't, but that's not currently
      150             :      * required; if it ever is, meta-data size should become variable too.
     151             :      */
     152        1668 :     if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
     153           0 :         elog(ERROR, "meta-data too long");
     154             : 
     155        6232 :     for (i = 0; i < participants; ++i)
     156             :     {
     157        4564 :         LWLockInitialize(&sts->participants[i].lock,
     158             :                          LWTRANCHE_SHARED_TUPLESTORE);
     159        4564 :         sts->participants[i].read_page = 0;
     160        4564 :         sts->participants[i].npages = 0;
     161        4564 :         sts->participants[i].writing = false;
     162             :     }
     163             : 
     164        1668 :     accessor = palloc0(sizeof(SharedTuplestoreAccessor));
     165        1668 :     accessor->participant = my_participant_number;
     166        1668 :     accessor->sts = sts;
     167        1668 :     accessor->fileset = fileset;
     168        1668 :     accessor->context = CurrentMemoryContext;
     169             : 
     170        1668 :     return accessor;
     171             : }
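
Continuing the hypothetical sketch above on the creating backend's side: the space carved out of the segment is handed to sts_initialize() along with a SharedFileSet assumed to have been set up elsewhere in the same segment, and a name ("batch0" is invented here) that need only be unique within that fileset:

    accessor = sts_initialize(sts,
                              pcxt->nworkers + 1,   /* participants */
                              0,                    /* leader's number */
                              sizeof(uint32),       /* per-tuple hash value */
                              SHARED_TUPLESTORE_SINGLE_PASS,
                              fileset,
                              "batch0");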
     172             : 
     173             : /*
     174             :  * Attach to a SharedTuplestore that has been initialized by another backend,
     175             :  * so that this backend can read and write tuples.
     176             :  */
     177             : SharedTuplestoreAccessor *
     178        2076 : sts_attach(SharedTuplestore *sts,
     179             :            int my_participant_number,
     180             :            SharedFileSet *fileset)
     181             : {
     182             :     SharedTuplestoreAccessor *accessor;
     183             : 
     184             :     Assert(my_participant_number < sts->nparticipants);
     185             : 
     186        2076 :     accessor = palloc0(sizeof(SharedTuplestoreAccessor));
     187        2076 :     accessor->participant = my_participant_number;
     188        2076 :     accessor->sts = sts;
     189        2076 :     accessor->fileset = fileset;
     190        2076 :     accessor->context = CurrentMemoryContext;
     191             : 
     192        2076 :     return accessor;
     193             : }
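
The worker-side counterpart of the sketch above: look up the control object in the table of contents and attach with a participant number nobody else is using.  Parallel hash join, this module's in-core user, passes ParallelWorkerNumber + 1 so that number 0 is left for the leader:

    sts = (SharedTuplestore *) shm_toc_lookup(toc, PH_KEY_TUPLESTORE, false);
    accessor = sts_attach(sts, ParallelWorkerNumber + 1, fileset);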
     194             : 
     195             : static void
     196        3326 : sts_flush_chunk(SharedTuplestoreAccessor *accessor)
     197             : {
     198             :     size_t      size;
     199             : 
     200        3326 :     size = STS_CHUNK_PAGES * BLCKSZ;
     201        3326 :     BufFileWrite(accessor->write_file, accessor->write_chunk, size);
     202        3326 :     memset(accessor->write_chunk, 0, size);
     203        3326 :     accessor->write_pointer = &accessor->write_chunk->data[0];
     204        3326 :     accessor->sts->participants[accessor->participant].npages +=
     205             :         STS_CHUNK_PAGES;
     206        3326 : }
     207             : 
     208             : /*
     209             :  * Finish writing tuples.  This must be called by all backends that have
     210             :  * written data before any backend begins reading it.
     211             :  */
     212             : void
     213        5864 : sts_end_write(SharedTuplestoreAccessor *accessor)
     214             : {
     215        5864 :     if (accessor->write_file != NULL)
     216             :     {
     217        1712 :         sts_flush_chunk(accessor);
     218        1712 :         BufFileClose(accessor->write_file);
     219        1712 :         pfree(accessor->write_chunk);
     220        1712 :         accessor->write_chunk = NULL;
     221        1712 :         accessor->write_file = NULL;
     222        1712 :         accessor->sts->participants[accessor->participant].writing = false;
     223             :     }
     224        5864 : }
     225             : 
     226             : /*
     227             :  * Prepare to rescan.  Only one participant must call this.  After it returns,
     228             :  * all participants may call sts_begin_parallel_scan() and then loop over
     229             :  * sts_parallel_scan_next().  This function must not be called concurrently
     230             :  * with a scan, and synchronization to avoid that is the caller's
     231             :  * responsibility.
     232             :  */
     233             : void
     234           0 : sts_reinitialize(SharedTuplestoreAccessor *accessor)
     235             : {
     236             :     int         i;
     237             : 
      238             :     /*
      239             :      * Reset the shared read head for all participants' files, so that a
      240             :      * subsequent parallel scan will start again from the beginning of
      241             :      * each participant's file.
      242             :      */
     243           0 :     for (i = 0; i < accessor->sts->nparticipants; ++i)
     244             :     {
     245           0 :         accessor->sts->participants[i].read_page = 0;
     246             :     }
     247           0 : }
     248             : 
     249             : /*
     250             :  * Begin scanning the contents in parallel.
     251             :  */
     252             : void
     253        1502 : sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
     254             : {
     255             :     int         i PG_USED_FOR_ASSERTS_ONLY;
     256             : 
     257             :     /* End any existing scan that was in progress. */
     258        1502 :     sts_end_parallel_scan(accessor);
     259             : 
     260             :     /*
     261             :      * Any backend that might have written into this shared tuplestore must
     262             :      * have called sts_end_write(), so that all buffers are flushed and the
     263             :      * files have stopped growing.
     264             :      */
     265        5678 :     for (i = 0; i < accessor->sts->nparticipants; ++i)
     266             :         Assert(!accessor->sts->participants[i].writing);
     267             : 
     268             :     /*
     269             :      * We will start out reading the file that THIS backend wrote.  There may
     270             :      * be some caching locality advantage to that.
     271             :      */
     272        1502 :     accessor->read_participant = accessor->participant;
     273        1502 :     accessor->read_file = NULL;
     274        1502 :     accessor->read_next_page = 0;
     275        1502 : }
     276             : 
     277             : /*
     278             :  * Finish a parallel scan, freeing associated backend-local resources.
     279             :  */
     280             : void
     281        7582 : sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
     282             : {
     283             :     /*
     284             :      * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
     285             :      * we'd probably need a reference count of current parallel scanners so we
     286             :      * could safely do it only when the reference count reaches zero.
     287             :      */
     288        7582 :     if (accessor->read_file != NULL)
     289             :     {
     290           0 :         BufFileClose(accessor->read_file);
     291           0 :         accessor->read_file = NULL;
     292             :     }
     293        7582 : }
     294             : 
     295             : /*
     296             :  * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
     297             :  * pointer to meta data of that size must be provided.
     298             :  */
     299             : void
     300     2444316 : sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
     301             :              MinimalTuple tuple)
     302             : {
     303             :     size_t      size;
     304             : 
     305             :     /* Do we have our own file yet? */
     306     2444316 :     if (accessor->write_file == NULL)
     307             :     {
     308             :         SharedTuplestoreParticipant *participant;
     309             :         char        name[MAXPGPATH];
     310             :         MemoryContext oldcxt;
     311             : 
     312             :         /* Create one.  Only this backend will write into it. */
     313        1712 :         sts_filename(name, accessor, accessor->participant);
     314             : 
     315        1712 :         oldcxt = MemoryContextSwitchTo(accessor->context);
     316        1712 :         accessor->write_file =
     317        1712 :             BufFileCreateFileSet(&accessor->fileset->fs, name);
     318        1712 :         MemoryContextSwitchTo(oldcxt);
     319             : 
     320             :         /* Set up the shared state for this backend's file. */
     321        1712 :         participant = &accessor->sts->participants[accessor->participant];
     322        1712 :         participant->writing = true; /* for assertions only */
     323             :     }
     324             : 
     325             :     /* Do we have space? */
     326     2444316 :     size = accessor->sts->meta_data_size + tuple->t_len;
     327     2444316 :     if (accessor->write_pointer + size > accessor->write_end)
     328             :     {
     329        3110 :         if (accessor->write_chunk == NULL)
     330             :         {
     331             :             /* First time through.  Allocate chunk. */
     332        1712 :             accessor->write_chunk = (SharedTuplestoreChunk *)
     333        1712 :                 MemoryContextAllocZero(accessor->context,
     334             :                                        STS_CHUNK_PAGES * BLCKSZ);
     335        1712 :             accessor->write_chunk->ntuples = 0;
     336        1712 :             accessor->write_pointer = &accessor->write_chunk->data[0];
     337        1712 :             accessor->write_end = (char *)
     338        1712 :                 accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
     339             :         }
     340             :         else
     341             :         {
     342             :             /* See if flushing helps. */
     343        1398 :             sts_flush_chunk(accessor);
     344             :         }
     345             : 
     346             :         /* It may still not be enough in the case of a gigantic tuple. */
     347        3110 :         if (accessor->write_pointer + size > accessor->write_end)
     348             :         {
     349             :             size_t      written;
     350             : 
     351             :             /*
     352             :              * We'll write the beginning of the oversized tuple, and then
     353             :              * write the rest in some number of 'overflow' chunks.
     354             :              *
      355             :              * sts_initialize() verifies that the meta-data plus the
      356             :              * tuple's size field always fit into a single chunk.  Because
      357             :              * the chunk has been flushed above, we can be sure to have all
      358             :              * of a chunk's usable space available.
     359             :              */
     360             :             Assert(accessor->write_pointer + accessor->sts->meta_data_size +
     361             :                    sizeof(uint32) < accessor->write_end);
     362             : 
      363             :             /* Write the meta-data in full; it never spans chunks. */
     364          24 :             if (accessor->sts->meta_data_size > 0)
     365          24 :                 memcpy(accessor->write_pointer, meta_data,
     366          24 :                        accessor->sts->meta_data_size);
     367             : 
     368             :             /*
     369             :              * Write as much of the tuple as we can fit. This includes the
     370             :              * tuple's size at the start.
     371             :              */
     372          24 :             written = accessor->write_end - accessor->write_pointer -
     373          24 :                 accessor->sts->meta_data_size;
     374          24 :             memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
     375             :                    tuple, written);
     376          24 :             ++accessor->write_chunk->ntuples;
     377          24 :             size -= accessor->sts->meta_data_size;
     378          24 :             size -= written;
     379             :             /* Now write as many overflow chunks as we need for the rest. */
     380         240 :             while (size > 0)
     381             :             {
     382             :                 size_t      written_this_chunk;
     383             : 
     384         216 :                 sts_flush_chunk(accessor);
     385             : 
     386             :                 /*
     387             :                  * How many overflow chunks to go?  This will allow readers to
     388             :                  * skip all of them at once instead of reading each one.
     389             :                  */
     390         216 :                 accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
     391             :                     STS_CHUNK_DATA_SIZE;
     392         216 :                 written_this_chunk =
     393         216 :                     Min(accessor->write_end - accessor->write_pointer, size);
     394         216 :                 memcpy(accessor->write_pointer, (char *) tuple + written,
     395             :                        written_this_chunk);
     396         216 :                 accessor->write_pointer += written_this_chunk;
     397         216 :                 size -= written_this_chunk;
     398         216 :                 written += written_this_chunk;
     399             :             }
     400          24 :             return;
     401             :         }
     402             :     }
     403             : 
     404             :     /* Copy meta-data and tuple into buffer. */
     405     2444292 :     if (accessor->sts->meta_data_size > 0)
     406     2444292 :         memcpy(accessor->write_pointer, meta_data,
     407     2444292 :                accessor->sts->meta_data_size);
     408     2444292 :     memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
     409     2444292 :            tuple->t_len);
     410     2444292 :     accessor->write_pointer += size;
     411     2444292 :     ++accessor->write_chunk->ntuples;
     412             : }
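
Putting the write path together, a minimal sketch under the same assumptions as above: slot and hashvalue come from the surrounding executor code, and ExecFetchSlotMinimalTuple() is the usual way to obtain a MinimalTuple from a slot:

    MinimalTuple mtup;
    bool        shouldFree;

    mtup = ExecFetchSlotMinimalTuple(slot, &shouldFree);
    sts_puttuple(accessor, &hashvalue, mtup);   /* copies both into the chunk */
    if (shouldFree)
        pfree(mtup);

After its last tuple, each writing participant must call sts_end_write(accessor) so that the final, partially filled chunk is flushed to disk before any backend starts scanning.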
     413             : 
     414             : static MinimalTuple
     415     2444316 : sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
     416             : {
     417             :     MinimalTuple tuple;
     418             :     uint32      size;
     419             :     size_t      remaining_size;
     420             :     size_t      this_chunk_size;
     421             :     char       *destination;
     422             : 
     423             :     /*
     424             :      * We'll keep track of bytes read from this chunk so that we can detect an
     425             :      * overflowing tuple and switch to reading overflow pages.
     426             :      */
     427     2444316 :     if (accessor->sts->meta_data_size > 0)
     428             :     {
     429     2444316 :         BufFileReadExact(accessor->read_file, meta_data, accessor->sts->meta_data_size);
     430     2444316 :         accessor->read_bytes += accessor->sts->meta_data_size;
     431             :     }
     432     2444316 :     BufFileReadExact(accessor->read_file, &size, sizeof(size));
     433     2444316 :     accessor->read_bytes += sizeof(size);
     434     2444316 :     if (size > accessor->read_buffer_size)
     435             :     {
     436             :         size_t      new_read_buffer_size;
     437             : 
     438        1196 :         if (accessor->read_buffer != NULL)
     439           0 :             pfree(accessor->read_buffer);
     440        1196 :         new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
     441        1196 :         accessor->read_buffer =
     442        1196 :             MemoryContextAlloc(accessor->context, new_read_buffer_size);
     443        1196 :         accessor->read_buffer_size = new_read_buffer_size;
     444             :     }
     445     2444316 :     remaining_size = size - sizeof(uint32);
     446     2444316 :     this_chunk_size = Min(remaining_size,
     447             :                           BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
     448     2444316 :     destination = accessor->read_buffer + sizeof(uint32);
     449     2444316 :     BufFileReadExact(accessor->read_file, destination, this_chunk_size);
     450     2444316 :     accessor->read_bytes += this_chunk_size;
     451     2444316 :     remaining_size -= this_chunk_size;
     452     2444316 :     destination += this_chunk_size;
     453     2444316 :     ++accessor->read_ntuples;
     454             : 
     455             :     /* Check if we need to read any overflow chunks. */
     456     2444532 :     while (remaining_size > 0)
     457             :     {
     458             :         /* We are now positioned at the start of an overflow chunk. */
     459             :         SharedTuplestoreChunk chunk_header;
     460             : 
     461         216 :         BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
     462         216 :         accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
     463         216 :         if (chunk_header.overflow == 0)
     464           0 :             ereport(ERROR,
     465             :                     (errcode_for_file_access(),
     466             :                      errmsg("unexpected chunk in shared tuplestore temporary file"),
     467             :                      errdetail_internal("Expected overflow chunk.")));
     468         216 :         accessor->read_next_page += STS_CHUNK_PAGES;
     469         216 :         this_chunk_size = Min(remaining_size,
     470             :                               BLCKSZ * STS_CHUNK_PAGES -
     471             :                               STS_CHUNK_HEADER_SIZE);
     472         216 :         BufFileReadExact(accessor->read_file, destination, this_chunk_size);
     473         216 :         accessor->read_bytes += this_chunk_size;
     474         216 :         remaining_size -= this_chunk_size;
     475         216 :         destination += this_chunk_size;
     476             : 
     477             :         /*
     478             :          * These will be used to count regular tuples following the oversized
     479             :          * tuple that spilled into this overflow chunk.
     480             :          */
     481         216 :         accessor->read_ntuples = 0;
     482         216 :         accessor->read_ntuples_available = chunk_header.ntuples;
     483             :     }
     484             : 
     485     2444316 :     tuple = (MinimalTuple) accessor->read_buffer;
     486     2444316 :     tuple->t_len = size;
     487             : 
     488     2444316 :     return tuple;
     489             : }
     490             : 
     491             : /*
     492             :  * Get the next tuple in the current parallel scan.
     493             :  */
     494             : MinimalTuple
     495     2451222 : sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
     496             : {
     497             :     SharedTuplestoreParticipant *p;
     498             :     BlockNumber read_page;
     499             :     bool        eof;
     500             : 
     501             :     for (;;)
     502             :     {
     503             :         /* Can we read more tuples from the current chunk? */
     504     2451222 :         if (accessor->read_ntuples < accessor->read_ntuples_available)
     505     2444316 :             return sts_read_tuple(accessor, meta_data);
     506             : 
     507             :         /* Find the location of a new chunk to read. */
     508        6906 :         p = &accessor->sts->participants[accessor->read_participant];
     509             : 
     510        6906 :         LWLockAcquire(&p->lock, LW_EXCLUSIVE);
     511             :         /* We can skip directly past overflow pages we know about. */
     512        6906 :         if (p->read_page < accessor->read_next_page)
     513          24 :             p->read_page = accessor->read_next_page;
     514        6906 :         eof = p->read_page >= p->npages;
     515        6906 :         if (!eof)
     516             :         {
     517             :             /* Claim the next chunk. */
     518        3110 :             read_page = p->read_page;
     519             :             /* Advance the read head for the next reader. */
     520        3110 :             p->read_page += STS_CHUNK_PAGES;
     521        3110 :             accessor->read_next_page = p->read_page;
     522             :         }
     523        6906 :         LWLockRelease(&p->lock);
     524             : 
     525        6906 :         if (!eof)
     526             :         {
     527             :             SharedTuplestoreChunk chunk_header;
     528             : 
     529             :             /* Make sure we have the file open. */
     530        3110 :             if (accessor->read_file == NULL)
     531             :             {
     532             :                 char        name[MAXPGPATH];
     533             :                 MemoryContext oldcxt;
     534             : 
     535        1750 :                 sts_filename(name, accessor, accessor->read_participant);
     536             : 
     537        1750 :                 oldcxt = MemoryContextSwitchTo(accessor->context);
     538        1750 :                 accessor->read_file =
     539        1750 :                     BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
     540             :                                        false);
     541        1750 :                 MemoryContextSwitchTo(oldcxt);
     542             :             }
     543             : 
     544             :             /* Seek and load the chunk header. */
     545        3110 :             if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
     546           0 :                 ereport(ERROR,
     547             :                         (errcode_for_file_access(),
     548             :                          errmsg("could not seek to block %u in shared tuplestore temporary file",
     549             :                                 read_page)));
     550        3110 :             BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
     551             : 
     552             :             /*
     553             :              * If this is an overflow chunk, we skip it and any following
     554             :              * overflow chunks all at once.
     555             :              */
     556        3110 :             if (chunk_header.overflow > 0)
     557             :             {
     558           0 :                 accessor->read_next_page = read_page +
     559           0 :                     chunk_header.overflow * STS_CHUNK_PAGES;
     560           0 :                 continue;
     561             :             }
     562             : 
     563        3110 :             accessor->read_ntuples = 0;
     564        3110 :             accessor->read_ntuples_available = chunk_header.ntuples;
     565        3110 :             accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
     566             : 
     567             :             /* Go around again, so we can get a tuple from this chunk. */
     568             :         }
     569             :         else
     570             :         {
     571        3796 :             if (accessor->read_file != NULL)
     572             :             {
     573        1750 :                 BufFileClose(accessor->read_file);
     574        1750 :                 accessor->read_file = NULL;
     575             :             }
     576             : 
     577             :             /*
     578             :              * Try the next participant's file.  If we've gone full circle,
     579             :              * we're done.
     580             :              */
     581        3796 :             accessor->read_participant = (accessor->read_participant + 1) %
     582        3796 :                 accessor->sts->nparticipants;
     583        3796 :             if (accessor->read_participant == accessor->participant)
     584        1374 :                 break;
     585        2422 :             accessor->read_next_page = 0;
     586             : 
     587             :             /* Go around again, so we can get a chunk from this file. */
     588             :         }
     589             :     }
     590             : 
     591        1374 :     return NULL;
     592             : }
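
The matching read path, sketched under the same assumptions: each participant loops until NULL, which means the scan has gone full circle over every participant's file.  The meta_data argument receives the uint32 hash stored alongside each tuple in the write sketch above, and the returned tuple points into the accessor's reusable read buffer, so it must be consumed or copied before the next call:

    MinimalTuple mtup;
    uint32      hashvalue;

    sts_begin_parallel_scan(accessor);
    while ((mtup = sts_parallel_scan_next(accessor, &hashvalue)) != NULL)
    {
        /* ... process mtup ... */
    }
    sts_end_parallel_scan(accessor);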
     593             : 
     594             : /*
     595             :  * Create the name used for the BufFile that a given participant will write.
     596             :  */
     597             : static void
     598        3462 : sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
     599             : {
     600        3462 :     snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
     601        3462 : }

Generated by: LCOV version 1.14