LCOV - code coverage report
Current view: top level - src/backend/utils/sort - sharedtuplestore.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 171 191 89.5 %
Date: 2021-12-05 01:09:12 Functions: 11 12 91.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * sharedtuplestore.c
       4             :  *    Simple mechanism for sharing tuples between backends.
       5             :  *
       6             :  * This module contains a shared temporary tuple storage mechanism providing
       7             :  * a parallel-aware subset of the features of tuplestore.c.  Multiple backends
       8             :  * can write to a SharedTuplestore, and then multiple backends can later scan
       9             :  * the stored tuples.  Currently, the only scan type supported is a parallel
      10             :  * scan where each backend reads an arbitrary subset of the tuples that were
      11             :  * written.
      12             :  *
      13             :  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
      14             :  * Portions Copyright (c) 1994, Regents of the University of California
      15             :  *
      16             :  * IDENTIFICATION
      17             :  *    src/backend/utils/sort/sharedtuplestore.c
      18             :  *
      19             :  *-------------------------------------------------------------------------
      20             :  */
      21             : 
      22             : #include "postgres.h"
      23             : 
      24             : #include "access/htup.h"
      25             : #include "access/htup_details.h"
      26             : #include "miscadmin.h"
      27             : #include "storage/buffile.h"
      28             : #include "storage/lwlock.h"
      29             : #include "storage/sharedfileset.h"
      30             : #include "utils/sharedtuplestore.h"
      31             : 
      32             : /*
      33             :  * The size of chunks, in pages.  This is somewhat arbitrarily set to match
      34             :  * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
      35             :  * at approximately the same rate as it allocates new chunks of memory to
      36             :  * insert them into.
      37             :  */
      38             : #define STS_CHUNK_PAGES 4
      39             : #define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
      40             : #define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
      41             : 
/*
 * Chunk written to disk.  Each chunk occupies STS_CHUNK_PAGES * BLCKSZ bytes
 * in a participant's file; the fixed header below is followed by tightly
 * packed (meta-data, tuple) pairs in the flexible data[] area.
 */
typedef struct SharedTuplestoreChunk
{
	int			ntuples;		/* Number of tuples in this chunk. */
	int			overflow;		/* If overflow, how many including this one?
								 * Zero for a regular (non-overflow) chunk;
								 * nonzero lets readers skip a whole oversized
								 * tuple's continuation chunks at once. */
	char		data[FLEXIBLE_ARRAY_MEMBER];	/* Packed tuple data. */
} SharedTuplestoreChunk;
      49             : 
/*
 * Per-participant shared state.  One of these exists for every backend that
 * may write; read_page is the shared read head that parallel scanners advance
 * under 'lock' to claim chunks from this participant's file.
 */
typedef struct SharedTuplestoreParticipant
{
	LWLock		lock;			/* Protects read_page during parallel scans. */
	BlockNumber read_page;		/* Page number for next read. */
	BlockNumber npages;			/* Number of pages written. */
	bool		writing;		/* Used only for assertions. */
} SharedTuplestoreParticipant;
      58             : 
/*
 * The control object that lives in shared memory.  Sized by sts_estimate();
 * the flexible participants[] array holds nparticipants entries.
 */
struct SharedTuplestore
{
	int			nparticipants;	/* Number of participants that can write. */
	int			flags;			/* Flag bits from SHARED_TUPLESTORE_XXX */
	size_t		meta_data_size; /* Size of per-tuple header. */
	char		name[NAMEDATALEN];	/* A name for this tuplestore; used to
									 * build per-participant file names. */

	/* Followed by per-participant shared state. */
	SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
};
      70             : 
/*
 * Per-participant state that lives in backend-local memory.  Obtained from
 * sts_initialize() or sts_attach(); holds this backend's private read and
 * write positions plus buffers allocated in 'context'.
 */
struct SharedTuplestoreAccessor
{
	int			participant;	/* My participant number. */
	SharedTuplestore *sts;		/* The shared state. */
	SharedFileSet *fileset;		/* The SharedFileSet holding files. */
	MemoryContext context;		/* Memory context for buffers. */

	/* State for reading. */
	int			read_participant;	/* The current participant to read from. */
	BufFile    *read_file;		/* The current file to read from. */
	int			read_ntuples_available; /* The number of tuples in chunk. */
	int			read_ntuples;	/* How many tuples have we read from chunk? */
	size_t		read_bytes;		/* How many bytes have we read from chunk? */
	char	   *read_buffer;	/* A buffer for loading tuples. */
	size_t		read_buffer_size;	/* Allocated size of read_buffer; grown
									 * on demand in sts_read_tuple(). */
	BlockNumber read_next_page; /* Lowest block we'll consider reading. */

	/* State for writing. */
	SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
	BufFile    *write_file;		/* The current file to write to. */
	BlockNumber write_page;		/* The next page to write to. */
	char	   *write_pointer;	/* Current write pointer within chunk. */
	char	   *write_end;		/* One past the end of the current chunk. */
};
      96             : 
      97             : static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
      98             :                          int participant);
      99             : 
     100             : /*
     101             :  * Return the amount of shared memory required to hold SharedTuplestore for a
     102             :  * given number of participants.
     103             :  */
     104             : size_t
     105        2504 : sts_estimate(int participants)
     106             : {
     107        5008 :     return offsetof(SharedTuplestore, participants) +
     108        2504 :         sizeof(SharedTuplestoreParticipant) * participants;
     109             : }
     110             : 
     111             : /*
     112             :  * Initialize a SharedTuplestore in existing shared memory.  There must be
     113             :  * space for sts_estimate(participants) bytes.  If flags includes the value
     114             :  * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
     115             :  * eagerly (but this isn't yet implemented).
     116             :  *
     117             :  * Tuples that are stored may optionally carry a piece of fixed sized
     118             :  * meta-data which will be retrieved along with the tuple.  This is useful for
     119             :  * the hash values used in multi-batch hash joins, but could have other
     120             :  * applications.
     121             :  *
     122             :  * The caller must supply a SharedFileSet, which is essentially a directory
     123             :  * that will be cleaned up automatically, and a name which must be unique
     124             :  * across all SharedTuplestores created in the same SharedFileSet.
     125             :  */
     126             : SharedTuplestoreAccessor *
     127        1088 : sts_initialize(SharedTuplestore *sts, int participants,
     128             :                int my_participant_number,
     129             :                size_t meta_data_size,
     130             :                int flags,
     131             :                SharedFileSet *fileset,
     132             :                const char *name)
     133             : {
     134             :     SharedTuplestoreAccessor *accessor;
     135             :     int         i;
     136             : 
     137             :     Assert(my_participant_number < participants);
     138             : 
     139        1088 :     sts->nparticipants = participants;
     140        1088 :     sts->meta_data_size = meta_data_size;
     141        1088 :     sts->flags = flags;
     142             : 
     143        1088 :     if (strlen(name) > sizeof(sts->name) - 1)
     144           0 :         elog(ERROR, "SharedTuplestore name too long");
     145        1088 :     strcpy(sts->name, name);
     146             : 
     147             :     /*
     148             :      * Limit meta-data so it + tuple size always fits into a single chunk.
     149             :      * sts_puttuple() and sts_read_tuple() could be made to support scenarios
     150             :      * where that's not the case, but it's not currently required. If so,
     151             :      * meta-data size probably should be made variable, too.
     152             :      */
     153        1088 :     if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
     154           0 :         elog(ERROR, "meta-data too long");
     155             : 
     156        4056 :     for (i = 0; i < participants; ++i)
     157             :     {
     158        2968 :         LWLockInitialize(&sts->participants[i].lock,
     159             :                          LWTRANCHE_SHARED_TUPLESTORE);
     160        2968 :         sts->participants[i].read_page = 0;
     161        2968 :         sts->participants[i].writing = false;
     162             :     }
     163             : 
     164        1088 :     accessor = palloc0(sizeof(SharedTuplestoreAccessor));
     165        1088 :     accessor->participant = my_participant_number;
     166        1088 :     accessor->sts = sts;
     167        1088 :     accessor->fileset = fileset;
     168        1088 :     accessor->context = CurrentMemoryContext;
     169             : 
     170        1088 :     return accessor;
     171             : }
     172             : 
     173             : /*
     174             :  * Attach to a SharedTuplestore that has been initialized by another backend,
     175             :  * so that this backend can read and write tuples.
     176             :  */
     177             : SharedTuplestoreAccessor *
     178        1286 : sts_attach(SharedTuplestore *sts,
     179             :            int my_participant_number,
     180             :            SharedFileSet *fileset)
     181             : {
     182             :     SharedTuplestoreAccessor *accessor;
     183             : 
     184             :     Assert(my_participant_number < sts->nparticipants);
     185             : 
     186        1286 :     accessor = palloc0(sizeof(SharedTuplestoreAccessor));
     187        1286 :     accessor->participant = my_participant_number;
     188        1286 :     accessor->sts = sts;
     189        1286 :     accessor->fileset = fileset;
     190        1286 :     accessor->context = CurrentMemoryContext;
     191             : 
     192        1286 :     return accessor;
     193             : }
     194             : 
     195             : static void
     196        2126 : sts_flush_chunk(SharedTuplestoreAccessor *accessor)
     197             : {
     198             :     size_t      size;
     199             : 
     200        2126 :     size = STS_CHUNK_PAGES * BLCKSZ;
     201        2126 :     BufFileWrite(accessor->write_file, accessor->write_chunk, size);
     202        2126 :     memset(accessor->write_chunk, 0, size);
     203        2126 :     accessor->write_pointer = &accessor->write_chunk->data[0];
     204        2126 :     accessor->sts->participants[accessor->participant].npages +=
     205             :         STS_CHUNK_PAGES;
     206        2126 : }
     207             : 
     208             : /*
     209             :  * Finish writing tuples.  This must be called by all backends that have
     210             :  * written data before any backend begins reading it.
     211             :  */
     212             : void
     213        3682 : sts_end_write(SharedTuplestoreAccessor *accessor)
     214             : {
     215        3682 :     if (accessor->write_file != NULL)
     216             :     {
     217        1086 :         sts_flush_chunk(accessor);
     218        1086 :         BufFileClose(accessor->write_file);
     219        1086 :         pfree(accessor->write_chunk);
     220        1086 :         accessor->write_chunk = NULL;
     221        1086 :         accessor->write_file = NULL;
     222        1086 :         accessor->sts->participants[accessor->participant].writing = false;
     223             :     }
     224        3682 : }
     225             : 
/*
 * Prepare to rescan.  Only one participant must call this.  After it returns,
 * all participants may call sts_begin_parallel_scan() and then loop over
 * sts_parallel_scan_next().  This function must not be called concurrently
 * with a scan, and synchronization to avoid that is the caller's
 * responsibility.
 */
void
sts_reinitialize(SharedTuplestoreAccessor *accessor)
{
	int			i;

	/*
	 * Reset the shared read head for all participants' files, so the next
	 * parallel scan starts again from the beginning of each file.
	 */
	for (i = 0; i < accessor->sts->nparticipants; ++i)
	{
		accessor->sts->participants[i].read_page = 0;
	}
}
     248             : 
     249             : /*
     250             :  * Begin scanning the contents in parallel.
     251             :  */
     252             : void
     253         978 : sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
     254             : {
     255             :     int         i PG_USED_FOR_ASSERTS_ONLY;
     256             : 
     257             :     /* End any existing scan that was in progress. */
     258         978 :     sts_end_parallel_scan(accessor);
     259             : 
     260             :     /*
     261             :      * Any backend that might have written into this shared tuplestore must
     262             :      * have called sts_end_write(), so that all buffers are flushed and the
     263             :      * files have stopped growing.
     264             :      */
     265        3686 :     for (i = 0; i < accessor->sts->nparticipants; ++i)
     266             :         Assert(!accessor->sts->participants[i].writing);
     267             : 
     268             :     /*
     269             :      * We will start out reading the file that THIS backend wrote.  There may
     270             :      * be some caching locality advantage to that.
     271             :      */
     272         978 :     accessor->read_participant = accessor->participant;
     273         978 :     accessor->read_file = NULL;
     274         978 :     accessor->read_next_page = 0;
     275         978 : }
     276             : 
     277             : /*
     278             :  * Finish a parallel scan, freeing associated backend-local resources.
     279             :  */
     280             : void
     281        4848 : sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
     282             : {
     283             :     /*
     284             :      * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
     285             :      * we'd probably need a reference count of current parallel scanners so we
     286             :      * could safely do it only when the reference count reaches zero.
     287             :      */
     288        4848 :     if (accessor->read_file != NULL)
     289             :     {
     290           0 :         BufFileClose(accessor->read_file);
     291           0 :         accessor->read_file = NULL;
     292             :     }
     293        4848 : }
     294             : 
/*
 * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
 * pointer to meta data of that size must be provided.
 *
 * Tuples are appended to this backend's private file; the on-disk layout is
 * meta-data (if any) followed by the MinimalTuple, which carries its own
 * length (t_len) at the start.  Oversized tuples spill into one or more
 * 'overflow' chunks following the chunk in which they begin.
 */
void
sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
			 MinimalTuple tuple)
{
	size_t		size;

	/* Do we have our own file yet? */
	if (accessor->write_file == NULL)
	{
		SharedTuplestoreParticipant *participant;
		char		name[MAXPGPATH];

		/* Create one.  Only this backend will write into it. */
		sts_filename(name, accessor, accessor->participant);
		accessor->write_file =
			BufFileCreateFileSet(&accessor->fileset->fs, name);

		/* Set up the shared state for this backend's file. */
		participant = &accessor->sts->participants[accessor->participant];
		participant->writing = true;	/* for assertions only */
	}

	/* Do we have space? */
	size = accessor->sts->meta_data_size + tuple->t_len;
	if (accessor->write_pointer + size >= accessor->write_end)
	{
		if (accessor->write_chunk == NULL)
		{
			/* First time through.  Allocate chunk. */
			accessor->write_chunk = (SharedTuplestoreChunk *)
				MemoryContextAllocZero(accessor->context,
									   STS_CHUNK_PAGES * BLCKSZ);
			accessor->write_chunk->ntuples = 0;
			accessor->write_pointer = &accessor->write_chunk->data[0];
			accessor->write_end = (char *)
				accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
		}
		else
		{
			/* See if flushing helps. */
			sts_flush_chunk(accessor);
		}

		/* It may still not be enough in the case of a gigantic tuple. */
		if (accessor->write_pointer + size >= accessor->write_end)
		{
			size_t		written;

			/*
			 * We'll write the beginning of the oversized tuple, and then
			 * write the rest in some number of 'overflow' chunks.
			 *
			 * sts_initialize() verifies that the size of the tuple +
			 * meta-data always fits into a chunk. Because the chunk has been
			 * flushed above, we can be sure to have all of a chunk's usable
			 * space available.
			 */
			Assert(accessor->write_pointer + accessor->sts->meta_data_size +
				   sizeof(uint32) < accessor->write_end);

			/* Write the meta-data as one chunk. */
			if (accessor->sts->meta_data_size > 0)
				memcpy(accessor->write_pointer, meta_data,
					   accessor->sts->meta_data_size);

			/*
			 * Write as much of the tuple as we can fit. This includes the
			 * tuple's size at the start.
			 */
			written = accessor->write_end - accessor->write_pointer -
				accessor->sts->meta_data_size;
			memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
				   tuple, written);
			/* The oversized tuple counts once, in its starting chunk only. */
			++accessor->write_chunk->ntuples;
			size -= accessor->sts->meta_data_size;
			size -= written;
			/* Now write as many overflow chunks as we need for the rest. */
			while (size > 0)
			{
				size_t		written_this_chunk;

				sts_flush_chunk(accessor);

				/*
				 * How many overflow chunks to go?  This will allow readers to
				 * skip all of them at once instead of reading each one.
				 */
				accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
					STS_CHUNK_DATA_SIZE;
				written_this_chunk =
					Min(accessor->write_end - accessor->write_pointer, size);
				memcpy(accessor->write_pointer, (char *) tuple + written,
					   written_this_chunk);
				accessor->write_pointer += written_this_chunk;
				size -= written_this_chunk;
				written += written_this_chunk;
			}
			/* Oversized tuple fully buffered; nothing more to copy below. */
			return;
		}
	}

	/* Copy meta-data and tuple into buffer. */
	if (accessor->sts->meta_data_size > 0)
		memcpy(accessor->write_pointer, meta_data,
			   accessor->sts->meta_data_size);
	memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
		   tuple->t_len);
	accessor->write_pointer += size;
	++accessor->write_chunk->ntuples;
}
     409             : 
/*
 * Read the next tuple from the current read position in accessor->read_file,
 * copying any per-tuple meta-data into *meta_data.  Handles tuples that
 * spill over into one or more overflow chunks.
 *
 * Returns a pointer into accessor->read_buffer, valid until the next call
 * (the buffer is reused and reallocated as needed).
 */
static MinimalTuple
sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
{
	MinimalTuple tuple;
	uint32		size;
	size_t		remaining_size;
	size_t		this_chunk_size;
	char	   *destination;

	/*
	 * We'll keep track of bytes read from this chunk so that we can detect an
	 * overflowing tuple and switch to reading overflow pages.
	 */
	if (accessor->sts->meta_data_size > 0)
	{
		if (BufFileRead(accessor->read_file,
						meta_data,
						accessor->sts->meta_data_size) !=
			accessor->sts->meta_data_size)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from shared tuplestore temporary file"),
					 errdetail_internal("Short read while reading meta-data.")));
		accessor->read_bytes += accessor->sts->meta_data_size;
	}
	/* The tuple's total length (t_len) is stored first. */
	if (BufFileRead(accessor->read_file,
					&size,
					sizeof(size)) != sizeof(size))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read from shared tuplestore temporary file"),
				 errdetail_internal("Short read while reading size.")));
	accessor->read_bytes += sizeof(size);
	/* Grow the backend-local tuple buffer if this tuple won't fit. */
	if (size > accessor->read_buffer_size)
	{
		size_t		new_read_buffer_size;

		if (accessor->read_buffer != NULL)
			pfree(accessor->read_buffer);
		/* At least double the buffer to amortize repeated growth. */
		new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
		accessor->read_buffer =
			MemoryContextAlloc(accessor->context, new_read_buffer_size);
		accessor->read_buffer_size = new_read_buffer_size;
	}
	/* We already consumed the size word, so that much of 'size' is done. */
	remaining_size = size - sizeof(uint32);
	/* Read no further than the end of the current chunk. */
	this_chunk_size = Min(remaining_size,
						  BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
	/* Leave room at the front for t_len, which is filled in at the end. */
	destination = accessor->read_buffer + sizeof(uint32);
	if (BufFileRead(accessor->read_file,
					destination,
					this_chunk_size) != this_chunk_size)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read from shared tuplestore temporary file"),
				 errdetail_internal("Short read while reading tuple.")));
	accessor->read_bytes += this_chunk_size;
	remaining_size -= this_chunk_size;
	destination += this_chunk_size;
	++accessor->read_ntuples;

	/* Check if we need to read any overflow chunks. */
	while (remaining_size > 0)
	{
		/* We are now positioned at the start of an overflow chunk. */
		SharedTuplestoreChunk chunk_header;

		if (BufFileRead(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE) !=
			STS_CHUNK_HEADER_SIZE)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from shared tuplestore temporary file"),
					 errdetail_internal("Short read while reading overflow chunk header.")));
		/* Restart per-chunk byte accounting for this new chunk. */
		accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
		if (chunk_header.overflow == 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("unexpected chunk in shared tuplestore temporary file"),
					 errdetail_internal("Expected overflow chunk.")));
		accessor->read_next_page += STS_CHUNK_PAGES;
		this_chunk_size = Min(remaining_size,
							  BLCKSZ * STS_CHUNK_PAGES -
							  STS_CHUNK_HEADER_SIZE);
		if (BufFileRead(accessor->read_file,
						destination,
						this_chunk_size) != this_chunk_size)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from shared tuplestore temporary file"),
					 errdetail_internal("Short read while reading tuple.")));
		accessor->read_bytes += this_chunk_size;
		remaining_size -= this_chunk_size;
		destination += this_chunk_size;

		/*
		 * These will be used to count regular tuples following the oversized
		 * tuple that spilled into this overflow chunk.
		 */
		accessor->read_ntuples = 0;
		accessor->read_ntuples_available = chunk_header.ntuples;
	}

	/* Fill in t_len at the front of the buffer and hand back the tuple. */
	tuple = (MinimalTuple) accessor->read_buffer;
	tuple->t_len = size;

	return tuple;
}
     516             : 
     517             : /*
     518             :  * Get the next tuple in the current parallel scan.
     519             :  */
     520             : MinimalTuple
     521     1502790 : sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
     522             : {
     523             :     SharedTuplestoreParticipant *p;
     524             :     BlockNumber read_page;
     525             :     bool        eof;
     526             : 
     527             :     for (;;)
     528             :     {
     529             :         /* Can we read more tuples from the current chunk? */
     530     1502790 :         if (accessor->read_ntuples < accessor->read_ntuples_available)
     531     1498312 :             return sts_read_tuple(accessor, meta_data);
     532             : 
     533             :         /* Find the location of a new chunk to read. */
     534        4478 :         p = &accessor->sts->participants[accessor->read_participant];
     535             : 
     536        4478 :         LWLockAcquire(&p->lock, LW_EXCLUSIVE);
     537             :         /* We can skip directly past overflow pages we know about. */
     538        4478 :         if (p->read_page < accessor->read_next_page)
     539          16 :             p->read_page = accessor->read_next_page;
     540        4478 :         eof = p->read_page >= p->npages;
     541        4478 :         if (!eof)
     542             :         {
     543             :             /* Claim the next chunk. */
     544        1982 :             read_page = p->read_page;
     545             :             /* Advance the read head for the next reader. */
     546        1982 :             p->read_page += STS_CHUNK_PAGES;
     547        1982 :             accessor->read_next_page = p->read_page;
     548             :         }
     549        4478 :         LWLockRelease(&p->lock);
     550             : 
     551        4478 :         if (!eof)
     552             :         {
     553             :             SharedTuplestoreChunk chunk_header;
     554             :             size_t      nread;
     555             : 
     556             :             /* Make sure we have the file open. */
     557        1982 :             if (accessor->read_file == NULL)
     558             :             {
     559             :                 char        name[MAXPGPATH];
     560             : 
     561        1110 :                 sts_filename(name, accessor, accessor->read_participant);
     562        1110 :                 accessor->read_file =
     563        1110 :                     BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
     564             :                                        false);
     565             :             }
     566             : 
     567             :             /* Seek and load the chunk header. */
     568        1982 :             if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
     569           0 :                 ereport(ERROR,
     570             :                         (errcode_for_file_access(),
     571             :                          errmsg("could not seek to block %u in shared tuplestore temporary file",
     572             :                                 read_page)));
     573        1982 :             nread = BufFileRead(accessor->read_file, &chunk_header,
     574             :                                 STS_CHUNK_HEADER_SIZE);
     575        1982 :             if (nread != STS_CHUNK_HEADER_SIZE)
     576           0 :                 ereport(ERROR,
     577             :                         (errcode_for_file_access(),
     578             :                          errmsg("could not read from shared tuplestore temporary file: read only %zu of %zu bytes",
     579             :                                 nread, STS_CHUNK_HEADER_SIZE)));
     580             : 
     581             :             /*
     582             :              * If this is an overflow chunk, we skip it and any following
     583             :              * overflow chunks all at once.
     584             :              */
     585        1982 :             if (chunk_header.overflow > 0)
     586             :             {
     587           0 :                 accessor->read_next_page = read_page +
     588           0 :                     chunk_header.overflow * STS_CHUNK_PAGES;
     589           0 :                 continue;
     590             :             }
     591             : 
     592        1982 :             accessor->read_ntuples = 0;
     593        1982 :             accessor->read_ntuples_available = chunk_header.ntuples;
     594        1982 :             accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
     595             : 
     596             :             /* Go around again, so we can get a tuple from this chunk. */
     597             :         }
     598             :         else
     599             :         {
     600        2496 :             if (accessor->read_file != NULL)
     601             :             {
     602        1110 :                 BufFileClose(accessor->read_file);
     603        1110 :                 accessor->read_file = NULL;
     604             :             }
     605             : 
     606             :             /*
     607             :              * Try the next participant's file.  If we've gone full circle,
     608             :              * we're done.
     609             :              */
     610        2496 :             accessor->read_participant = (accessor->read_participant + 1) %
     611        2496 :                 accessor->sts->nparticipants;
     612        2496 :             if (accessor->read_participant == accessor->participant)
     613         906 :                 break;
     614        1590 :             accessor->read_next_page = 0;
     615             : 
     616             :             /* Go around again, so we can get a chunk from this file. */
     617             :         }
     618             :     }
     619             : 
     620         906 :     return NULL;
     621             : }
     622             : 
     623             : /*
     624             :  * Create the name used for the BufFile that a given participant will write.
     625             :  */
     626             : static void
     627        2196 : sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
     628             : {
     629        2196 :     snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
     630        2196 : }

Generated by: LCOV version 1.14