LCOV - code coverage report
Current view: top level - src/backend/utils/sort - sharedtuplestore.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 173 187 92.5 %
Date: 2025-12-02 12:17:32 Functions: 11 12 91.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * sharedtuplestore.c
       4             :  *    Simple mechanism for sharing tuples between backends.
       5             :  *
       6             :  * This module contains a shared temporary tuple storage mechanism providing
       7             :  * a parallel-aware subset of the features of tuplestore.c.  Multiple backends
       8             :  * can write to a SharedTuplestore, and then multiple backends can later scan
       9             :  * the stored tuples.  Currently, the only scan type supported is a parallel
      10             :  * scan where each backend reads an arbitrary subset of the tuples that were
      11             :  * written.
      12             :  *
      13             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      14             :  * Portions Copyright (c) 1994, Regents of the University of California
      15             :  *
      16             :  * IDENTIFICATION
      17             :  *    src/backend/utils/sort/sharedtuplestore.c
      18             :  *
      19             :  *-------------------------------------------------------------------------
      20             :  */
      21             : 
      22             : #include "postgres.h"
      23             : 
      24             : #include "access/htup.h"
      25             : #include "access/htup_details.h"
      26             : #include "storage/buffile.h"
      27             : #include "storage/lwlock.h"
      28             : #include "storage/sharedfileset.h"
      29             : #include "utils/sharedtuplestore.h"
      30             : 
/*
 * The size of chunks, in pages.  This is somewhat arbitrarily set to match
 * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
 * at approximately the same rate as it allocates new chunks of memory to
 * insert them into.
 */
#define STS_CHUNK_PAGES 4
/* Bytes taken by a chunk's fixed header (the ntuples and overflow fields). */
#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
/* Bytes left for meta-data and tuple bytes in each chunk, after the header. */
#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
      40             : 
/*
 * Chunk written to disk.  Every chunk occupies exactly
 * STS_CHUNK_PAGES * BLCKSZ bytes of a participant's file (sts_flush_chunk()
 * always writes that many bytes), and this struct is the raw on-disk layout,
 * so its field order must not change.
 */
typedef struct SharedTuplestoreChunk
{
    /*
     * Number of tuples starting in this chunk.  In an overflow chunk this
     * counts only the regular tuples written after the tail of the oversized
     * tuple that spilled into it.
     */
    int         ntuples;
    /*
     * 0 for a regular chunk.  For an overflow chunk, how many overflow chunks
     * of the current oversized tuple remain, including this one; readers use
     * it to skip them all at once.
     */
    int         overflow;
    char        data[FLEXIBLE_ARRAY_MEMBER];    /* meta-data + tuple bytes */
} SharedTuplestoreChunk;
      48             : 
/*
 * Per-participant shared state.  SharedTuplestore holds one of these for each
 * participant, describing the file that participant writes and the shared
 * read head that parallel scanners advance through it.
 */
typedef struct SharedTuplestoreParticipant
{
    LWLock      lock;           /* Held while claiming chunks (reading and
                                 * advancing read_page) in
                                 * sts_parallel_scan_next(). */
    BlockNumber read_page;      /* Page number for next read. */
    BlockNumber npages;         /* Number of pages written. */
    bool        writing;        /* Used only for assertions. */
} SharedTuplestoreParticipant;
      57             : 
/*
 * The control object that lives in shared memory.  Its size, including the
 * trailing participants[] array, is given by sts_estimate(), and it is filled
 * in by sts_initialize().
 */
struct SharedTuplestore
{
    int         nparticipants;  /* Number of participants that can write. */
    int         flags;          /* Flag bits from SHARED_TUPLESTORE_XXX */
    size_t      meta_data_size; /* Size of per-tuple header. */
    char        name[NAMEDATALEN];  /* A name for this tuplestore; used as
                                     * the prefix of each participant's file
                                     * name (see sts_filename()). */

    /* Followed by per-participant shared state. */
    SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
};
      69             : 
/*
 * Per-participant state that lives in backend-local memory.  Created by
 * sts_initialize() or sts_attach(); the read/write buffers and BufFiles it
 * owns are allocated in 'context'.
 */
struct SharedTuplestoreAccessor
{
    int         participant;    /* My participant number. */
    SharedTuplestore *sts;      /* The shared state. */
    SharedFileSet *fileset;     /* The SharedFileSet holding files. */
    MemoryContext context;      /* Memory context for buffers. */

    /* State for reading. */
    int         read_participant;   /* The current participant to read from. */
    BufFile    *read_file;      /* The current file to read from. */
    int         read_ntuples_available; /* The number of tuples in chunk. */
    int         read_ntuples;   /* How many tuples have we read from chunk? */
    size_t      read_bytes;     /* How many bytes have we read from chunk?
                                 * (counts the chunk header too) */
    char       *read_buffer;    /* A buffer for loading tuples. */
    size_t      read_buffer_size;   /* Allocated size of read_buffer. */
    BlockNumber read_next_page; /* Lowest block we'll consider reading. */

    /* State for writing. */
    SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
    BufFile    *write_file;     /* The current file to write to. */
    char       *write_pointer;  /* Current write pointer within chunk. */
    char       *write_end;      /* One past the end of the current chunk. */
};
      94             : 
/* Builds the BufFile name for a participant's file; defined at end of file. */
static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
                         int participant);
      97             : 
      98             : /*
      99             :  * Return the amount of shared memory required to hold SharedTuplestore for a
     100             :  * given number of participants.
     101             :  */
     102             : size_t
     103        3752 : sts_estimate(int participants)
     104             : {
     105        7504 :     return offsetof(SharedTuplestore, participants) +
     106        3752 :         sizeof(SharedTuplestoreParticipant) * participants;
     107             : }
     108             : 
     109             : /*
     110             :  * Initialize a SharedTuplestore in existing shared memory.  There must be
     111             :  * space for sts_estimate(participants) bytes.  If flags includes the value
     112             :  * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
     113             :  * eagerly (but this isn't yet implemented).
     114             :  *
     115             :  * Tuples that are stored may optionally carry a piece of fixed sized
     116             :  * meta-data which will be retrieved along with the tuple.  This is useful for
     117             :  * the hash values used in multi-batch hash joins, but could have other
     118             :  * applications.
     119             :  *
     120             :  * The caller must supply a SharedFileSet, which is essentially a directory
     121             :  * that will be cleaned up automatically, and a name which must be unique
     122             :  * across all SharedTuplestores created in the same SharedFileSet.
     123             :  */
     124             : SharedTuplestoreAccessor *
     125        1392 : sts_initialize(SharedTuplestore *sts, int participants,
     126             :                int my_participant_number,
     127             :                size_t meta_data_size,
     128             :                int flags,
     129             :                SharedFileSet *fileset,
     130             :                const char *name)
     131             : {
     132             :     SharedTuplestoreAccessor *accessor;
     133             :     int         i;
     134             : 
     135             :     Assert(my_participant_number < participants);
     136             : 
     137        1392 :     sts->nparticipants = participants;
     138        1392 :     sts->meta_data_size = meta_data_size;
     139        1392 :     sts->flags = flags;
     140             : 
     141        1392 :     if (strlen(name) > sizeof(sts->name) - 1)
     142           0 :         elog(ERROR, "SharedTuplestore name too long");
     143        1392 :     strcpy(sts->name, name);
     144             : 
     145             :     /*
     146             :      * Limit meta-data so it + tuple size always fits into a single chunk.
     147             :      * sts_puttuple() and sts_read_tuple() could be made to support scenarios
     148             :      * where that's not the case, but it's not currently required. If so,
     149             :      * meta-data size probably should be made variable, too.
     150             :      */
     151        1392 :     if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
     152           0 :         elog(ERROR, "meta-data too long");
     153             : 
     154        5124 :     for (i = 0; i < participants; ++i)
     155             :     {
     156        3732 :         LWLockInitialize(&sts->participants[i].lock,
     157             :                          LWTRANCHE_SHARED_TUPLESTORE);
     158        3732 :         sts->participants[i].read_page = 0;
     159        3732 :         sts->participants[i].npages = 0;
     160        3732 :         sts->participants[i].writing = false;
     161             :     }
     162             : 
     163        1392 :     accessor = palloc0(sizeof(SharedTuplestoreAccessor));
     164        1392 :     accessor->participant = my_participant_number;
     165        1392 :     accessor->sts = sts;
     166        1392 :     accessor->fileset = fileset;
     167        1392 :     accessor->context = CurrentMemoryContext;
     168             : 
     169        1392 :     return accessor;
     170             : }
     171             : 
     172             : /*
     173             :  * Attach to a SharedTuplestore that has been initialized by another backend,
     174             :  * so that this backend can read and write tuples.
     175             :  */
     176             : SharedTuplestoreAccessor *
     177        1850 : sts_attach(SharedTuplestore *sts,
     178             :            int my_participant_number,
     179             :            SharedFileSet *fileset)
     180             : {
     181             :     SharedTuplestoreAccessor *accessor;
     182             : 
     183             :     Assert(my_participant_number < sts->nparticipants);
     184             : 
     185        1850 :     accessor = palloc0(sizeof(SharedTuplestoreAccessor));
     186        1850 :     accessor->participant = my_participant_number;
     187        1850 :     accessor->sts = sts;
     188        1850 :     accessor->fileset = fileset;
     189        1850 :     accessor->context = CurrentMemoryContext;
     190             : 
     191        1850 :     return accessor;
     192             : }
     193             : 
     194             : static void
     195        3248 : sts_flush_chunk(SharedTuplestoreAccessor *accessor)
     196             : {
     197             :     size_t      size;
     198             : 
     199        3248 :     size = STS_CHUNK_PAGES * BLCKSZ;
     200        3248 :     BufFileWrite(accessor->write_file, accessor->write_chunk, size);
     201        3248 :     memset(accessor->write_chunk, 0, size);
     202        3248 :     accessor->write_pointer = &accessor->write_chunk->data[0];
     203        3248 :     accessor->sts->participants[accessor->participant].npages +=
     204             :         STS_CHUNK_PAGES;
     205        3248 : }
     206             : 
     207             : /*
     208             :  * Finish writing tuples.  This must be called by all backends that have
     209             :  * written data before any backend begins reading it.
     210             :  */
     211             : void
     212        5192 : sts_end_write(SharedTuplestoreAccessor *accessor)
     213             : {
     214        5192 :     if (accessor->write_file != NULL)
     215             :     {
     216        1544 :         sts_flush_chunk(accessor);
     217        1544 :         BufFileClose(accessor->write_file);
     218        1544 :         pfree(accessor->write_chunk);
     219        1544 :         accessor->write_chunk = NULL;
     220        1544 :         accessor->write_file = NULL;
     221        1544 :         accessor->sts->participants[accessor->participant].writing = false;
     222             :     }
     223        5192 : }
     224             : 
     225             : /*
     226             :  * Prepare to rescan.  Only one participant must call this.  After it returns,
     227             :  * all participants may call sts_begin_parallel_scan() and then loop over
     228             :  * sts_parallel_scan_next().  This function must not be called concurrently
     229             :  * with a scan, and synchronization to avoid that is the caller's
     230             :  * responsibility.
     231             :  */
     232             : void
     233           0 : sts_reinitialize(SharedTuplestoreAccessor *accessor)
     234             : {
     235             :     int         i;
     236             : 
     237             :     /*
     238             :      * Reset the shared read head for all participants' files.  Also set the
     239             :      * initial chunk size to the minimum (any increases from that size will be
     240             :      * recorded in chunk_expansion_log).
     241             :      */
     242           0 :     for (i = 0; i < accessor->sts->nparticipants; ++i)
     243             :     {
     244           0 :         accessor->sts->participants[i].read_page = 0;
     245             :     }
     246           0 : }
     247             : 
     248             : /*
     249             :  * Begin scanning the contents in parallel.
     250             :  */
     251             : void
     252        1312 : sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
     253             : {
     254             :     int         i PG_USED_FOR_ASSERTS_ONLY;
     255             : 
     256             :     /* End any existing scan that was in progress. */
     257        1312 :     sts_end_parallel_scan(accessor);
     258             : 
     259             :     /*
     260             :      * Any backend that might have written into this shared tuplestore must
     261             :      * have called sts_end_write(), so that all buffers are flushed and the
     262             :      * files have stopped growing.
     263             :      */
     264        4902 :     for (i = 0; i < accessor->sts->nparticipants; ++i)
     265             :         Assert(!accessor->sts->participants[i].writing);
     266             : 
     267             :     /*
     268             :      * We will start out reading the file that THIS backend wrote.  There may
     269             :      * be some caching locality advantage to that.
     270             :      */
     271        1312 :     accessor->read_participant = accessor->participant;
     272        1312 :     accessor->read_file = NULL;
     273        1312 :     accessor->read_next_page = 0;
     274        1312 : }
     275             : 
     276             : /*
     277             :  * Finish a parallel scan, freeing associated backend-local resources.
     278             :  */
     279             : void
     280        6642 : sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
     281             : {
     282             :     /*
     283             :      * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
     284             :      * we'd probably need a reference count of current parallel scanners so we
     285             :      * could safely do it only when the reference count reaches zero.
     286             :      */
     287        6642 :     if (accessor->read_file != NULL)
     288             :     {
     289           0 :         BufFileClose(accessor->read_file);
     290           0 :         accessor->read_file = NULL;
     291             :     }
     292        6642 : }
     293             : 
     294             : /*
     295             :  * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
     296             :  * pointer to meta data of that size must be provided.
     297             :  */
     298             : void
     299     2400798 : sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
     300             :              MinimalTuple tuple)
     301             : {
     302             :     size_t      size;
     303             : 
     304             :     /* Do we have our own file yet? */
     305     2400798 :     if (accessor->write_file == NULL)
     306             :     {
     307             :         SharedTuplestoreParticipant *participant;
     308             :         char        name[MAXPGPATH];
     309             :         MemoryContext oldcxt;
     310             : 
     311             :         /* Create one.  Only this backend will write into it. */
     312        1544 :         sts_filename(name, accessor, accessor->participant);
     313             : 
     314        1544 :         oldcxt = MemoryContextSwitchTo(accessor->context);
     315        1544 :         accessor->write_file =
     316        1544 :             BufFileCreateFileSet(&accessor->fileset->fs, name);
     317        1544 :         MemoryContextSwitchTo(oldcxt);
     318             : 
     319             :         /* Set up the shared state for this backend's file. */
     320        1544 :         participant = &accessor->sts->participants[accessor->participant];
     321        1544 :         participant->writing = true; /* for assertions only */
     322             :     }
     323             : 
     324             :     /* Do we have space? */
     325     2400798 :     size = accessor->sts->meta_data_size + tuple->t_len;
     326     2400798 :     if (accessor->write_pointer + size > accessor->write_end)
     327             :     {
     328        3032 :         if (accessor->write_chunk == NULL)
     329             :         {
     330             :             /* First time through.  Allocate chunk. */
     331        1544 :             accessor->write_chunk = (SharedTuplestoreChunk *)
     332        1544 :                 MemoryContextAllocZero(accessor->context,
     333             :                                        STS_CHUNK_PAGES * BLCKSZ);
     334        1544 :             accessor->write_chunk->ntuples = 0;
     335        1544 :             accessor->write_pointer = &accessor->write_chunk->data[0];
     336        1544 :             accessor->write_end = (char *)
     337        1544 :                 accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
     338             :         }
     339             :         else
     340             :         {
     341             :             /* See if flushing helps. */
     342        1488 :             sts_flush_chunk(accessor);
     343             :         }
     344             : 
     345             :         /* It may still not be enough in the case of a gigantic tuple. */
     346        3032 :         if (accessor->write_pointer + size > accessor->write_end)
     347             :         {
     348             :             size_t      written;
     349             : 
     350             :             /*
     351             :              * We'll write the beginning of the oversized tuple, and then
     352             :              * write the rest in some number of 'overflow' chunks.
     353             :              *
     354             :              * sts_initialize() verifies that the size of the tuple +
     355             :              * meta-data always fits into a chunk. Because the chunk has been
     356             :              * flushed above, we can be sure to have all of a chunk's usable
     357             :              * space available.
     358             :              */
     359             :             Assert(accessor->write_pointer + accessor->sts->meta_data_size +
     360             :                    sizeof(uint32) < accessor->write_end);
     361             : 
     362             :             /* Write the meta-data as one chunk. */
     363          24 :             if (accessor->sts->meta_data_size > 0)
     364          24 :                 memcpy(accessor->write_pointer, meta_data,
     365          24 :                        accessor->sts->meta_data_size);
     366             : 
     367             :             /*
     368             :              * Write as much of the tuple as we can fit. This includes the
     369             :              * tuple's size at the start.
     370             :              */
     371          24 :             written = accessor->write_end - accessor->write_pointer -
     372          24 :                 accessor->sts->meta_data_size;
     373          24 :             memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
     374             :                    tuple, written);
     375          24 :             ++accessor->write_chunk->ntuples;
     376          24 :             size -= accessor->sts->meta_data_size;
     377          24 :             size -= written;
     378             :             /* Now write as many overflow chunks as we need for the rest. */
     379         240 :             while (size > 0)
     380             :             {
     381             :                 size_t      written_this_chunk;
     382             : 
     383         216 :                 sts_flush_chunk(accessor);
     384             : 
     385             :                 /*
     386             :                  * How many overflow chunks to go?  This will allow readers to
     387             :                  * skip all of them at once instead of reading each one.
     388             :                  */
     389         216 :                 accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
     390             :                     STS_CHUNK_DATA_SIZE;
     391         216 :                 written_this_chunk =
     392         216 :                     Min(accessor->write_end - accessor->write_pointer, size);
     393         216 :                 memcpy(accessor->write_pointer, (char *) tuple + written,
     394             :                        written_this_chunk);
     395         216 :                 accessor->write_pointer += written_this_chunk;
     396         216 :                 size -= written_this_chunk;
     397         216 :                 written += written_this_chunk;
     398             :             }
     399          24 :             return;
     400             :         }
     401             :     }
     402             : 
     403             :     /* Copy meta-data and tuple into buffer. */
     404     2400774 :     if (accessor->sts->meta_data_size > 0)
     405     2400774 :         memcpy(accessor->write_pointer, meta_data,
     406     2400774 :                accessor->sts->meta_data_size);
     407     2400774 :     memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
     408     2400774 :            tuple->t_len);
     409     2400774 :     accessor->write_pointer += size;
     410     2400774 :     ++accessor->write_chunk->ntuples;
     411             : }
     412             : 
/*
 * Read the next tuple from accessor->read_file.  The file must be positioned
 * at the start of that tuple inside the current chunk, and read_bytes must
 * say how much of the chunk has been consumed.  If meta_data_size > 0, the
 * tuple's meta-data is copied into *meta_data.
 *
 * The returned MinimalTuple points into accessor->read_buffer, which the
 * next call overwrites.
 *
 * If the tuple continues into overflow chunks, those are read as well.  The
 * per-chunk counters are then reset to describe the regular tuples that
 * follow it in the last overflow chunk.
 */
static MinimalTuple
sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
{
    MinimalTuple tuple;
    uint32      size;
    size_t      remaining_size;
    size_t      this_chunk_size;
    char       *destination;

    /*
     * We'll keep track of bytes read from this chunk so that we can detect an
     * overflowing tuple and switch to reading overflow pages.
     */
    if (accessor->sts->meta_data_size > 0)
    {
        BufFileReadExact(accessor->read_file, meta_data, accessor->sts->meta_data_size);
        accessor->read_bytes += accessor->sts->meta_data_size;
    }
    /* The tuple's length word; it counts itself, i.e. the whole tuple. */
    BufFileReadExact(accessor->read_file, &size, sizeof(size));
    accessor->read_bytes += sizeof(size);
    if (size > accessor->read_buffer_size)
    {
        size_t      new_read_buffer_size;

        /*
         * The old contents are not needed, so free and allocate afresh
         * rather than copying.  Grow at least geometrically.
         */
        if (accessor->read_buffer != NULL)
            pfree(accessor->read_buffer);
        new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
        accessor->read_buffer =
            MemoryContextAlloc(accessor->context, new_read_buffer_size);
        accessor->read_buffer_size = new_read_buffer_size;
    }
    /*
     * The length word was read into 'size', so load the rest of the tuple
     * just after where it belongs; t_len is filled in at the end.  Read no
     * further than the end of the current chunk.
     */
    remaining_size = size - sizeof(uint32);
    this_chunk_size = Min(remaining_size,
                          BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
    destination = accessor->read_buffer + sizeof(uint32);
    BufFileReadExact(accessor->read_file, destination, this_chunk_size);
    accessor->read_bytes += this_chunk_size;
    remaining_size -= this_chunk_size;
    destination += this_chunk_size;
    ++accessor->read_ntuples;

    /* Check if we need to read any overflow chunks. */
    while (remaining_size > 0)
    {
        /* We are now positioned at the start of an overflow chunk. */
        SharedTuplestoreChunk chunk_header;

        BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
        accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
        if (chunk_header.overflow == 0)
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("unexpected chunk in shared tuplestore temporary file"),
                     errdetail_internal("Expected overflow chunk.")));
        /*
         * This backend consumes the overflow chunk itself.  Move our lower
         * bound past it, so that our next claim in sts_parallel_scan_next()
         * pushes the shared read head beyond it.
         */
        accessor->read_next_page += STS_CHUNK_PAGES;
        this_chunk_size = Min(remaining_size,
                              BLCKSZ * STS_CHUNK_PAGES -
                              STS_CHUNK_HEADER_SIZE);
        BufFileReadExact(accessor->read_file, destination, this_chunk_size);
        accessor->read_bytes += this_chunk_size;
        remaining_size -= this_chunk_size;
        destination += this_chunk_size;

        /*
         * These will be used to count regular tuples following the oversized
         * tuple that spilled into this overflow chunk.
         */
        accessor->read_ntuples = 0;
        accessor->read_ntuples_available = chunk_header.ntuples;
    }

    tuple = (MinimalTuple) accessor->read_buffer;
    tuple->t_len = size;

    return tuple;
}
     489             : 
/*
 * Get the next tuple in the current parallel scan.
 *
 * Chunks are claimed one at a time by advancing a participant's shared
 * read_page under its lock, so no two backends claim the same chunk.  This
 * backend starts with its own participant's file, then moves through the
 * others in turn.  It returns NULL once it has come full circle.
 *
 * If meta_data_size > 0, the tuple's meta-data is stored into *meta_data.
 * The returned tuple lives in the accessor's read buffer (see
 * sts_read_tuple()).
 */
MinimalTuple
sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
{
    SharedTuplestoreParticipant *p;
    BlockNumber read_page;
    bool        eof;

    for (;;)
    {
        /* Can we read more tuples from the current chunk? */
        if (accessor->read_ntuples < accessor->read_ntuples_available)
            return sts_read_tuple(accessor, meta_data);

        /* Find the location of a new chunk to read. */
        p = &accessor->sts->participants[accessor->read_participant];

        LWLockAcquire(&p->lock, LW_EXCLUSIVE);
        /* We can skip directly past overflow pages we know about. */
        if (p->read_page < accessor->read_next_page)
            p->read_page = accessor->read_next_page;
        eof = p->read_page >= p->npages;
        if (!eof)
        {
            /* Claim the next chunk. */
            read_page = p->read_page;
            /* Advance the read head for the next reader. */
            p->read_page += STS_CHUNK_PAGES;
            accessor->read_next_page = p->read_page;
        }
        LWLockRelease(&p->lock);

        if (!eof)
        {
            SharedTuplestoreChunk chunk_header;

            /* Make sure we have the file open. */
            if (accessor->read_file == NULL)
            {
                char        name[MAXPGPATH];
                MemoryContext oldcxt;

                sts_filename(name, accessor, accessor->read_participant);

                oldcxt = MemoryContextSwitchTo(accessor->context);
                accessor->read_file =
                    BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
                                       false);
                MemoryContextSwitchTo(oldcxt);
            }

            /* Seek and load the chunk header. */
            if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not seek to block %u in shared tuplestore temporary file",
                                read_page)));
            BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);

            /*
             * If this is an overflow chunk, we skip it and any following
             * overflow chunks all at once.  Its contents are consumed by
             * whichever backend read the oversized tuple's head.
             */
            if (chunk_header.overflow > 0)
            {
                accessor->read_next_page = read_page +
                    chunk_header.overflow * STS_CHUNK_PAGES;
                continue;
            }

            accessor->read_ntuples = 0;
            accessor->read_ntuples_available = chunk_header.ntuples;
            accessor->read_bytes = STS_CHUNK_HEADER_SIZE;

            /* Go around again, so we can get a tuple from this chunk. */
        }
        else
        {
            /* This participant's file is exhausted; close it if open. */
            if (accessor->read_file != NULL)
            {
                BufFileClose(accessor->read_file);
                accessor->read_file = NULL;
            }

            /*
             * Try the next participant's file.  If we've gone full circle,
             * we're done.
             */
            accessor->read_participant = (accessor->read_participant + 1) %
                accessor->sts->nparticipants;
            if (accessor->read_participant == accessor->participant)
                break;
            accessor->read_next_page = 0;

            /* Go around again, so we can get a chunk from this file. */
        }
    }

    return NULL;
}
     592             : 
     593             : /*
     594             :  * Create the name used for the BufFile that a given participant will write.
     595             :  */
     596             : static void
     597        3164 : sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
     598             : {
     599        3164 :     snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
     600        3164 : }

Generated by: LCOV version 1.16