LCOV - code coverage report
Current view: top level - src/backend/utils/sort - sharedtuplestore.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 170 191 89.0 %
Date: 2019-11-13 21:06:57 Functions: 11 12 91.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * sharedtuplestore.c
       4             :  *    Simple mechanism for sharing tuples between backends.
       5             :  *
       6             :  * This module contains a shared temporary tuple storage mechanism providing
       7             :  * a parallel-aware subset of the features of tuplestore.c.  Multiple backends
       8             :  * can write to a SharedTuplestore, and then multiple backends can later scan
       9             :  * the stored tuples.  Currently, the only scan type supported is a parallel
      10             :  * scan where each backend reads an arbitrary subset of the tuples that were
      11             :  * written.
      12             :  *
      13             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
      14             :  * Portions Copyright (c) 1994, Regents of the University of California
      15             :  *
      16             :  * IDENTIFICATION
      17             :  *    src/backend/utils/sort/sharedtuplestore.c
      18             :  *
      19             :  *-------------------------------------------------------------------------
      20             :  */
      21             : 
      22             : #include "postgres.h"
      23             : 
      24             : #include "access/htup.h"
      25             : #include "access/htup_details.h"
      26             : #include "miscadmin.h"
      27             : #include "storage/buffile.h"
      28             : #include "storage/lwlock.h"
      29             : #include "storage/sharedfileset.h"
      30             : #include "utils/sharedtuplestore.h"
      31             : 
      32             : /*
      33             :  * The size of chunks, in pages.  This is somewhat arbitrarily set to match
      34             :  * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
      35             :  * at approximately the same rate as it allocates new chunks of memory to
      36             :  * insert them into.
      37             :  */
      38             : #define STS_CHUNK_PAGES 4
      39             : #define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
      40             : #define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
      41             : 
      42             : /* Chunk written to disk. */
      43             : typedef struct SharedTuplestoreChunk
      44             : {
      45             :     int         ntuples;        /* Number of tuples in this chunk. */
      46             :     int         overflow;       /* If overflow, how many including this one? */
      47             :     char        data[FLEXIBLE_ARRAY_MEMBER];
      48             : } SharedTuplestoreChunk;
      49             : 
      50             : /* Per-participant shared state. */
      51             : typedef struct SharedTuplestoreParticipant
      52             : {
      53             :     LWLock      lock;
      54             :     BlockNumber read_page;      /* Page number for next read. */
      55             :     BlockNumber npages;         /* Number of pages written. */
      56             :     bool        writing;        /* Used only for assertions. */
      57             : } SharedTuplestoreParticipant;
      58             : 
      59             : /* The control object that lives in shared memory. */
      60             : struct SharedTuplestore
      61             : {
      62             :     int         nparticipants;  /* Number of participants that can write. */
      63             :     int         flags;          /* Flag bits from SHARED_TUPLESTORE_XXX */
      64             :     size_t      meta_data_size; /* Size of per-tuple header. */
      65             :     char        name[NAMEDATALEN];  /* A name for this tuplestore. */
      66             : 
      67             :     /* Followed by per-participant shared state. */
      68             :     SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
      69             : };
      70             : 
      71             : /* Per-participant state that lives in backend-local memory. */
      72             : struct SharedTuplestoreAccessor
      73             : {
      74             :     int         participant;    /* My participant number. */
      75             :     SharedTuplestore *sts;      /* The shared state. */
      76             :     SharedFileSet *fileset;     /* The SharedFileSet holding files. */
      77             :     MemoryContext context;      /* Memory context for buffers. */
      78             : 
      79             :     /* State for reading. */
      80             :     int         read_participant;   /* The current participant to read from. */
      81             :     BufFile    *read_file;      /* The current file to read from. */
      82             :     int         read_ntuples_available; /* The number of tuples in chunk. */
      83             :     int         read_ntuples;   /* How many tuples have we read from chunk? */
      84             :     size_t      read_bytes;     /* How many bytes have we read from chunk? */
      85             :     char       *read_buffer;    /* A buffer for loading tuples. */
      86             :     size_t      read_buffer_size;
      87             :     BlockNumber read_next_page; /* Lowest block we'll consider reading. */
      88             : 
      89             :     /* State for writing. */
      90             :     SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
      91             :     BufFile    *write_file;     /* The current file to write to. */
      92             :     BlockNumber write_page;     /* The next page to write to. */
      93             :     char       *write_pointer;  /* Current write pointer within chunk. */
      94             :     char       *write_end;      /* One past the end of the current chunk. */
      95             : };
      96             : 
      97             : static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
      98             :                          int participant);
      99             : 
     100             : /*
     101             :  * Return the amount of shared memory required to hold SharedTuplestore for a
     102             :  * given number of participants.
     103             :  */
     104             : size_t
     105        2526 : sts_estimate(int participants)
     106             : {
     107        2526 :     return offsetof(SharedTuplestore, participants) +
     108        2526 :         sizeof(SharedTuplestoreParticipant) * participants;
     109             : }
     110             : 
     111             : /*
     112             :  * Initialize a SharedTuplestore in existing shared memory.  There must be
     113             :  * space for sts_estimate(participants) bytes.  If flags includes the value
     114             :  * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
     115             :  * eagerly (but this isn't yet implemented).
     116             :  *
     117             :  * Tuples that are stored may optionally carry a piece of fixed sized
     118             :  * meta-data which will be retrieved along with the tuple.  This is useful for
     119             :  * the hash values used in multi-batch hash joins, but could have other
     120             :  * applications.
     121             :  *
     122             :  * The caller must supply a SharedFileSet, which is essentially a directory
     123             :  * that will be cleaned up automatically, and a name which must be unique
     124             :  * across all SharedTuplestores created in the same SharedFileSet.
     125             :  */
     126             : SharedTuplestoreAccessor *
     127        1088 : sts_initialize(SharedTuplestore *sts, int participants,
     128             :                int my_participant_number,
     129             :                size_t meta_data_size,
     130             :                int flags,
     131             :                SharedFileSet *fileset,
     132             :                const char *name)
     133             : {
     134             :     SharedTuplestoreAccessor *accessor;
     135             :     int         i;
     136             : 
     137             :     Assert(my_participant_number < participants);
     138             : 
     139        1088 :     sts->nparticipants = participants;
     140        1088 :     sts->meta_data_size = meta_data_size;
     141        1088 :     sts->flags = flags;
     142             : 
     143        1088 :     if (strlen(name) > sizeof(sts->name) - 1)
     144           0 :         elog(ERROR, "SharedTuplestore name too long");
     145        1088 :     strcpy(sts->name, name);
     146             : 
     147             :     /*
     148             :      * Limit meta-data so it + tuple size always fits into a single chunk.
     149             :      * sts_puttuple() and sts_read_tuple() could be made to support scenarios
     150             :      * where that's not the case, but it's not currently required. If so,
     151             :      * meta-data size probably should be made variable, too.
     152             :      */
     153        1088 :     if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
     154           0 :         elog(ERROR, "meta-data too long");
     155             : 
     156        4072 :     for (i = 0; i < participants; ++i)
     157             :     {
     158        2984 :         LWLockInitialize(&sts->participants[i].lock,
     159             :                          LWTRANCHE_SHARED_TUPLESTORE);
     160        2984 :         sts->participants[i].read_page = 0;
     161        2984 :         sts->participants[i].writing = false;
     162             :     }
     163             : 
     164        1088 :     accessor = palloc0(sizeof(SharedTuplestoreAccessor));
     165        1088 :     accessor->participant = my_participant_number;
     166        1088 :     accessor->sts = sts;
     167        1088 :     accessor->fileset = fileset;
     168        1088 :     accessor->context = CurrentMemoryContext;
     169             : 
     170        1088 :     return accessor;
     171             : }
     172             : 
     173             : /*
     174             :  * Attach to a SharedTuplestore that has been initialized by another backend,
     175             :  * so that this backend can read and write tuples.
     176             :  */
     177             : SharedTuplestoreAccessor *
     178        1308 : sts_attach(SharedTuplestore *sts,
     179             :            int my_participant_number,
     180             :            SharedFileSet *fileset)
     181             : {
     182             :     SharedTuplestoreAccessor *accessor;
     183             : 
     184             :     Assert(my_participant_number < sts->nparticipants);
     185             : 
     186        1308 :     accessor = palloc0(sizeof(SharedTuplestoreAccessor));
     187        1308 :     accessor->participant = my_participant_number;
     188        1308 :     accessor->sts = sts;
     189        1308 :     accessor->fileset = fileset;
     190        1308 :     accessor->context = CurrentMemoryContext;
     191             : 
     192        1308 :     return accessor;
     193             : }
     194             : 
     195             : static void
     196        2204 : sts_flush_chunk(SharedTuplestoreAccessor *accessor)
     197             : {
     198             :     size_t      size;
     199             :     size_t      written;
     200             : 
     201        2204 :     size = STS_CHUNK_PAGES * BLCKSZ;
     202        2204 :     written = BufFileWrite(accessor->write_file, accessor->write_chunk, size);
     203        2204 :     if (written != size)
     204           0 :         ereport(ERROR,
     205             :                 (errcode_for_file_access(),
     206             :                  errmsg("could not write to temporary file: %m")));
     207        2204 :     memset(accessor->write_chunk, 0, size);
     208        2204 :     accessor->write_pointer = &accessor->write_chunk->data[0];
     209        2204 :     accessor->sts->participants[accessor->participant].npages +=
     210             :         STS_CHUNK_PAGES;
     211        2204 : }
     212             : 
     213             : /*
     214             :  * Finish writing tuples.  This must be called by all backends that have
     215             :  * written data before any backend begins reading it.
     216             :  */
     217             : void
     218        3804 : sts_end_write(SharedTuplestoreAccessor *accessor)
     219             : {
     220        3804 :     if (accessor->write_file != NULL)
     221             :     {
     222        1206 :         sts_flush_chunk(accessor);
     223        1206 :         BufFileClose(accessor->write_file);
     224        1206 :         pfree(accessor->write_chunk);
     225        1206 :         accessor->write_chunk = NULL;
     226        1206 :         accessor->write_file = NULL;
     227        1206 :         accessor->sts->participants[accessor->participant].writing = false;
     228             :     }
     229        3804 : }
     230             : 
     231             : /*
     232             :  * Prepare to rescan.  Only one participant must call this.  After it returns,
     233             :  * all participants may call sts_begin_parallel_scan() and then loop over
     234             :  * sts_parallel_scan_next().  This function must not be called concurrently
     235             :  * with a scan, and synchronization to avoid that is the caller's
     236             :  * responsibility.
     237             :  */
     238             : void
     239           0 : sts_reinitialize(SharedTuplestoreAccessor *accessor)
     240             : {
     241             :     int         i;
     242             : 
     243             :     /*
     244             :      * Reset the shared read head for all participants' files.  Also set the
     245             :      * initial chunk size to the minimum (any increases from that size will be
     246             :      * recorded in chunk_expansion_log).
     247             :      */
     248           0 :     for (i = 0; i < accessor->sts->nparticipants; ++i)
     249             :     {
     250           0 :         accessor->sts->participants[i].read_page = 0;
     251             :     }
     252           0 : }
     253             : 
     254             : /*
     255             :  * Begin scanning the contents in parallel.
     256             :  */
     257             : void
     258         984 : sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
     259             : {
     260             :     int         i PG_USED_FOR_ASSERTS_ONLY;
     261             : 
     262             :     /* End any existing scan that was in progress. */
     263         984 :     sts_end_parallel_scan(accessor);
     264             : 
     265             :     /*
     266             :      * Any backend that might have written into this shared tuplestore must
     267             :      * have called sts_end_write(), so that all buffers are flushed and the
     268             :      * files have stopped growing.
     269             :      */
     270         984 :     for (i = 0; i < accessor->sts->nparticipants; ++i)
     271             :         Assert(!accessor->sts->participants[i].writing);
     272             : 
     273             :     /*
     274             :      * We will start out reading the file that THIS backend wrote.  There may
     275             :      * be some caching locality advantage to that.
     276             :      */
     277         984 :     accessor->read_participant = accessor->participant;
     278         984 :     accessor->read_file = NULL;
     279         984 :     accessor->read_next_page = 0;
     280         984 : }
     281             : 
     282             : /*
     283             :  * Finish a parallel scan, freeing associated backend-local resources.
     284             :  */
     285             : void
     286        4876 : sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
     287             : {
     288             :     /*
     289             :      * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
     290             :      * we'd probably need a reference count of current parallel scanners so we
     291             :      * could safely do it only when the reference count reaches zero.
     292             :      */
     293        4876 :     if (accessor->read_file != NULL)
     294             :     {
     295           0 :         BufFileClose(accessor->read_file);
     296           0 :         accessor->read_file = NULL;
     297             :     }
     298        4876 : }
     299             : 
     300             : /*
     301             :  * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
     302             :  * pointer to meta data of that size must be provided.
     303             :  */
     304             : void
     305     1498266 : sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
     306             :              MinimalTuple tuple)
     307             : {
     308             :     size_t      size;
     309             : 
     310             :     /* Do we have our own file yet? */
     311     1498266 :     if (accessor->write_file == NULL)
     312             :     {
     313             :         SharedTuplestoreParticipant *participant;
     314             :         char        name[MAXPGPATH];
     315             : 
     316             :         /* Create one.  Only this backend will write into it. */
     317        1206 :         sts_filename(name, accessor, accessor->participant);
     318        1206 :         accessor->write_file = BufFileCreateShared(accessor->fileset, name);
     319             : 
     320             :         /* Set up the shared state for this backend's file. */
     321        1206 :         participant = &accessor->sts->participants[accessor->participant];
     322        1206 :         participant->writing = true; /* for assertions only */
     323             :     }
     324             : 
     325             :     /* Do we have space? */
     326     1498266 :     size = accessor->sts->meta_data_size + tuple->t_len;
     327     1498266 :     if (accessor->write_pointer + size >= accessor->write_end)
     328             :     {
     329        2060 :         if (accessor->write_chunk == NULL)
     330             :         {
     331             :             /* First time through.  Allocate chunk. */
     332        1206 :             accessor->write_chunk = (SharedTuplestoreChunk *)
     333        1206 :                 MemoryContextAllocZero(accessor->context,
     334             :                                        STS_CHUNK_PAGES * BLCKSZ);
     335        1206 :             accessor->write_chunk->ntuples = 0;
     336        1206 :             accessor->write_pointer = &accessor->write_chunk->data[0];
     337        1206 :             accessor->write_end = (char *)
     338        1206 :                 accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
     339             :         }
     340             :         else
     341             :         {
     342             :             /* See if flushing helps. */
     343         854 :             sts_flush_chunk(accessor);
     344             :         }
     345             : 
     346             :         /* It may still not be enough in the case of a gigantic tuple. */
     347        2060 :         if (accessor->write_pointer + size >= accessor->write_end)
     348             :         {
     349             :             size_t      written;
     350             : 
     351             :             /*
     352             :              * We'll write the beginning of the oversized tuple, and then
     353             :              * write the rest in some number of 'overflow' chunks.
     354             :              *
     355             :              * sts_initialize() verifies that the size of the tuple +
     356             :              * meta-data always fits into a chunk. Because the chunk has been
     357             :              * flushed above, we can be sure to have all of a chunk's usable
     358             :              * space available.
     359             :              */
     360             :             Assert(accessor->write_pointer + accessor->sts->meta_data_size +
     361             :                    sizeof(uint32) < accessor->write_end);
     362             : 
     363             :             /* Write the meta-data as one chunk. */
     364          16 :             if (accessor->sts->meta_data_size > 0)
     365          16 :                 memcpy(accessor->write_pointer, meta_data,
     366          16 :                        accessor->sts->meta_data_size);
     367             : 
     368             :             /*
     369             :              * Write as much of the tuple as we can fit. This includes the
     370             :              * tuple's size at the start.
     371             :              */
     372          32 :             written = accessor->write_end - accessor->write_pointer -
     373          16 :                 accessor->sts->meta_data_size;
     374          16 :             memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
     375             :                    tuple, written);
     376          16 :             ++accessor->write_chunk->ntuples;
     377          16 :             size -= accessor->sts->meta_data_size;
     378          16 :             size -= written;
     379             :             /* Now write as many overflow chunks as we need for the rest. */
     380         176 :             while (size > 0)
     381             :             {
     382             :                 size_t      written_this_chunk;
     383             : 
     384         144 :                 sts_flush_chunk(accessor);
     385             : 
     386             :                 /*
     387             :                  * How many overflow chunks to go?  This will allow readers to
     388             :                  * skip all of them at once instead of reading each one.
     389             :                  */
     390         144 :                 accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
     391             :                     STS_CHUNK_DATA_SIZE;
     392         144 :                 written_this_chunk =
     393         144 :                     Min(accessor->write_end - accessor->write_pointer, size);
     394         144 :                 memcpy(accessor->write_pointer, (char *) tuple + written,
     395             :                        written_this_chunk);
     396         144 :                 accessor->write_pointer += written_this_chunk;
     397         144 :                 size -= written_this_chunk;
     398         144 :                 written += written_this_chunk;
     399             :             }
     400          16 :             return;
     401             :         }
     402             :     }
     403             : 
     404             :     /* Copy meta-data and tuple into buffer. */
     405     1498250 :     if (accessor->sts->meta_data_size > 0)
     406     1498250 :         memcpy(accessor->write_pointer, meta_data,
     407     1498250 :                accessor->sts->meta_data_size);
     408     1498250 :     memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
     409     1498250 :            tuple->t_len);
     410     1498250 :     accessor->write_pointer += size;
     411     1498250 :     ++accessor->write_chunk->ntuples;
     412             : }
     413             : 
     414             : static MinimalTuple
     415     1498266 : sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
     416             : {
     417             :     MinimalTuple tuple;
     418             :     uint32      size;
     419             :     size_t      remaining_size;
     420             :     size_t      this_chunk_size;
     421             :     char       *destination;
     422             : 
     423             :     /*
     424             :      * We'll keep track of bytes read from this chunk so that we can detect an
     425             :      * overflowing tuple and switch to reading overflow pages.
     426             :      */
     427     1498266 :     if (accessor->sts->meta_data_size > 0)
     428             :     {
     429     2996532 :         if (BufFileRead(accessor->read_file,
     430             :                         meta_data,
     431     1498266 :                         accessor->sts->meta_data_size) !=
     432     1498266 :             accessor->sts->meta_data_size)
     433           0 :             ereport(ERROR,
     434             :                     (errcode_for_file_access(),
     435             :                      errmsg("could not read from shared tuplestore temporary file"),
     436             :                      errdetail_internal("Short read while reading meta-data.")));
     437     1498266 :         accessor->read_bytes += accessor->sts->meta_data_size;
     438             :     }
     439     1498266 :     if (BufFileRead(accessor->read_file,
     440             :                     &size,
     441             :                     sizeof(size)) != sizeof(size))
     442           0 :         ereport(ERROR,
     443             :                 (errcode_for_file_access(),
     444             :                  errmsg("could not read from shared tuplestore temporary file"),
     445             :                  errdetail_internal("Short read while reading size.")));
     446     1498266 :     accessor->read_bytes += sizeof(size);
     447     1498266 :     if (size > accessor->read_buffer_size)
     448             :     {
     449             :         size_t      new_read_buffer_size;
     450             : 
     451         780 :         if (accessor->read_buffer != NULL)
     452           0 :             pfree(accessor->read_buffer);
     453         780 :         new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
     454         780 :         accessor->read_buffer =
     455         780 :             MemoryContextAlloc(accessor->context, new_read_buffer_size);
     456         780 :         accessor->read_buffer_size = new_read_buffer_size;
     457             :     }
     458     1498266 :     remaining_size = size - sizeof(uint32);
     459     1498266 :     this_chunk_size = Min(remaining_size,
     460             :                           BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
     461     1498266 :     destination = accessor->read_buffer + sizeof(uint32);
     462     1498266 :     if (BufFileRead(accessor->read_file,
     463             :                     destination,
     464             :                     this_chunk_size) != this_chunk_size)
     465           0 :         ereport(ERROR,
     466             :                 (errcode_for_file_access(),
     467             :                  errmsg("could not read from shared tuplestore temporary file"),
     468             :                  errdetail_internal("Short read while reading tuple.")));
     469     1498266 :     accessor->read_bytes += this_chunk_size;
     470     1498266 :     remaining_size -= this_chunk_size;
     471     1498266 :     destination += this_chunk_size;
     472     1498266 :     ++accessor->read_ntuples;
     473             : 
     474             :     /* Check if we need to read any overflow chunks. */
     475     2996676 :     while (remaining_size > 0)
     476             :     {
     477             :         /* We are now positioned at the start of an overflow chunk. */
     478             :         SharedTuplestoreChunk chunk_header;
     479             : 
     480         144 :         if (BufFileRead(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE) !=
     481             :             STS_CHUNK_HEADER_SIZE)
     482           0 :             ereport(ERROR,
     483             :                     (errcode_for_file_access(),
     484             :                      errmsg("could not read from shared tuplestore temporary file"),
     485             :                      errdetail_internal("Short read while reading overflow chunk header.")));
     486         144 :         accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
     487         144 :         if (chunk_header.overflow == 0)
     488           0 :             ereport(ERROR,
     489             :                     (errcode_for_file_access(),
     490             :                      errmsg("unexpected chunk in shared tuplestore temporary file"),
     491             :                      errdetail_internal("Expected overflow chunk.")));
     492         144 :         accessor->read_next_page += STS_CHUNK_PAGES;
     493         144 :         this_chunk_size = Min(remaining_size,
     494             :                               BLCKSZ * STS_CHUNK_PAGES -
     495             :                               STS_CHUNK_HEADER_SIZE);
     496         144 :         if (BufFileRead(accessor->read_file,
     497             :                         destination,
     498             :                         this_chunk_size) != this_chunk_size)
     499           0 :             ereport(ERROR,
     500             :                     (errcode_for_file_access(),
     501             :                      errmsg("could not read from shared tuplestore temporary file"),
     502             :                      errdetail_internal("Short read while reading tuple.")));
     503         144 :         accessor->read_bytes += this_chunk_size;
     504         144 :         remaining_size -= this_chunk_size;
     505         144 :         destination += this_chunk_size;
     506             : 
     507             :         /*
     508             :          * These will be used to count regular tuples following the oversized
     509             :          * tuple that spilled into this overflow chunk.
     510             :          */
     511         144 :         accessor->read_ntuples = 0;
     512         144 :         accessor->read_ntuples_available = chunk_header.ntuples;
     513             :     }
     514             : 
     515     1498266 :     tuple = (MinimalTuple) accessor->read_buffer;
     516     1498266 :     tuple->t_len = size;
     517             : 
     518     1498266 :     return tuple;
     519             : }
     520             : 
     521             : /*
     522             :  * Get the next tuple in the current parallel scan.
     523             :  */
     524             : MinimalTuple
     525     1502834 : sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
     526             : {
     527             :     SharedTuplestoreParticipant *p;
     528             :     BlockNumber read_page;
     529             :     bool        eof;
     530             : 
     531             :     for (;;)
     532             :     {
     533             :         /* Can we read more tuples from the current chunk? */
     534     1506492 :         if (accessor->read_ntuples < accessor->read_ntuples_available)
     535     1498266 :             return sts_read_tuple(accessor, meta_data);
     536             : 
     537             :         /* Find the location of a new chunk to read. */
     538        4568 :         p = &accessor->sts->participants[accessor->read_participant];
     539             : 
     540        4568 :         LWLockAcquire(&p->lock, LW_EXCLUSIVE);
     541             :         /* We can skip directly past overflow pages we know about. */
     542        4568 :         if (p->read_page < accessor->read_next_page)
     543          16 :             p->read_page = accessor->read_next_page;
     544        4568 :         eof = p->read_page >= p->npages;
     545        4568 :         if (!eof)
     546             :         {
     547             :             /* Claim the next chunk. */
     548        2060 :             read_page = p->read_page;
     549             :             /* Advance the read head for the next reader. */
     550        2060 :             p->read_page += STS_CHUNK_PAGES;
     551        2060 :             accessor->read_next_page = p->read_page;
     552             :         }
     553        4568 :         LWLockRelease(&p->lock);
     554             : 
     555        4568 :         if (!eof)
     556             :         {
     557             :             SharedTuplestoreChunk chunk_header;
     558             : 
     559             :             /* Make sure we have the file open. */
     560        2060 :             if (accessor->read_file == NULL)
     561             :             {
     562             :                 char        name[MAXPGPATH];
     563             : 
     564        1228 :                 sts_filename(name, accessor, accessor->read_participant);
     565        1228 :                 accessor->read_file =
     566        1228 :                     BufFileOpenShared(accessor->fileset, name);
     567             :             }
     568             : 
     569             :             /* Seek and load the chunk header. */
     570        2060 :             if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
     571           0 :                 ereport(ERROR,
     572             :                         (errcode_for_file_access(),
     573             :                          errmsg("could not read from shared tuplestore temporary file"),
     574             :                          errdetail_internal("Could not seek to next block.")));
     575        2060 :             if (BufFileRead(accessor->read_file, &chunk_header,
     576             :                             STS_CHUNK_HEADER_SIZE) != STS_CHUNK_HEADER_SIZE)
     577           0 :                 ereport(ERROR,
     578             :                         (errcode_for_file_access(),
     579             :                          errmsg("could not read from shared tuplestore temporary file"),
     580             :                          errdetail_internal("Short read while reading chunk header.")));
     581             : 
     582             :             /*
     583             :              * If this is an overflow chunk, we skip it and any following
     584             :              * overflow chunks all at once.
     585             :              */
     586        2060 :             if (chunk_header.overflow > 0)
     587             :             {
     588           0 :                 accessor->read_next_page = read_page +
     589           0 :                     chunk_header.overflow * STS_CHUNK_PAGES;
     590           0 :                 continue;
     591             :             }
     592             : 
     593        2060 :             accessor->read_ntuples = 0;
     594        2060 :             accessor->read_ntuples_available = chunk_header.ntuples;
     595        2060 :             accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
     596             : 
     597             :             /* Go around again, so we can get a tuple from this chunk. */
     598             :         }
     599             :         else
     600             :         {
     601        2508 :             if (accessor->read_file != NULL)
     602             :             {
     603        1228 :                 BufFileClose(accessor->read_file);
     604        1228 :                 accessor->read_file = NULL;
     605             :             }
     606             : 
     607             :             /*
     608             :              * Try the next participant's file.  If we've gone full circle,
     609             :              * we're done.
     610             :              */
     611        5016 :             accessor->read_participant = (accessor->read_participant + 1) %
     612        2508 :                 accessor->sts->nparticipants;
     613        2508 :             if (accessor->read_participant == accessor->participant)
     614         910 :                 break;
     615        1598 :             accessor->read_next_page = 0;
     616             : 
     617             :             /* Go around again, so we can get a chunk from this file. */
     618             :         }
     619             :     }
     620             : 
     621         910 :     return NULL;
     622             : }
     623             : 
     624             : /*
     625             :  * Create the name used for the BufFile that a given participant will write.
     626             :  */
     627             : static void
     628        2434 : sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
     629             : {
     630        2434 :     snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
     631        2434 : }

Generated by: LCOV version 1.13