LCOV - code coverage report
Current view: top level - src/backend/storage/aio - read_stream.c (source / functions)
Test:      PostgreSQL 19devel                 Lines:     96.3 % (257 of 267 hit)
Test Date: 2026-02-17 17:20:33                Functions: 92.3 % (12 of 13 hit)
Legend: Lines: hit / not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * read_stream.c
       4              :  *    Mechanism for accessing buffered relation data with look-ahead
       5              :  *
       6              :  * Code that needs to access relation data typically pins blocks one at a
       7              :  * time, often in a predictable order that might be sequential or data-driven.
       8              :  * Calling the simple ReadBuffer() function for each block is inefficient,
       9              :  * because blocks that are not yet in the buffer pool require I/O operations
      10              :  * that are small and might stall waiting for storage.  This mechanism looks
      11              :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
      12              :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
      13              :  * distance.
      14              :  *
      15              :  * A user-provided callback generates a stream of block numbers that is used
      16              :  * to form reads of up to io_combine_limit, by attempting to merge them with a
      17              :  * pending read.  When that isn't possible, the existing pending read is sent
      18              :  * to StartReadBuffers() so that a new one can begin to form.
      19              :  *
      20              :  * The algorithm for controlling the look-ahead distance is based on recent
      21              :  * cache hit and miss history.  When no I/O is necessary, there is no benefit
      22              :  * in looking ahead more than one block.  This is the default initial
      23              :  * assumption, but when blocks needing I/O are streamed, the distance is
      24              :  * increased rapidly to try to benefit from I/O combining and concurrency.  It
      25              :  * is reduced gradually when cached blocks are streamed.
      26              :  *
      27              :  * The main data structure is a circular queue of buffers of size
      28              :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
      29              :  * returned by read_stream_next_buffer().  Each buffer also has an optional
      30              :  * variable sized object that is passed from the callback to the consumer of
      31              :  * buffers.
      32              :  *
      33              :  * Parallel to the queue of buffers, there is a circular queue of in-progress
      34              :  * I/Os that have been started with StartReadBuffers(), and for which
      35              :  * WaitReadBuffers() must be called before returning the buffer.
      36              :  *
      37              :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
      38              :  * successive calls, then these data structures might appear as follows:
      39              :  *
      40              :  *                          buffers buf/data       ios
      41              :  *
      42              :  *                          +----+  +-----+       +--------+
      43              :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
      44              :  *                          +----+  +-----+  |    +--------+
      45              :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
      46              :  *                          +----+  +-----+  | |  +--------+
      47              :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
      48              :  *                          +----+  +-----+    |  +--------+
      49              :  *                          | 43 |  |  ?  |    |  |        |
      50              :  *                          +----+  +-----+    |  +--------+
      51              :  *                          | 44 |  |  ?  |    |  |        |
      52              :  *                          +----+  +-----+    |  +--------+
      53              :  *                          | 60 |  |  ?  |<---+
      54              :  *                          +----+  +-----+
      55              :  *     next_buffer_index -> |    |  |     |
      56              :  *                          +----+  +-----+
      57              :  *
      58              :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
      59              :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
      60              :  * the range 42..44 requires an I/O wait before its buffers are returned, as
      61              :  * does block 60.
      62              :  *
      63              :  *
      64              :  * Portions Copyright (c) 2024-2026, PostgreSQL Global Development Group
      65              :  * Portions Copyright (c) 1994, Regents of the University of California
      66              :  *
      67              :  * IDENTIFICATION
      68              :  *    src/backend/storage/aio/read_stream.c
      69              :  *
      70              :  *-------------------------------------------------------------------------
      71              :  */
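
To make the contract described in the header concrete, here is a minimal sketch of a caller. It is illustrative only and not part of read_stream.c: the callback, its state struct, and the scanning function are invented names, while read_stream_begin_relation(), read_stream_next_buffer(), read_stream_end(), ReleaseBuffer() and RelationGetNumberOfBlocks() are the existing APIs this file refers to, and READ_STREAM_DEFAULT is the default flag value from read_stream.h.

    /*
     * Illustrative sketch: a hypothetical callback that streams every other
     * block of a relation, and a consumer loop that drains the stream.
     * Error handling and header inclusions are omitted for brevity.
     */
    typedef struct EveryOtherBlockState
    {
        BlockNumber next;           /* next block number to hand out */
        BlockNumber nblocks;        /* relation size captured up front */
    } EveryOtherBlockState;

    static BlockNumber
    every_other_block_cb(ReadStream *stream, void *callback_private_data,
                         void *per_buffer_data)
    {
        EveryOtherBlockState *state = callback_private_data;
        BlockNumber blocknum;

        if (state->next >= state->nblocks)
            return InvalidBlockNumber;  /* signals end-of-stream */
        blocknum = state->next;
        state->next += 2;
        return blocknum;
    }

    static void
    scan_every_other_block(Relation rel)
    {
        EveryOtherBlockState state = {0, RelationGetNumberOfBlocks(rel)};
        ReadStream *stream;
        Buffer      buf;

        stream = read_stream_begin_relation(READ_STREAM_DEFAULT, NULL, rel,
                                            MAIN_FORKNUM,
                                            every_other_block_cb, &state, 0);
        while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
        {
            /* ... use the pinned buffer ... */
            ReleaseBuffer(buf);
        }
        read_stream_end(stream);
    }
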
      72              : #include "postgres.h"
      73              : 
      74              : #include "miscadmin.h"
      75              : #include "storage/aio.h"
      76              : #include "storage/fd.h"
      77              : #include "storage/smgr.h"
      78              : #include "storage/read_stream.h"
      79              : #include "utils/memdebug.h"
      80              : #include "utils/rel.h"
      81              : #include "utils/spccache.h"
      82              : 
      83              : typedef struct InProgressIO
      84              : {
      85              :     int16       buffer_index;
      86              :     ReadBuffersOperation op;
      87              : } InProgressIO;
      88              : 
      89              : /*
      90              :  * State for managing a stream of reads.
      91              :  */
      92              : struct ReadStream
      93              : {
      94              :     int16       max_ios;
      95              :     int16       io_combine_limit;
      96              :     int16       ios_in_progress;
      97              :     int16       queue_size;
      98              :     int16       max_pinned_buffers;
      99              :     int16       forwarded_buffers;
     100              :     int16       pinned_buffers;
     101              :     int16       distance;
     102              :     int16       initialized_buffers;
     103              :     int         read_buffers_flags;
     104              :     bool        sync_mode;      /* using io_method=sync */
     105              :     bool        batch_mode;     /* READ_STREAM_USE_BATCHING */
     106              :     bool        advice_enabled;
     107              :     bool        temporary;
     108              : 
     109              :     /*
     110              :      * One-block buffer to support 'ungetting' a block number, to resolve flow
     111              :      * control problems when I/Os are split.
     112              :      */
     113              :     BlockNumber buffered_blocknum;
     114              : 
     115              :     /*
     116              :      * The callback that will tell us which block numbers to read, and an
      117              :  * opaque pointer that will be passed to it for its own purposes.
     118              :      */
     119              :     ReadStreamBlockNumberCB callback;
     120              :     void       *callback_private_data;
     121              : 
     122              :     /* Next expected block, for detecting sequential access. */
     123              :     BlockNumber seq_blocknum;
     124              :     BlockNumber seq_until_processed;
     125              : 
     126              :     /* The read operation we are currently preparing. */
     127              :     BlockNumber pending_read_blocknum;
     128              :     int16       pending_read_nblocks;
     129              : 
     130              :     /* Space for buffers and optional per-buffer private data. */
     131              :     size_t      per_buffer_data_size;
     132              :     void       *per_buffer_data;
     133              : 
     134              :     /* Read operations that have been started but not waited for yet. */
     135              :     InProgressIO *ios;
     136              :     int16       oldest_io_index;
     137              :     int16       next_io_index;
     138              : 
     139              :     bool        fast_path;
     140              : 
     141              :     /* Circular queue of buffers. */
     142              :     int16       oldest_buffer_index;    /* Next pinned buffer to return */
     143              :     int16       next_buffer_index;  /* Index of next buffer to pin */
     144              :     Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
     145              : };
     146              : 
     147              : /*
     148              :  * Return a pointer to the per-buffer data by index.
     149              :  */
     150              : static inline void *
     151      3792922 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
     152              : {
     153      7585844 :     return (char *) stream->per_buffer_data +
     154      3792922 :         stream->per_buffer_data_size * buffer_index;
     155              : }
     156              : 
     157              : /*
     158              :  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
     159              :  * blocks [current_blocknum, last_exclusive).
     160              :  */
     161              : BlockNumber
     162       343746 : block_range_read_stream_cb(ReadStream *stream,
     163              :                            void *callback_private_data,
     164              :                            void *per_buffer_data)
     165              : {
     166       343746 :     BlockRangeReadStreamPrivate *p = callback_private_data;
     167              : 
     168       343746 :     if (p->current_blocknum < p->last_exclusive)
     169       276818 :         return p->current_blocknum++;
     170              : 
     171        66928 :     return InvalidBlockNumber;
     172              : }
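
For instance (a minimal sketch, not taken from any particular caller), streaming every block of a fork with this helper only requires filling in a BlockRangeReadStreamPrivate and passing the helper as the callback; draining the stream then proceeds exactly as sketched after the file header above.

    BlockRangeReadStreamPrivate p;
    ReadStream *stream;

    p.current_blocknum = 0;
    p.last_exclusive = RelationGetNumberOfBlocks(rel);
    stream = read_stream_begin_relation(READ_STREAM_FULL, NULL, rel,
                                        MAIN_FORKNUM,
                                        block_range_read_stream_cb, &p, 0);
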
     173              : 
     174              : /*
     175              :  * Ask the callback which block it would like us to read next, with a one block
     176              :  * buffer in front to allow read_stream_unget_block() to work.
     177              :  */
     178              : static inline BlockNumber
     179      5255040 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
     180              : {
     181              :     BlockNumber blocknum;
     182              : 
     183      5255040 :     blocknum = stream->buffered_blocknum;
     184      5255040 :     if (blocknum != InvalidBlockNumber)
     185            6 :         stream->buffered_blocknum = InvalidBlockNumber;
     186              :     else
     187              :     {
     188              :         /*
     189              :          * Tell Valgrind that the per-buffer data is undefined.  That replaces
     190              :          * the "noaccess" state that was set when the consumer moved past this
     191              :          * entry last time around the queue, and should also catch callbacks
     192              :          * that fail to initialize data that the buffer consumer later
     193              :          * accesses.  On the first go around, it is undefined already.
     194              :          */
     195              :         VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
     196              :                                     stream->per_buffer_data_size);
     197      5255034 :         blocknum = stream->callback(stream,
     198              :                                     stream->callback_private_data,
     199              :                                     per_buffer_data);
     200              :     }
     201              : 
     202      5255040 :     return blocknum;
     203              : }
     204              : 
     205              : /*
     206              :  * In order to deal with buffer shortages and I/O limits after short reads, we
     207              :  * sometimes need to defer handling of a block we've already consumed from the
     208              :  * registered callback until later.
     209              :  */
     210              : static inline void
     211            6 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
     212              : {
     213              :     /* We shouldn't ever unget more than one block. */
     214              :     Assert(stream->buffered_blocknum == InvalidBlockNumber);
     215              :     Assert(blocknum != InvalidBlockNumber);
     216            6 :     stream->buffered_blocknum = blocknum;
     217            6 : }
     218              : 
     219              : /*
     220              :  * Start as much of the current pending read as we can.  If we have to split it
     221              :  * because of the per-backend buffer limit, or the buffer manager decides to
     222              :  * split it, then the pending read is adjusted to hold the remaining portion.
     223              :  *
     224              :  * We can always start a read of at least size one if we have no progress yet.
     225              :  * Otherwise it's possible that we can't start a read at all because of a lack
     226              :  * of buffers, and then false is returned.  Buffer shortages also reduce the
     227              :  * distance to a level that prevents look-ahead until buffers are released.
     228              :  */
     229              : static bool
     230      1907541 : read_stream_start_pending_read(ReadStream *stream)
     231              : {
     232              :     bool        need_wait;
     233              :     int         requested_nblocks;
     234              :     int         nblocks;
     235              :     int         flags;
     236              :     int         forwarded;
     237              :     int16       io_index;
     238              :     int16       overflow;
     239              :     int16       buffer_index;
     240              :     int         buffer_limit;
     241              : 
     242              :     /* This should only be called with a pending read. */
     243              :     Assert(stream->pending_read_nblocks > 0);
     244              :     Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
     245              : 
     246              :     /* We had better not exceed the per-stream buffer limit with this read. */
     247              :     Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
     248              :            stream->max_pinned_buffers);
     249              : 
     250              : #ifdef USE_ASSERT_CHECKING
     251              :     /* We had better not be overwriting an existing pinned buffer. */
     252              :     if (stream->pinned_buffers > 0)
     253              :         Assert(stream->next_buffer_index != stream->oldest_buffer_index);
     254              :     else
     255              :         Assert(stream->next_buffer_index == stream->oldest_buffer_index);
     256              : 
     257              :     /*
     258              :      * Pinned buffers forwarded by a preceding StartReadBuffers() call that
     259              :      * had to split the operation should match the leading blocks of this
     260              :      * following StartReadBuffers() call.
     261              :      */
     262              :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
     263              :     for (int i = 0; i < stream->forwarded_buffers; ++i)
     264              :         Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
     265              :                stream->pending_read_blocknum + i);
     266              : 
     267              :     /*
     268              :      * Check that we've cleared the queue/overflow entries corresponding to
     269              :      * the rest of the blocks covered by this read, unless it's the first go
     270              :      * around and we haven't even initialized them yet.
     271              :      */
     272              :     for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
     273              :         Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
     274              :                stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
     275              : #endif
     276              : 
     277              :     /* Do we need to issue read-ahead advice? */
     278      1907541 :     flags = stream->read_buffers_flags;
     279      1907541 :     if (stream->advice_enabled)
     280              :     {
     281         1652 :         if (stream->pending_read_blocknum == stream->seq_blocknum)
     282              :         {
     283              :             /*
     284              :              * Sequential:  Issue advice until the preadv() calls have caught
     285              :              * up with the first advice issued for this sequential region, and
     286              :              * then stay out of the way of the kernel's own read-ahead.
     287              :              */
     288           22 :             if (stream->seq_until_processed != InvalidBlockNumber)
     289            1 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     290              :         }
     291              :         else
     292              :         {
     293              :             /*
     294              :              * Random jump:  Note the starting location of a new potential
     295              :              * sequential region and start issuing advice.  Skip it this time
     296              :              * if the preadv() follows immediately, eg first block in stream.
     297              :              */
     298         1630 :             stream->seq_until_processed = stream->pending_read_blocknum;
     299         1630 :             if (stream->pinned_buffers > 0)
     300           25 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     301              :         }
     302              :     }
     303              : 
     304              :     /*
     305              :      * How many more buffers is this backend allowed?
     306              :      *
     307              :      * Forwarded buffers are already pinned and map to the leading blocks of
     308              :      * the pending read (the remaining portion of an earlier short read that
     309              :      * we're about to continue).  They are not counted in pinned_buffers, but
     310              :      * they are counted as pins already held by this backend according to the
     311              :      * buffer manager, so they must be added to the limit it grants us.
     312              :      */
     313      1907541 :     if (stream->temporary)
     314        11993 :         buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
     315              :     else
     316      1895548 :         buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
     317              :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
     318              : 
     319      1907541 :     buffer_limit += stream->forwarded_buffers;
     320      1907541 :     buffer_limit = Min(buffer_limit, PG_INT16_MAX);
     321              : 
     322      1907541 :     if (buffer_limit == 0 && stream->pinned_buffers == 0)
     323       631431 :         buffer_limit = 1;       /* guarantee progress */
     324              : 
     325              :     /* Does the per-backend limit affect this read? */
     326      1907541 :     nblocks = stream->pending_read_nblocks;
     327      1907541 :     if (buffer_limit < nblocks)
     328              :     {
     329              :         int16       new_distance;
     330              : 
     331              :         /* Shrink distance: no more look-ahead until buffers are released. */
     332         2721 :         new_distance = stream->pinned_buffers + buffer_limit;
     333         2721 :         if (stream->distance > new_distance)
     334         1797 :             stream->distance = new_distance;
     335              : 
     336              :         /* Unless we have nothing to give the consumer, stop here. */
     337         2721 :         if (stream->pinned_buffers > 0)
     338         1462 :             return false;
     339              : 
     340              :         /* A short read is required to make progress. */
     341         1259 :         nblocks = buffer_limit;
     342              :     }
     343              : 
     344              :     /*
     345              :      * We say how many blocks we want to read, but it may be smaller on return
     346              :      * if the buffer manager decides to shorten the read.  Initialize buffers
     347              :      * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
     348              :      * and keep the original nblocks number so we can check for forwarded
     349              :      * buffers as output, below.
     350              :      */
     351      1906079 :     buffer_index = stream->next_buffer_index;
     352      1906079 :     io_index = stream->next_io_index;
     353      3079777 :     while (stream->initialized_buffers < buffer_index + nblocks)
     354      1173698 :         stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
     355      1906079 :     requested_nblocks = nblocks;
     356      1906079 :     need_wait = StartReadBuffers(&stream->ios[io_index].op,
     357      1906079 :                                  &stream->buffers[buffer_index],
     358              :                                  stream->pending_read_blocknum,
     359              :                                  &nblocks,
     360              :                                  flags);
     361      1906073 :     stream->pinned_buffers += nblocks;
     362              : 
     363              :     /* Remember whether we need to wait before returning this buffer. */
     364      1906073 :     if (!need_wait)
     365              :     {
     366              :         /* Look-ahead distance decays, no I/O necessary. */
     367      1332249 :         if (stream->distance > 1)
     368        17081 :             stream->distance--;
     369              :     }
     370              :     else
     371              :     {
     372              :         /*
     373              :          * Remember to call WaitReadBuffers() before returning head buffer.
     374              :          * Look-ahead distance will be adjusted after waiting.
     375              :          */
     376       573824 :         stream->ios[io_index].buffer_index = buffer_index;
     377       573824 :         if (++stream->next_io_index == stream->max_ios)
     378        23363 :             stream->next_io_index = 0;
     379              :         Assert(stream->ios_in_progress < stream->max_ios);
     380       573824 :         stream->ios_in_progress++;
     381       573824 :         stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
     382              :     }
     383              : 
     384              :     /*
     385              :      * How many pins were acquired but forwarded to the next call?  These need
     386              :      * to be passed to the next StartReadBuffers() call by leaving them
     387              :      * exactly where they are in the queue, or released if the stream ends
     388              :      * early.  We need the number for accounting purposes, since they are not
     389              :      * counted in stream->pinned_buffers but we already hold them.
     390              :      */
     391      1906073 :     forwarded = 0;
     392      1907714 :     while (nblocks + forwarded < requested_nblocks &&
     393        52313 :            stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
     394         1641 :         forwarded++;
     395      1906073 :     stream->forwarded_buffers = forwarded;
     396              : 
     397              :     /*
     398              :      * We gave a contiguous range of buffer space to StartReadBuffers(), but
     399              :      * we want it to wrap around at queue_size.  Copy overflowing buffers to
     400              :      * the front of the array where they'll be consumed, but also leave a copy
     401              :      * in the overflow zone which the I/O operation has a pointer to (it needs
     402              :      * a contiguous array).  Both copies will be cleared when the buffers are
     403              :      * handed to the consumer.
     404              :      */
     405      1906073 :     overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
     406      1906073 :     if (overflow > 0)
     407              :     {
     408              :         Assert(overflow < stream->queue_size);    /* can't overlap */
     409          344 :         memcpy(&stream->buffers[0],
     410          344 :                &stream->buffers[stream->queue_size],
     411              :                sizeof(stream->buffers[0]) * overflow);
     412              :     }
     413              : 
     414              :     /* Compute location of start of next read, without using % operator. */
     415      1906073 :     buffer_index += nblocks;
     416      1906073 :     if (buffer_index >= stream->queue_size)
     417       295031 :         buffer_index -= stream->queue_size;
     418              :     Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     419      1906073 :     stream->next_buffer_index = buffer_index;
     420              : 
     421              :     /* Adjust the pending read to cover the remaining portion, if any. */
     422      1906073 :     stream->pending_read_blocknum += nblocks;
     423      1906073 :     stream->pending_read_nblocks -= nblocks;
     424              : 
     425      1906073 :     return true;
     426              : }
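
A hypothetical walk-through of the accounting above, with invented numbers, may help:

    /*
     * Hypothetical example (numbers invented for illustration).  Suppose
     * io_combine_limit is 16 and the pending read covers blocks 100..115, so
     * requested_nblocks = 16.  If StartReadBuffers() decides to split the
     * operation and returns with nblocks = 10, it has started reading blocks
     * 100..109, but it may also have pinned and "forwarded" buffers for some
     * of the following blocks, say 110..111:
     *
     *   pinned_buffers        += 10
     *   forwarded              = 2    (the loop above finds two valid
     *                                  trailing buffers)
     *   pending_read_blocknum  = 110
     *   pending_read_nblocks   = 6
     *
     * The next call passes those two already-pinned buffers back to
     * StartReadBuffers() as the leading blocks of the shortened pending
     * read, and adds forwarded_buffers to the limit obtained from
     * GetAdditionalPinLimit()/GetAdditionalLocalPinLimit(), because the
     * buffer manager already counts them against this backend.
     */
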
     427              : 
     428              : static void
     429      3199954 : read_stream_look_ahead(ReadStream *stream)
     430              : {
     431              :     /*
     432              :      * Allow amortizing the cost of submitting IO over multiple IOs. This
     433              :      * requires that we don't do any operations that could lead to a deadlock
     434              :      * with staged-but-unsubmitted IO. The callback needs to opt-in to being
     435              :      * careful.
     436              :      */
     437      3199954 :     if (stream->batch_mode)
     438      2605754 :         pgaio_enter_batchmode();
     439              : 
     440      5283614 :     while (stream->ios_in_progress < stream->max_ios &&
     441      5283611 :            stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
     442              :     {
     443              :         BlockNumber blocknum;
     444              :         int16       buffer_index;
     445              :         void       *per_buffer_data;
     446              : 
     447      3101690 :         if (stream->pending_read_nblocks == stream->io_combine_limit)
     448              :         {
     449         4342 :             read_stream_start_pending_read(stream);
     450         4342 :             continue;
     451              :         }
     452              : 
     453              :         /*
     454              :          * See which block the callback wants next in the stream.  We need to
     455              :          * compute the index of the Nth block of the pending read including
     456              :          * wrap-around, but we don't want to use the expensive % operator.
     457              :          */
     458      3097348 :         buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
     459      3097348 :         if (buffer_index >= stream->queue_size)
     460         1907 :             buffer_index -= stream->queue_size;
     461              :         Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     462      3097348 :         per_buffer_data = get_per_buffer_data(stream, buffer_index);
     463      3097348 :         blocknum = read_stream_get_block(stream, per_buffer_data);
     464      3097348 :         if (blocknum == InvalidBlockNumber)
     465              :         {
     466              :             /* End of stream. */
     467      1018024 :             stream->distance = 0;
     468      1018024 :             break;
     469              :         }
     470              : 
     471              :         /* Can we merge it with the pending read? */
     472      2079324 :         if (stream->pending_read_nblocks > 0 &&
     473       225702 :             stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
     474              :         {
     475       225660 :             stream->pending_read_nblocks++;
     476       225660 :             continue;
     477              :         }
     478              : 
     479              :         /* We have to start the pending read before we can build another. */
     480      1853714 :         while (stream->pending_read_nblocks > 0)
     481              :         {
     482           56 :             if (!read_stream_start_pending_read(stream) ||
     483           50 :                 stream->ios_in_progress == stream->max_ios)
     484              :             {
     485              :                 /* We've hit the buffer or I/O limit.  Rewind and stop here. */
     486            6 :                 read_stream_unget_block(stream, blocknum);
     487            6 :                 if (stream->batch_mode)
     488            6 :                     pgaio_exit_batchmode();
     489            6 :                 return;
     490              :             }
     491              :         }
     492              : 
     493              :         /* This is the start of a new pending read. */
     494      1853658 :         stream->pending_read_blocknum = blocknum;
     495      1853658 :         stream->pending_read_nblocks = 1;
     496              :     }
     497              : 
     498              :     /*
     499              :      * We don't start the pending read just because we've hit the distance
     500              :      * limit, preferring to give it another chance to grow to full
     501              :      * io_combine_limit size once more buffers have been consumed.  However,
     502              :      * if we've already reached io_combine_limit, or we've reached the
     503              :      * distance limit and there isn't anything pinned yet, or the callback has
     504              :      * signaled end-of-stream, we start the read immediately.  Note that the
     505              :      * pending read can exceed the distance goal, if the latter was reduced
     506              :      * after hitting the per-backend buffer limit.
     507              :      */
     508      3199948 :     if (stream->pending_read_nblocks > 0 &&
     509      1944959 :         (stream->pending_read_nblocks == stream->io_combine_limit ||
     510      1936600 :          (stream->pending_read_nblocks >= stream->distance &&
     511      1894784 :           stream->pinned_buffers == 0) ||
     512        47910 :          stream->distance == 0) &&
     513      1903143 :         stream->ios_in_progress < stream->max_ios)
     514      1903143 :         read_stream_start_pending_read(stream);
     515              : 
     516              :     /*
     517              :      * There should always be something pinned when we leave this function,
     518              :      * whether started by this call or not, unless we've hit the end of the
     519              :      * stream.  In the worst case we can always make progress one buffer at a
     520              :      * time.
     521              :      */
     522              :     Assert(stream->pinned_buffers > 0 || stream->distance == 0);
     523              : 
     524      3199942 :     if (stream->batch_mode)
     525      2605742 :         pgaio_exit_batchmode();
     526              : }
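
The distance adjustments spread across this file (decrement for fully cached reads in read_stream_start_pending_read(), doubling after an I/O wait in read_stream_next_buffer(), shrinking on pin shortage) amount to roughly the following small model. It is an illustration only, not the real code path:

    /*
     * Illustrative model of the adaptive look-ahead distance: decay by one
     * per read that needed no I/O, double after a read that did, clamped to
     * max_pinned_buffers.
     */
    static int16
    model_distance_after_read(int16 distance, bool needed_io,
                              int16 max_pinned_buffers)
    {
        if (!needed_io)
            return (distance > 1) ? distance - 1 : distance;
        return (int16) Min(distance * 2, max_pinned_buffers);
    }

So a stream that keeps needing I/O ramps 1 -> 2 -> 4 -> 8 -> ... up to max_pinned_buffers, while an all-cached stream falls back toward a distance of one block.
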
     527              : 
     528              : /*
     529              :  * Create a new read stream object that can be used to perform the equivalent
     530              :  * of a series of ReadBuffer() calls for one fork of one relation.
     531              :  * Internally, it generates larger vectored reads where possible by looking
     532              :  * ahead.  The callback should return block numbers or InvalidBlockNumber to
     533              :  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
     534              :  * write extra data for each block into the space provided to it.  It will
     535              :  * also receive callback_private_data for its own purposes.
     536              :  */
     537              : static ReadStream *
     538       584218 : read_stream_begin_impl(int flags,
     539              :                        BufferAccessStrategy strategy,
     540              :                        Relation rel,
     541              :                        SMgrRelation smgr,
     542              :                        char persistence,
     543              :                        ForkNumber forknum,
     544              :                        ReadStreamBlockNumberCB callback,
     545              :                        void *callback_private_data,
     546              :                        size_t per_buffer_data_size)
     547              : {
     548              :     ReadStream *stream;
     549              :     size_t      size;
     550              :     int16       queue_size;
     551              :     int16       queue_overflow;
     552              :     int         max_ios;
     553              :     int         strategy_pin_limit;
     554              :     uint32      max_pinned_buffers;
     555              :     uint32      max_possible_buffer_limit;
     556              :     Oid         tablespace_id;
     557              : 
     558              :     /*
     559              :      * Decide how many I/Os we will allow to run at the same time.  That
     560              :      * currently means advice to the kernel to tell it that we will soon read.
     561              :      * This number also affects how far we look ahead for opportunities to
     562              :      * start more I/Os.
     563              :      */
     564       584218 :     tablespace_id = smgr->smgr_rlocator.locator.spcOid;
     565       584218 :     if (!OidIsValid(MyDatabaseId) ||
     566       671626 :         (rel && IsCatalogRelation(rel)) ||
     567       159805 :         IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
     568              :     {
     569              :         /*
     570              :          * Avoid circularity while trying to look up tablespace settings or
     571              :          * before spccache.c is ready.
     572              :          */
     573       482250 :         max_ios = effective_io_concurrency;
     574              :     }
     575       101968 :     else if (flags & READ_STREAM_MAINTENANCE)
     576        16282 :         max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
     577              :     else
     578        85686 :         max_ios = get_tablespace_io_concurrency(tablespace_id);
     579              : 
     580              :     /* Cap to INT16_MAX to avoid overflowing below */
     581       584218 :     max_ios = Min(max_ios, PG_INT16_MAX);
     582              : 
     583              :     /*
     584              :      * If starting a multi-block I/O near the end of the queue, we might
     585              :      * temporarily need extra space for overflowing buffers before they are
     586              :      * moved to regular circular position.  This is the maximum extra space we
     587              :      * could need.
     588              :      */
     589       584218 :     queue_overflow = io_combine_limit - 1;
     590              : 
     591              :     /*
     592              :      * Choose the maximum number of buffers we're prepared to pin.  We try to
     593              :      * pin fewer if we can, though.  We add one so that we can make progress
     594              :      * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
     595              :      * this also allows an extra full I/O's worth of buffers: after an I/O
     596              :      * finishes we don't want to have to wait for its buffers to be consumed
     597              :      * before starting a new one.
     598              :      *
     599              :      * Be careful not to allow int16 to overflow.  That is possible with the
     600              :      * current GUC range limits, so this is an artificial limit of ~32k
     601              :      * buffers and we'd need to adjust the types to exceed that.  We also have
     602              :      * to allow for the spare entry and the overflow space.
     603              :      */
     604       584218 :     max_pinned_buffers = (max_ios + 1) * io_combine_limit;
     605       584218 :     max_pinned_buffers = Min(max_pinned_buffers,
     606              :                              PG_INT16_MAX - queue_overflow - 1);
     607              : 
     608              :     /* Give the strategy a chance to limit the number of buffers we pin. */
     609       584218 :     strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
     610       584218 :     max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
     611              : 
     612              :     /*
     613              :      * Also limit our queue to the maximum number of pins we could ever be
     614              :      * allowed to acquire according to the buffer manager.  We may not really
     615              :      * be able to use them all due to other pins held by this backend, but
     616              :      * we'll check that later in read_stream_start_pending_read().
     617              :      */
     618       584218 :     if (SmgrIsTemp(smgr))
     619         6927 :         max_possible_buffer_limit = GetLocalPinLimit();
     620              :     else
     621       577291 :         max_possible_buffer_limit = GetPinLimit();
     622       584218 :     max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
     623              : 
     624              :     /*
     625              :      * The limit might be zero on a system configured with too few buffers for
     626              :      * the number of connections.  We need at least one to make progress.
     627              :      */
     628       584218 :     max_pinned_buffers = Max(1, max_pinned_buffers);
     629              : 
     630              :     /*
     631              :      * We need one extra entry for buffers and per-buffer data, because users
     632              :      * of per-buffer data have access to the object until the next call to
     633              :      * read_stream_next_buffer(), so we need a gap between the head and tail
     634              :      * of the queue so that we don't clobber it.
     635              :      */
     636       584218 :     queue_size = max_pinned_buffers + 1;
     637              : 
     638              :     /*
     639              :      * Allocate the object, the buffers, the ios and per_buffer_data space in
     640              :      * one big chunk.  Though we have queue_size buffers, we want to be able
     641              :      * to assume that all the buffers for a single read are contiguous (i.e.
     642              :      * don't wrap around halfway through), so we allow temporary overflows of
     643              :      * up to the maximum possible overflow size.
     644              :      */
     645       584218 :     size = offsetof(ReadStream, buffers);
     646       584218 :     size += sizeof(Buffer) * (queue_size + queue_overflow);
     647       584218 :     size += sizeof(InProgressIO) * Max(1, max_ios);
     648       584218 :     size += per_buffer_data_size * queue_size;
     649       584218 :     size += MAXIMUM_ALIGNOF * 2;
     650       584218 :     stream = (ReadStream *) palloc(size);
     651       584218 :     memset(stream, 0, offsetof(ReadStream, buffers));
     652       584218 :     stream->ios = (InProgressIO *)
     653       584218 :         MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
     654       584218 :     if (per_buffer_data_size > 0)
     655       128052 :         stream->per_buffer_data = (void *)
     656       128052 :             MAXALIGN(&stream->ios[Max(1, max_ios)]);
     657              : 
     658       584218 :     stream->sync_mode = io_method == IOMETHOD_SYNC;
     659       584218 :     stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
     660              : 
     661              : #ifdef USE_PREFETCH
     662              : 
     663              :     /*
     664              :      * Read-ahead advice simulating asynchronous I/O with synchronous calls.
     665              :      * Issue advice only if AIO is not used, direct I/O isn't enabled, the
     666              :      * caller hasn't promised sequential access (overriding our detection
     667              :      * heuristics), and max_ios hasn't been set to zero.
     668              :      */
     669       584218 :     if (stream->sync_mode &&
     670         2886 :         (io_direct_flags & IO_DIRECT_DATA) == 0 &&
     671         2886 :         (flags & READ_STREAM_SEQUENTIAL) == 0 &&
     672              :         max_ios > 0)
     673          692 :         stream->advice_enabled = true;
     674              : #endif
     675              : 
     676              :     /*
     677              :      * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
     678              :      * we still need to allocate space to combine and run one I/O.  Bump it up
     679              :      * to one, and remember to ask for synchronous I/O only.
     680              :      */
     681       584218 :     if (max_ios == 0)
     682              :     {
     683            7 :         max_ios = 1;
     684            7 :         stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
     685              :     }
     686              : 
     687              :     /*
     688              :      * Capture stable values for these two GUC-derived numbers for the
     689              :      * lifetime of this stream, so we don't have to worry about the GUCs
     690              :      * changing underneath us beyond this point.
     691              :      */
     692       584218 :     stream->max_ios = max_ios;
     693       584218 :     stream->io_combine_limit = io_combine_limit;
     694              : 
     695       584218 :     stream->per_buffer_data_size = per_buffer_data_size;
     696       584218 :     stream->max_pinned_buffers = max_pinned_buffers;
     697       584218 :     stream->queue_size = queue_size;
     698       584218 :     stream->callback = callback;
     699       584218 :     stream->callback_private_data = callback_private_data;
     700       584218 :     stream->buffered_blocknum = InvalidBlockNumber;
     701       584218 :     stream->seq_blocknum = InvalidBlockNumber;
     702       584218 :     stream->seq_until_processed = InvalidBlockNumber;
     703       584218 :     stream->temporary = SmgrIsTemp(smgr);
     704              : 
     705              :     /*
     706              :      * Skip the initial ramp-up phase if the caller says we're going to be
     707              :      * reading the whole relation.  This way we start out assuming we'll be
     708              :      * doing full io_combine_limit sized reads.
     709              :      */
     710       584218 :     if (flags & READ_STREAM_FULL)
     711        67743 :         stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
     712              :     else
     713       516475 :         stream->distance = 1;
     714              : 
     715              :     /*
     716              :      * Since we always access the same relation, we can initialize parts of
     717              :      * the ReadBuffersOperation objects and leave them that way, to avoid
     718              :      * wasting CPU cycles writing to them for each read.
     719              :      */
     720      9953123 :     for (int i = 0; i < max_ios; ++i)
     721              :     {
     722      9368905 :         stream->ios[i].op.rel = rel;
     723      9368905 :         stream->ios[i].op.smgr = smgr;
     724      9368905 :         stream->ios[i].op.persistence = persistence;
     725      9368905 :         stream->ios[i].op.forknum = forknum;
     726      9368905 :         stream->ios[i].op.strategy = strategy;
     727              :     }
     728              : 
     729       584218 :     return stream;
     730              : }
     731              : 
     732              : /*
     733              :  * Create a new read stream for reading a relation.
     734              :  * See read_stream_begin_impl() for the detailed explanation.
     735              :  */
     736              : ReadStream *
     737       522197 : read_stream_begin_relation(int flags,
     738              :                            BufferAccessStrategy strategy,
     739              :                            Relation rel,
     740              :                            ForkNumber forknum,
     741              :                            ReadStreamBlockNumberCB callback,
     742              :                            void *callback_private_data,
     743              :                            size_t per_buffer_data_size)
     744              : {
     745       522197 :     return read_stream_begin_impl(flags,
     746              :                                   strategy,
     747              :                                   rel,
     748              :                                   RelationGetSmgr(rel),
     749       522197 :                                   rel->rd_rel->relpersistence,
     750              :                                   forknum,
     751              :                                   callback,
     752              :                                   callback_private_data,
     753              :                                   per_buffer_data_size);
     754              : }
     755              : 
     756              : /*
     757              :  * Create a new read stream for reading a SMgr relation.
     758              :  * See read_stream_begin_impl() for the detailed explanation.
     759              :  */
     760              : ReadStream *
     761        62021 : read_stream_begin_smgr_relation(int flags,
     762              :                                 BufferAccessStrategy strategy,
     763              :                                 SMgrRelation smgr,
     764              :                                 char smgr_persistence,
     765              :                                 ForkNumber forknum,
     766              :                                 ReadStreamBlockNumberCB callback,
     767              :                                 void *callback_private_data,
     768              :                                 size_t per_buffer_data_size)
     769              : {
     770        62021 :     return read_stream_begin_impl(flags,
     771              :                                   strategy,
     772              :                                   NULL,
     773              :                                   smgr,
     774              :                                   smgr_persistence,
     775              :                                   forknum,
     776              :                                   callback,
     777              :                                   callback_private_data,
     778              :                                   per_buffer_data_size);
     779              : }
     780              : 
     781              : /*
     782              :  * Pull one pinned buffer out of a stream.  Each call returns successive
     783              :  * blocks in the order specified by the callback.  If per_buffer_data_size was
     784              :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
     785              :  * per-buffer data that the callback had a chance to populate, which remains
     786              :  * valid until the next call to read_stream_next_buffer().  When the stream
     787              :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
     788              :  * the stream early at any time by calling read_stream_end().
     789              :  */
     790              : Buffer
     791      6333500 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
     792              : {
     793              :     Buffer      buffer;
     794              :     int16       oldest_buffer_index;
     795              : 
     796              : #ifndef READ_STREAM_DISABLE_FAST_PATH
     797              : 
     798              :     /*
     799              :      * A fast path for all-cached scans.  This is the same as the usual
     800              :      * algorithm, but it is specialized for no I/O and no per-buffer data, so
     801              :      * we can skip the queue management code, stay in the same buffer slot and
     802              :      * use singular StartReadBuffer().
     803              :      */
     804      6333500 :     if (likely(stream->fast_path))
     805              :     {
     806              :         BlockNumber next_blocknum;
     807              : 
     808              :         /* Fast path assumptions. */
     809              :         Assert(stream->ios_in_progress == 0);
     810              :         Assert(stream->forwarded_buffers == 0);
     811              :         Assert(stream->pinned_buffers == 1);
     812              :         Assert(stream->distance == 1);
     813              :         Assert(stream->pending_read_nblocks == 0);
     814              :         Assert(stream->per_buffer_data_size == 0);
     815              :         Assert(stream->initialized_buffers > stream->oldest_buffer_index);
     816              : 
     817              :         /* We're going to return the buffer we pinned last time. */
     818      2157692 :         oldest_buffer_index = stream->oldest_buffer_index;
     819              :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
     820              :                stream->next_buffer_index);
     821      2157692 :         buffer = stream->buffers[oldest_buffer_index];
     822              :         Assert(buffer != InvalidBuffer);
     823              : 
     824              :         /* Choose the next block to pin. */
     825      2157692 :         next_blocknum = read_stream_get_block(stream, NULL);
     826              : 
     827      2157692 :         if (likely(next_blocknum != InvalidBlockNumber))
     828              :         {
     829      2069898 :             int         flags = stream->read_buffers_flags;
     830              : 
     831      2069898 :             if (stream->advice_enabled)
     832          537 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     833              : 
     834              :             /*
     835              :              * Pin a buffer for the next call.  Same buffer entry, and
     836              :              * arbitrary I/O entry (they're all free).  We don't have to
     837              :              * adjust pinned_buffers because we're transferring one to caller
     838              :              * but pinning one more.
     839              :              *
     840              :              * In the fast path we don't need to check the pin limit.  We're
     841              :              * always allowed at least one pin so that progress can be made,
     842              :              * and that's all we need here.  Although two pins are momentarily
     843              :              * held at the same time, the model used here is that the stream
     844              :              * holds only one, and the other now belongs to the caller.
     845              :              */
     846      2069898 :             if (likely(!StartReadBuffer(&stream->ios[0].op,
     847              :                                         &stream->buffers[oldest_buffer_index],
     848              :                                         next_blocknum,
     849              :                                         flags)))
     850              :             {
     851              :                 /* Fast return. */
     852      2054453 :                 return buffer;
     853              :             }
     854              : 
     855              :             /* Next call must wait for I/O for the newly pinned buffer. */
     856        15445 :             stream->oldest_io_index = 0;
     857        15445 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
     858        15445 :             stream->ios_in_progress = 1;
     859        15445 :             stream->ios[0].buffer_index = oldest_buffer_index;
     860        15445 :             stream->seq_blocknum = next_blocknum + 1;
     861              :         }
     862              :         else
     863              :         {
     864              :             /* No more blocks, end of stream. */
     865        87794 :             stream->distance = 0;
     866        87794 :             stream->oldest_buffer_index = stream->next_buffer_index;
     867        87794 :             stream->pinned_buffers = 0;
     868        87794 :             stream->buffers[oldest_buffer_index] = InvalidBuffer;
     869              :         }
     870              : 
     871       103239 :         stream->fast_path = false;
     872       103239 :         return buffer;
     873              :     }
     874              : #endif
     875              : 
     876      4175808 :     if (unlikely(stream->pinned_buffers == 0))
     877              :     {
     878              :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
     879              : 
     880              :         /* End of stream reached?  */
     881      2811166 :         if (stream->distance == 0)
     882      1601811 :             return InvalidBuffer;
     883              : 
     884              :         /*
     885              :          * The usual order of operations is that we look ahead at the bottom
     886              :          * of this function after potentially finishing an I/O and making
     887              :          * space for more, but if we're just starting up we'll need to crank
     888              :          * the handle to get started.
     889              :          */
     890      1209355 :         read_stream_look_ahead(stream);
     891              : 
     892              :         /* End of stream reached? */
     893      1209355 :         if (stream->pinned_buffers == 0)
     894              :         {
     895              :             Assert(stream->distance == 0);
     896       583377 :             return InvalidBuffer;
     897              :         }
     898              :     }
     899              : 
     900              :     /* Grab the oldest pinned buffer and associated per-buffer data. */
     901              :     Assert(stream->pinned_buffers > 0);
     902      1990620 :     oldest_buffer_index = stream->oldest_buffer_index;
     903              :     Assert(oldest_buffer_index >= 0 &&
     904              :            oldest_buffer_index < stream->queue_size);
     905      1990620 :     buffer = stream->buffers[oldest_buffer_index];
     906      1990620 :     if (per_buffer_data)
     907       695574 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
     908              : 
     909              :     Assert(BufferIsValid(buffer));
     910              : 
     911              :     /* Do we have to wait for an associated I/O first? */
     912      1990620 :     if (stream->ios_in_progress > 0 &&
     913       678366 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
     914              :     {
     915       588970 :         int16       io_index = stream->oldest_io_index;
     916              :         int32       distance;   /* wider temporary value, clamped below */
     917              : 
     918              :         /* Sanity check that we still agree on the buffers. */
     919              :         Assert(stream->ios[io_index].op.buffers ==
     920              :                &stream->buffers[oldest_buffer_index]);
     921              : 
     922       588970 :         WaitReadBuffers(&stream->ios[io_index].op);
     923              : 
     924              :         Assert(stream->ios_in_progress > 0);
     925       588949 :         stream->ios_in_progress--;
     926       588949 :         if (++stream->oldest_io_index == stream->max_ios)
     927        23363 :             stream->oldest_io_index = 0;
     928              : 
     929              :         /* Look-ahead distance ramps up rapidly after we do I/O. */
     930       588949 :         distance = stream->distance * 2;
     931       588949 :         distance = Min(distance, stream->max_pinned_buffers);
     932       588949 :         stream->distance = distance;
     933              : 
     934              :         /*
     935              :          * If we've reached the first block of a sequential region we're
     936              :          * issuing advice for, cancel that until the next jump.  The kernel
     937              :          * will see the sequential preadv() pattern starting here.
     938              :          */
     939       588949 :         if (stream->advice_enabled &&
     940          272 :             stream->ios[io_index].op.blocknum == stream->seq_until_processed)
     941          251 :             stream->seq_until_processed = InvalidBlockNumber;
     942              :     }
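    /*
     * Illustrative note, not part of the original source: with the doubling
     * performed just above when an I/O completes, a stream that starts at
     * distance 1 and keeps hitting blocks needing I/O ramps 1 -> 2 -> 4 -> 8
     * -> ... until it is clamped at max_pinned_buffers, and the distance
     * shrinks again gradually while only cached blocks are being streamed.
     */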
     943              : 
     944              :     /*
     945              :      * We must zap this queue entry, or else it would appear as a forwarded
     946              :      * buffer.  If it's potentially in the overflow zone (i.e., from a
     947              :      * multi-block I/O that wrapped around the queue), also zap the copy.
     948              :      */
     949      1990599 :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
     950      1990599 :     if (oldest_buffer_index < stream->io_combine_limit - 1)
     951      1451629 :         stream->buffers[stream->queue_size + oldest_buffer_index] =
     952              :             InvalidBuffer;
     953              : 
     954              : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
     955              : 
     956              :     /*
     957              :      * The caller will get access to the per-buffer data until the next call.
     958              :      * We wipe the one before, which is never occupied because queue_size
     959              :      * allowed one extra element.  This will hopefully trip up client code
     960              :      * that is holding a dangling pointer to it.
     961              :      */
     962              :     if (stream->per_buffer_data)
     963              :     {
     964              :         void       *per_buffer_data;
     965              : 
     966              :         per_buffer_data = get_per_buffer_data(stream,
     967              :                                               oldest_buffer_index == 0 ?
     968              :                                               stream->queue_size - 1 :
     969              :                                               oldest_buffer_index - 1);
     970              : 
     971              : #if defined(CLOBBER_FREED_MEMORY)
     972              :         /* This also tells Valgrind the memory is "noaccess". */
     973              :         wipe_mem(per_buffer_data, stream->per_buffer_data_size);
     974              : #elif defined(USE_VALGRIND)
     975              :         /* Tell it ourselves. */
     976              :         VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
     977              :                                    stream->per_buffer_data_size);
     978              : #endif
     979              :     }
     980              : #endif
     981              : 
     982              :     /* Pin transferred to caller. */
     983              :     Assert(stream->pinned_buffers > 0);
     984      1990599 :     stream->pinned_buffers--;
     985              : 
     986              :     /* Advance oldest buffer, with wrap-around. */
     987      1990599 :     stream->oldest_buffer_index++;
     988      1990599 :     if (stream->oldest_buffer_index == stream->queue_size)
     989       287621 :         stream->oldest_buffer_index = 0;
     990              : 
     991              :     /* Prepare for the next call. */
     992      1990599 :     read_stream_look_ahead(stream);
     993              : 
     994              : #ifndef READ_STREAM_DISABLE_FAST_PATH
     995              :     /* See if we can take the fast path for all-cached scans next time. */
     996      1990593 :     if (stream->ios_in_progress == 0 &&
     997      1402187 :         stream->forwarded_buffers == 0 &&
     998      1401691 :         stream->pinned_buffers == 1 &&
     999       803141 :         stream->distance == 1 &&
    1000       733634 :         stream->pending_read_nblocks == 0 &&
    1001       732245 :         stream->per_buffer_data_size == 0)
    1002              :     {
    1003              :         /*
    1004              :          * The fast path spins on one buffer entry repeatedly instead of
    1005              :          * rotating through the whole queue and clearing the entries behind
    1006              :          * it.  If the buffer it starts with happened to be forwarded between
    1007              :          * StartReadBuffers() calls and also wrapped around the circular queue
    1008              :          * partway through, then a copy also exists in the overflow zone, and
    1009              :          * the fast path won't clear it out as the regular path would.  Clear
    1010              :          * that copy now, so the fast path doesn't need code for that.
    1011              :          */
    1012       199798 :         if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
    1013       198488 :             stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
    1014              :                 InvalidBuffer;
    1015              : 
    1016       199798 :         stream->fast_path = true;
    1017              :     }
    1018              : #endif
    1019              : 
    1020      1990593 :     return buffer;
    1021              : }
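
For context on how the buffer-returning function above is consumed: callers typically loop on read_stream_next_buffer() until it returns InvalidBuffer, releasing each pin as they go. The following is a minimal sketch only, not part of the coverage data; it assumes an already-opened Relation named rel and uses the block-range callback declared alongside this module in read_stream.h.

    /* Illustrative sketch, not from the report: stream every block of a fork. */
    BlockRangeReadStreamPrivate priv = {0};
    ReadStream *stream;
    Buffer      buf;

    priv.current_blocknum = 0;
    priv.last_exclusive = RelationGetNumberOfBlocks(rel);   /* assumed Relation */

    stream = read_stream_begin_relation(READ_STREAM_FULL,
                                        NULL,                /* default strategy */
                                        rel,
                                        MAIN_FORKNUM,
                                        block_range_read_stream_cb,
                                        &priv,
                                        0);                  /* no per-buffer data */

    while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
    {
        /* ... inspect BufferGetPage(buf) ... */
        ReleaseBuffer(buf);
    }

    read_stream_end(stream);
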
    1022              : 
    1023              : /*
    1024              :  * Transitional support for code that would like to perform or skip reads
    1025              :  * itself, without using the stream.  Returns, and consumes, the next block
    1026              :  * number that would be read by the stream's look-ahead algorithm, or
    1027              :  * InvalidBlockNumber if the end of the stream is reached.  Also reports the
    1028              :  * strategy that would be used to read it.
    1029              :  */
    1030              : BlockNumber
    1031            0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
    1032              : {
    1033            0 :     *strategy = stream->ios[0].op.strategy;
    1034            0 :     return read_stream_get_block(stream, NULL);
    1035              : }
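
read_stream_next_block() above lets transitional callers keep issuing (or skipping) reads themselves while reusing the stream's look-ahead to choose block numbers. A hedged sketch of that pattern follows, assuming the same rel and stream as in the previous sketch; the skip test is purely hypothetical.

    /* Illustrative sketch, not from the report: consume block numbers only. */
    BlockNumber          blkno;
    BufferAccessStrategy strategy;

    while ((blkno = read_stream_next_block(stream, &strategy)) != InvalidBlockNumber)
    {
        Buffer      buf;

        if (caller_can_skip_block(blkno))   /* hypothetical caller-side test */
            continue;

        buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, strategy);
        /* ... process the page ... */
        ReleaseBuffer(buf);
    }
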
    1036              : 
    1037              : /*
    1038              :  * Reset a read stream by releasing any queued-up buffers, allowing the stream
    1039              :  * to be used again for different blocks.  This can be used to clear an
    1040              :  * end-of-stream condition and start again, or to discard blocks that were
    1041              :  * read speculatively and read some different blocks instead.
    1042              :  */
    1043              : void
    1044      1209978 : read_stream_reset(ReadStream *stream)
    1045              : {
    1046              :     int16       index;
    1047              :     Buffer      buffer;
    1048              : 
    1049              :     /* Stop looking ahead. */
    1050      1209978 :     stream->distance = 0;
    1051              : 
    1052              :     /* Forget buffered block number and fast path state. */
    1053      1209978 :     stream->buffered_blocknum = InvalidBlockNumber;
    1054      1209978 :     stream->fast_path = false;
    1055              : 
    1056              :     /* Unpin anything that wasn't consumed. */
    1057      1335888 :     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
    1058       125910 :         ReleaseBuffer(buffer);
    1059              : 
    1060              :     /* Unpin any unused forwarded buffers. */
    1061      1209978 :     index = stream->next_buffer_index;
    1062      1209978 :     while (index < stream->initialized_buffers &&
    1063       213659 :            (buffer = stream->buffers[index]) != InvalidBuffer)
    1064              :     {
    1065              :         Assert(stream->forwarded_buffers > 0);
    1066            0 :         stream->forwarded_buffers--;
    1067            0 :         ReleaseBuffer(buffer);
    1068              : 
    1069            0 :         stream->buffers[index] = InvalidBuffer;
    1070            0 :         if (index < stream->io_combine_limit - 1)
    1071            0 :             stream->buffers[stream->queue_size + index] = InvalidBuffer;
    1072              : 
    1073            0 :         if (++index == stream->queue_size)
    1074            0 :             index = 0;
    1075              :     }
    1076              : 
    1077              :     Assert(stream->forwarded_buffers == 0);
    1078              :     Assert(stream->pinned_buffers == 0);
    1079              :     Assert(stream->ios_in_progress == 0);
    1080              : 
    1081              :     /* Start off assuming data is cached. */
    1082      1209978 :     stream->distance = 1;
    1083      1209978 : }
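
A typical use of read_stream_reset() is a rescan: drop whatever the stream still has pinned, repoint the callback's private state at a different block range, and start consuming buffers again. A minimal sketch, reusing the hypothetical names from the earlier sketches:

    /* Illustrative sketch, not from the report: reuse one stream for a new range. */
    read_stream_reset(stream);                  /* unpins queued buffers, distance = 1 */

    priv.current_blocknum = new_start_block;    /* caller-owned callback state */
    priv.last_exclusive = new_end_block;

    while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
    {
        /* ... process the re-streamed blocks ... */
        ReleaseBuffer(buf);
    }
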
    1084              : 
    1085              : /*
    1086              :  * Release and free a read stream.
    1087              :  */
    1088              : void
    1089       581859 : read_stream_end(ReadStream *stream)
    1090              : {
    1091       581859 :     read_stream_reset(stream);
    1092       581859 :     pfree(stream);
    1093       581859 : }
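
Finally, a sketch of the per-buffer data facility whose lifetime the wipe logic in read_stream_next_buffer() polices: a payload returned through the second argument is only guaranteed valid until the next call, so it should be copied out before continuing. The payload struct and callback below are hypothetical; a real callback would fill the payload in through its per_buffer_data argument.

    /* Illustrative sketch, not from the report: per-buffer data copied out early. */
    typedef struct DemoPayload
    {
        BlockNumber blkno;                      /* filled in by the callback */
    } DemoPayload;

    stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
                                        NULL,                /* default strategy */
                                        rel,
                                        MAIN_FORKNUM,
                                        demo_callback,       /* hypothetical callback */
                                        NULL,                /* no private state */
                                        sizeof(DemoPayload));

    for (;;)
    {
        void       *ptr;
        Buffer      buf = read_stream_next_buffer(stream, &ptr);
        DemoPayload payload;

        if (buf == InvalidBuffer)
            break;

        payload = *(DemoPayload *) ptr;         /* valid only until the next call */
        ReleaseBuffer(buf);
        /* ... use payload.blkno ... */
    }

    read_stream_end(stream);
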
        

Generated by: LCOV version 2.0-1