LCOV - code coverage report
Current view: top level - src/backend/storage/aio - read_stream.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 257 267 96.3 %
Date: 2025-08-09 08:18:06 Functions: 12 13 92.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * read_stream.c
       4             :  *    Mechanism for accessing buffered relation data with look-ahead
       5             :  *
       6             :  * Code that needs to access relation data typically pins blocks one at a
       7             :  * time, often in a predictable order that might be sequential or data-driven.
       8             :  * Calling the simple ReadBuffer() function for each block is inefficient,
       9             :  * because blocks that are not yet in the buffer pool require I/O operations
      10             :  * that are small and might stall waiting for storage.  This mechanism looks
      11             :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
      12             :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
      13             :  * distance.
      14             :  *
      15             :  * A user-provided callback generates a stream of block numbers that is used
      16             :  * to form reads of up to io_combine_limit, by attempting to merge them with a
      17             :  * pending read.  When that isn't possible, the existing pending read is sent
      18             :  * to StartReadBuffers() so that a new one can begin to form.
      19             :  *
      20             :  * The algorithm for controlling the look-ahead distance is based on recent
      21             :  * cache hit and miss history.  When no I/O is necessary, there is no benefit
      22             :  * in looking ahead more than one block.  This is the default initial
      23             :  * assumption, but when blocks needing I/O are streamed, the distance is
      24             :  * increased rapidly to try to benefit from I/O combining and concurrency.  It
      25             :  * is reduced gradually when cached blocks are streamed.
      26             :  *
      27             :  * The main data structure is a circular queue of buffers of size
      28             :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
      29             :  * returned by read_stream_next_buffer().  Each buffer also has an optional
      30             :  * variable sized object that is passed from the callback to the consumer of
      31             :  * buffers.
      32             :  *
      33             :  * Parallel to the queue of buffers, there is a circular queue of in-progress
      34             :  * I/Os that have been started with StartReadBuffers(), and for which
      35             :  * WaitReadBuffers() must be called before returning the buffer.
      36             :  *
      37             :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
      38             :  * successive calls, then these data structures might appear as follows:
      39             :  *
      40             :  *                          buffers buf/data       ios
      41             :  *
      42             :  *                          +----+  +-----+       +--------+
      43             :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
      44             :  *                          +----+  +-----+  |    +--------+
      45             :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
      46             :  *                          +----+  +-----+  | |  +--------+
      47             :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
      48             :  *                          +----+  +-----+    |  +--------+
      49             :  *                          | 43 |  |  ?  |    |  |        |
      50             :  *                          +----+  +-----+    |  +--------+
      51             :  *                          | 44 |  |  ?  |    |  |        |
      52             :  *                          +----+  +-----+    |  +--------+
      53             :  *                          | 60 |  |  ?  |<---+
      54             :  *                          +----+  +-----+
      55             :  *     next_buffer_index -> |    |  |     |
      56             :  *                          +----+  +-----+
      57             :  *
      58             :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
      59             :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
      60             :  * the range 42..44 requires an I/O wait before its buffers are returned, as
      61             :  * does block 60.
      62             :  *
      63             :  *
      64             :  * Portions Copyright (c) 2024-2025, PostgreSQL Global Development Group
      65             :  * Portions Copyright (c) 1994, Regents of the University of California
      66             :  *
      67             :  * IDENTIFICATION
      68             :  *    src/backend/storage/aio/read_stream.c
      69             :  *
      70             :  *-------------------------------------------------------------------------
      71             :  */
      72             : #include "postgres.h"
      73             : 
      74             : #include "miscadmin.h"
      75             : #include "storage/aio.h"
      76             : #include "storage/fd.h"
      77             : #include "storage/smgr.h"
      78             : #include "storage/read_stream.h"
      79             : #include "utils/memdebug.h"
      80             : #include "utils/rel.h"
      81             : #include "utils/spccache.h"
      82             : 
      83             : typedef struct InProgressIO
      84             : {
      85             :     int16       buffer_index;
      86             :     ReadBuffersOperation op;
      87             : } InProgressIO;
      88             : 
      89             : /*
      90             :  * State for managing a stream of reads.
      91             :  */
      92             : struct ReadStream
      93             : {
      94             :     int16       max_ios;
      95             :     int16       io_combine_limit;
      96             :     int16       ios_in_progress;
      97             :     int16       queue_size;
      98             :     int16       max_pinned_buffers;
      99             :     int16       forwarded_buffers;
     100             :     int16       pinned_buffers;
     101             :     int16       distance;
     102             :     int16       initialized_buffers;
     103             :     int         read_buffers_flags;
     104             :     bool        sync_mode;      /* using io_method=sync */
     105             :     bool        batch_mode;     /* READ_STREAM_USE_BATCHING */
     106             :     bool        advice_enabled;
     107             :     bool        temporary;
     108             : 
     109             :     /*
     110             :      * One-block buffer to support 'ungetting' a block number, to resolve flow
     111             :      * control problems when I/Os are split.
     112             :      */
     113             :     BlockNumber buffered_blocknum;
     114             : 
     115             :     /*
     116             :      * The callback that will tell us which block numbers to read, and an
     117             :      * opaque pointer that will be passed to it for its own purposes.
     118             :      */
     119             :     ReadStreamBlockNumberCB callback;
     120             :     void       *callback_private_data;
     121             : 
     122             :     /* Next expected block, for detecting sequential access. */
     123             :     BlockNumber seq_blocknum;
     124             :     BlockNumber seq_until_processed;
     125             : 
     126             :     /* The read operation we are currently preparing. */
     127             :     BlockNumber pending_read_blocknum;
     128             :     int16       pending_read_nblocks;
     129             : 
     130             :     /* Space for buffers and optional per-buffer private data. */
     131             :     size_t      per_buffer_data_size;
     132             :     void       *per_buffer_data;
     133             : 
     134             :     /* Read operations that have been started but not waited for yet. */
     135             :     InProgressIO *ios;
     136             :     int16       oldest_io_index;
     137             :     int16       next_io_index;
     138             : 
     139             :     bool        fast_path;
     140             : 
     141             :     /* Circular queue of buffers. */
     142             :     int16       oldest_buffer_index;    /* Next pinned buffer to return */
     143             :     int16       next_buffer_index;  /* Index of next buffer to pin */
     144             :     Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
     145             : };
     146             : 
     147             : /*
     148             :  * Return a pointer to the per-buffer data by index.
     149             :  */
     150             : static inline void *
     151     7187034 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
     152             : {
     153    14374068 :     return (char *) stream->per_buffer_data +
     154     7187034 :         stream->per_buffer_data_size * buffer_index;
     155             : }
     156             : 
     157             : /*
     158             :  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
     159             :  * blocks [current_blocknum, last_exclusive).
     160             :  */
     161             : BlockNumber
     162      665708 : block_range_read_stream_cb(ReadStream *stream,
     163             :                            void *callback_private_data,
     164             :                            void *per_buffer_data)
     165             : {
     166      665708 :     BlockRangeReadStreamPrivate *p = callback_private_data;
     167             : 
     168      665708 :     if (p->current_blocknum < p->last_exclusive)
     169      535542 :         return p->current_blocknum++;
     170             : 
     171      130166 :     return InvalidBlockNumber;
     172             : }
     173             : 
     174             : /*
     175             :  * Ask the callback which block it would like us to read next, with a one block
     176             :  * buffer in front to allow read_stream_unget_block() to work.
     177             :  */
     178             : static inline BlockNumber
     179    10034798 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
     180             : {
     181             :     BlockNumber blocknum;
     182             : 
     183    10034798 :     blocknum = stream->buffered_blocknum;
     184    10034798 :     if (blocknum != InvalidBlockNumber)
     185          12 :         stream->buffered_blocknum = InvalidBlockNumber;
     186             :     else
     187             :     {
     188             :         /*
     189             :          * Tell Valgrind that the per-buffer data is undefined.  That replaces
     190             :          * the "noaccess" state that was set when the consumer moved past this
     191             :          * entry last time around the queue, and should also catch callbacks
     192             :          * that fail to initialize data that the buffer consumer later
     193             :          * accesses.  On the first go around, it is undefined already.
     194             :          */
     195             :         VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
     196             :                                     stream->per_buffer_data_size);
     197    10034786 :         blocknum = stream->callback(stream,
     198             :                                     stream->callback_private_data,
     199             :                                     per_buffer_data);
     200             :     }
     201             : 
     202    10034798 :     return blocknum;
     203             : }
     204             : 
     205             : /*
     206             :  * In order to deal with buffer shortages and I/O limits after short reads, we
     207             :  * sometimes need to defer handling of a block we've already consumed from the
     208             :  * registered callback until later.
     209             :  */
     210             : static inline void
     211          12 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
     212             : {
     213             :     /* We shouldn't ever unget more than one block. */
     214             :     Assert(stream->buffered_blocknum == InvalidBlockNumber);
     215             :     Assert(blocknum != InvalidBlockNumber);
     216          12 :     stream->buffered_blocknum = blocknum;
     217          12 : }
     218             : 
     219             : /*
     220             :  * Start as much of the current pending read as we can.  If we have to split it
     221             :  * because of the per-backend buffer limit, or the buffer manager decides to
     222             :  * split it, then the pending read is adjusted to hold the remaining portion.
     223             :  *
     224             :  * We can always start a read of at least size one if we have no progress yet.
     225             :  * Otherwise it's possible that we can't start a read at all because of a lack
     226             :  * of buffers, and then false is returned.  Buffer shortages also reduce the
     227             :  * distance to a level that prevents look-ahead until buffers are released.
     228             :  */
     229             : static bool
     230     3614790 : read_stream_start_pending_read(ReadStream *stream)
     231             : {
     232             :     bool        need_wait;
     233             :     int         requested_nblocks;
     234             :     int         nblocks;
     235             :     int         flags;
     236             :     int         forwarded;
     237             :     int16       io_index;
     238             :     int16       overflow;
     239             :     int16       buffer_index;
     240             :     int         buffer_limit;
     241             : 
     242             :     /* This should only be called with a pending read. */
     243             :     Assert(stream->pending_read_nblocks > 0);
     244             :     Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
     245             : 
     246             :     /* We had better not exceed the per-stream buffer limit with this read. */
     247             :     Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
     248             :            stream->max_pinned_buffers);
     249             : 
     250             : #ifdef USE_ASSERT_CHECKING
     251             :     /* We had better not be overwriting an existing pinned buffer. */
     252             :     if (stream->pinned_buffers > 0)
     253             :         Assert(stream->next_buffer_index != stream->oldest_buffer_index);
     254             :     else
     255             :         Assert(stream->next_buffer_index == stream->oldest_buffer_index);
     256             : 
     257             :     /*
     258             :      * Pinned buffers forwarded by a preceding StartReadBuffers() call that
     259             :      * had to split the operation should match the leading blocks of this
     260             :      * following StartReadBuffers() call.
     261             :      */
     262             :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
     263             :     for (int i = 0; i < stream->forwarded_buffers; ++i)
     264             :         Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
     265             :                stream->pending_read_blocknum + i);
     266             : 
     267             :     /*
     268             :      * Check that we've cleared the queue/overflow entries corresponding to
     269             :      * the rest of the blocks covered by this read, unless it's the first go
     270             :      * around and we haven't even initialized them yet.
     271             :      */
     272             :     for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
     273             :         Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
     274             :                stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
     275             : #endif
     276             : 
     277             :     /* Do we need to issue read-ahead advice? */
     278     3614790 :     flags = stream->read_buffers_flags;
     279     3614790 :     if (stream->advice_enabled)
     280             :     {
     281        3290 :         if (stream->pending_read_blocknum == stream->seq_blocknum)
     282             :         {
     283             :             /*
     284             :              * Sequential:  Issue advice until the preadv() calls have caught
     285             :              * up with the first advice issued for this sequential region, and
     286             :              * then stay out of the way of the kernel's own read-ahead.
     287             :              */
     288          44 :             if (stream->seq_until_processed != InvalidBlockNumber)
     289           2 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     290             :         }
     291             :         else
     292             :         {
     293             :             /*
     294             :              * Random jump:  Note the starting location of a new potential
     295             :              * sequential region and start issuing advice.  Skip it this time
     296             :              * if the preadv() follows immediately, eg first block in stream.
     297             :              */
     298        3246 :             stream->seq_until_processed = stream->pending_read_blocknum;
     299        3246 :             if (stream->pinned_buffers > 0)
     300          48 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     301             :         }
     302             :     }
     303             : 
     304             :     /*
     305             :      * How many more buffers is this backend allowed?
     306             :      *
     307             :      * Forwarded buffers are already pinned and map to the leading blocks of
     308             :      * the pending read (the remaining portion of an earlier short read that
     309             :      * we're about to continue).  They are not counted in pinned_buffers, but
     310             :      * they are counted as pins already held by this backend according to the
     311             :      * buffer manager, so they must be added to the limit it grants us.
     312             :      */
     313     3614790 :     if (stream->temporary)
     314       23674 :         buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
     315             :     else
     316     3591116 :         buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
     317             :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
     318             : 
     319     3614790 :     buffer_limit += stream->forwarded_buffers;
     320     3614790 :     buffer_limit = Min(buffer_limit, PG_INT16_MAX);
     321             : 
     322     3614790 :     if (buffer_limit == 0 && stream->pinned_buffers == 0)
     323     1164904 :         buffer_limit = 1;       /* guarantee progress */
     324             : 
     325             :     /* Does the per-backend limit affect this read? */
     326     3614790 :     nblocks = stream->pending_read_nblocks;
     327     3614790 :     if (buffer_limit < nblocks)
     328             :     {
     329             :         int16       new_distance;
     330             : 
     331             :         /* Shrink distance: no more look-ahead until buffers are released. */
     332        5002 :         new_distance = stream->pinned_buffers + buffer_limit;
     333        5002 :         if (stream->distance > new_distance)
     334        3474 :             stream->distance = new_distance;
     335             : 
     336             :         /* Unless we have nothing to give the consumer, stop here. */
     337        5002 :         if (stream->pinned_buffers > 0)
     338        2496 :             return false;
     339             : 
     340             :         /* A short read is required to make progress. */
     341        2506 :         nblocks = buffer_limit;
     342             :     }
     343             : 
     344             :     /*
     345             :      * We say how many blocks we want to read, but it may be smaller on return
     346             :      * if the buffer manager decides to shorten the read.  Initialize buffers
     347             :      * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
     348             :      * and keep the original nblocks number so we can check for forwarded
     349             :      * buffers as output, below.
     350             :      */
     351     3612294 :     buffer_index = stream->next_buffer_index;
     352     3612294 :     io_index = stream->next_io_index;
     353     5888292 :     while (stream->initialized_buffers < buffer_index + nblocks)
     354     2275998 :         stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
     355     3612294 :     requested_nblocks = nblocks;
     356     3612294 :     need_wait = StartReadBuffers(&stream->ios[io_index].op,
     357     3612294 :                                  &stream->buffers[buffer_index],
     358             :                                  stream->pending_read_blocknum,
     359             :                                  &nblocks,
     360             :                                  flags);
     361     3612282 :     stream->pinned_buffers += nblocks;
     362             : 
     363             :     /* Remember whether we need to wait before returning this buffer. */
     364     3612282 :     if (!need_wait)
     365             :     {
     366             :         /* Look-ahead distance decays, no I/O necessary. */
     367     2558324 :         if (stream->distance > 1)
     368       30826 :             stream->distance--;
     369             :     }
     370             :     else
     371             :     {
     372             :         /*
     373             :          * Remember to call WaitReadBuffers() before returning head buffer.
     374             :          * Look-ahead distance will be adjusted after waiting.
     375             :          */
     376     1053958 :         stream->ios[io_index].buffer_index = buffer_index;
     377     1053958 :         if (++stream->next_io_index == stream->max_ios)
     378       42240 :             stream->next_io_index = 0;
     379             :         Assert(stream->ios_in_progress < stream->max_ios);
     380     1053958 :         stream->ios_in_progress++;
     381     1053958 :         stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
     382             :     }
     383             : 
     384             :     /*
     385             :      * How many pins were acquired but forwarded to the next call?  These need
     386             :      * to be passed to the next StartReadBuffers() call by leaving them
     387             :      * exactly where they are in the queue, or released if the stream ends
     388             :      * early.  We need the number for accounting purposes, since they are not
     389             :      * counted in stream->pinned_buffers but we already hold them.
     390             :      */
     391     3612282 :     forwarded = 0;
     392     3615002 :     while (nblocks + forwarded < requested_nblocks &&
     393      102312 :            stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
     394        2720 :         forwarded++;
     395     3612282 :     stream->forwarded_buffers = forwarded;
     396             : 
     397             :     /*
     398             :      * We gave a contiguous range of buffer space to StartReadBuffers(), but
     399             :      * we want it to wrap around at queue_size.  Copy overflowing buffers to
     400             :      * the front of the array where they'll be consumed, but also leave a copy
     401             :      * in the overflow zone which the I/O operation has a pointer to (it needs
     402             :      * a contiguous array).  Both copies will be cleared when the buffers are
     403             :      * handed to the consumer.
     404             :      */
     405     3612282 :     overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
     406     3612282 :     if (overflow > 0)
     407             :     {
     408             :         Assert(overflow < stream->queue_size);    /* can't overlap */
     409         646 :         memcpy(&stream->buffers[0],
     410         646 :                &stream->buffers[stream->queue_size],
     411             :                sizeof(stream->buffers[0]) * overflow);
     412             :     }
     413             : 
     414             :     /* Compute location of start of next read, without using % operator. */
     415     3612282 :     buffer_index += nblocks;
     416     3612282 :     if (buffer_index >= stream->queue_size)
     417      543038 :         buffer_index -= stream->queue_size;
     418             :     Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     419     3612282 :     stream->next_buffer_index = buffer_index;
     420             : 
     421             :     /* Adjust the pending read to cover the remaining portion, if any. */
     422     3612282 :     stream->pending_read_blocknum += nblocks;
     423     3612282 :     stream->pending_read_nblocks -= nblocks;
     424             : 
     425     3612282 :     return true;
     426             : }
     427             : 
     428             : static void
     429     6092668 : read_stream_look_ahead(ReadStream *stream)
     430             : {
     431             :     /*
     432             :      * Allow amortizing the cost of submitting IO over multiple IOs. This
     433             :      * requires that we don't do any operations that could lead to a deadlock
     434             :      * with staged-but-unsubmitted IO. The callback needs to opt-in to being
     435             :      * careful.
     436             :      */
     437     6092668 :     if (stream->batch_mode)
     438     4983528 :         pgaio_enter_batchmode();
     439             : 
     440    10037040 :     while (stream->ios_in_progress < stream->max_ios &&
     441    10037032 :            stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
     442             :     {
     443             :         BlockNumber blocknum;
     444             :         int16       buffer_index;
     445             :         void       *per_buffer_data;
     446             : 
     447     5899542 :         if (stream->pending_read_nblocks == stream->io_combine_limit)
     448             :         {
     449        7322 :             read_stream_start_pending_read(stream);
     450        7322 :             continue;
     451             :         }
     452             : 
     453             :         /*
     454             :          * See which block the callback wants next in the stream.  We need to
     455             :          * compute the index of the Nth block of the pending read including
     456             :          * wrap-around, but we don't want to use the expensive % operator.
     457             :          */
     458     5892220 :         buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
     459     5892220 :         if (buffer_index >= stream->queue_size)
     460        3534 :             buffer_index -= stream->queue_size;
     461             :         Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     462     5892220 :         per_buffer_data = get_per_buffer_data(stream, buffer_index);
     463     5892220 :         blocknum = read_stream_get_block(stream, per_buffer_data);
     464     5892220 :         if (blocknum == InvalidBlockNumber)
     465             :         {
     466             :             /* End of stream. */
     467     1955158 :             stream->distance = 0;
     468     1955158 :             break;
     469             :         }
     470             : 
     471             :         /* Can we merge it with the pending read? */
     472     3937062 :         if (stream->pending_read_nblocks > 0 &&
     473      427634 :             stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
     474             :         {
     475      427556 :             stream->pending_read_nblocks++;
     476      427556 :             continue;
     477             :         }
     478             : 
     479             :         /* We have to start the pending read before we can build another. */
     480     3509582 :         while (stream->pending_read_nblocks > 0)
     481             :         {
     482          88 :             if (!read_stream_start_pending_read(stream) ||
     483          76 :                 stream->ios_in_progress == stream->max_ios)
     484             :             {
     485             :                 /* We've hit the buffer or I/O limit.  Rewind and stop here. */
     486          12 :                 read_stream_unget_block(stream, blocknum);
     487          12 :                 if (stream->batch_mode)
     488          12 :                     pgaio_exit_batchmode();
     489          12 :                 return;
     490             :             }
     491             :         }
     492             : 
     493             :         /* This is the start of a new pending read. */
     494     3509494 :         stream->pending_read_blocknum = blocknum;
     495     3509494 :         stream->pending_read_nblocks = 1;
     496             :     }
     497             : 
     498             :     /*
     499             :      * We don't start the pending read just because we've hit the distance
     500             :      * limit, preferring to give it another chance to grow to full
     501             :      * io_combine_limit size once more buffers have been consumed.  However,
     502             :      * if we've already reached io_combine_limit, or we've reached the
     503             :      * distance limit and there isn't anything pinned yet, or the callback has
     504             :      * signaled end-of-stream, we start the read immediately.  Note that the
     505             :      * pending read can exceed the distance goal, if the latter was reduced
     506             :      * after hitting the per-backend buffer limit.
     507             :      */
     508     6092656 :     if (stream->pending_read_nblocks > 0 &&
     509     3684846 :         (stream->pending_read_nblocks == stream->io_combine_limit ||
     510     3669732 :          (stream->pending_read_nblocks >= stream->distance &&
     511     3592270 :           stream->pinned_buffers == 0) ||
     512       88928 :          stream->distance == 0) &&
     513     3607384 :         stream->ios_in_progress < stream->max_ios)
     514     3607380 :         read_stream_start_pending_read(stream);
     515             : 
     516             :     /*
     517             :      * There should always be something pinned when we leave this function,
     518             :      * whether started by this call or not, unless we've hit the end of the
     519             :      * stream.  In the worst case we can always make progress one buffer at a
     520             :      * time.
     521             :      */
     522             :     Assert(stream->pinned_buffers > 0 || stream->distance == 0);
     523             : 
     524     6092644 :     if (stream->batch_mode)
     525     4983504 :         pgaio_exit_batchmode();
     526             : }
     527             : 
     528             : /*
     529             :  * Create a new read stream object that can be used to perform the equivalent
     530             :  * of a series of ReadBuffer() calls for one fork of one relation.
     531             :  * Internally, it generates larger vectored reads where possible by looking
     532             :  * ahead.  The callback should return block numbers or InvalidBlockNumber to
     533             :  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
     534             :  * write extra data for each block into the space provided to it.  It will
     535             :  * also receive callback_private_data for its own purposes.
                     :  *
                     :  * Arguments, as used by this function:
                     :  *
                     :  * flags: READ_STREAM_* bits.  READ_STREAM_MAINTENANCE selects the
                     :  * tablespace's maintenance I/O concurrency instead of the regular one,
                     :  * READ_STREAM_USE_BATCHING turns on batch_mode, READ_STREAM_SEQUENTIAL
                     :  * suppresses read-ahead advice, and READ_STREAM_FULL skips the initial
                     :  * distance ramp-up.
                     :  *
                     :  * strategy: consulted for its pin limit and stored in every I/O slot.
                     :  *
                     :  * rel: may be NULL; it is only inspected when non-NULL.  smgr must be
                     :  * valid, since it supplies the tablespace OID and tells us whether the
                     :  * relation is temporary.
                     :  *
                     :  * persistence, forknum: stored as-is in every I/O slot.
                     :  *
                     :  * The stream header, the buffer queue, the I/O slots and the per-buffer
                     :  * data area are carved out of a single palloc() chunk, and a pointer to
                     :  * it is returned.
     536             :  */
     537             : static ReadStream *
     538     1135228 : read_stream_begin_impl(int flags,
     539             :                        BufferAccessStrategy strategy,
     540             :                        Relation rel,
     541             :                        SMgrRelation smgr,
     542             :                        char persistence,
     543             :                        ForkNumber forknum,
     544             :                        ReadStreamBlockNumberCB callback,
     545             :                        void *callback_private_data,
     546             :                        size_t per_buffer_data_size)
     547             : {
     548             :     ReadStream *stream;
     549             :     size_t      size;
     550             :     int16       queue_size;
     551             :     int16       queue_overflow;
     552             :     int         max_ios;
     553             :     int         strategy_pin_limit;
     554             :     uint32      max_pinned_buffers;
     555             :     uint32      max_possible_buffer_limit;
     556             :     Oid         tablespace_id;
     557             : 
     558             :     /*
     559             :      * Decide how many I/Os we will allow to run at the same time.  That
     560             :      * currently means advice to the kernel to tell it that we will soon read.
     561             :      * This number also affects how far we look ahead for opportunities to
     562             :      * start more I/Os.
     563             :      */
     564     1135228 :     tablespace_id = smgr->smgr_rlocator.locator.spcOid;
     565     1135228 :     if (!OidIsValid(MyDatabaseId) ||
     566     1298880 :         (rel && IsCatalogRelation(rel)) ||
     567      304134 :         IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
     568             :     {
     569             :         /*
     570             :          * Avoid circularity while trying to look up tablespace settings or
     571             :          * before spccache.c is ready.
                     :          *
                     :          * In these cases we fall back to the plain GUC value rather than
                     :          * asking for the per-tablespace setting.
     572             :          */
     573      943548 :         max_ios = effective_io_concurrency;
     574             :     }
     575      191680 :     else if (flags & READ_STREAM_MAINTENANCE)
     576       32268 :         max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
     577             :     else
     578      159412 :         max_ios = get_tablespace_io_concurrency(tablespace_id);
     579             : 
     580             :     /* Cap to INT16_MAX to avoid overflowing below */
     581     1135228 :     max_ios = Min(max_ios, PG_INT16_MAX);
     582             : 
     583             :     /*
     584             :      * If starting a multi-block I/O near the end of the queue, we might
     585             :      * temporarily need extra space for overflowing buffers before they are
     586             :      * moved to regular circular position.  This is the maximum extra space we
     587             :      * could need.
     588             :      */
     589     1135228 :     queue_overflow = io_combine_limit - 1;
     590             : 
     591             :     /*
     592             :      * Choose the maximum number of buffers we're prepared to pin.  We try to
     593             :      * pin fewer if we can, though.  We add one so that we can make progress
     594             :      * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
     595             :      * this also allows an extra full I/O's worth of buffers: after an I/O
     596             :      * finishes we don't want to have to wait for its buffers to be consumed
     597             :      * before starting a new one.
     598             :      *
     599             :      * Be careful not to allow int16 to overflow.  That is possible with the
     600             :      * current GUC range limits, so this is an artificial limit of ~32k
     601             :      * buffers and we'd need to adjust the types to exceed that.  We also have
     602             :      * to allow for the spare entry and the overflow space.
     603             :      */
     604     1135228 :     max_pinned_buffers = (max_ios + 1) * io_combine_limit;
     605     1135228 :     max_pinned_buffers = Min(max_pinned_buffers,
     606             :                              PG_INT16_MAX - queue_overflow - 1);
     607             : 
     608             :     /* Give the strategy a chance to limit the number of buffers we pin. */
     609     1135228 :     strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
     610     1135228 :     max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
     611             : 
     612             :     /*
     613             :      * Also limit our queue to the maximum number of pins we could ever be
     614             :      * allowed to acquire according to the buffer manager.  We may not really
     615             :      * be able to use them all due to other pins held by this backend, but
     616             :      * we'll check that later in read_stream_start_pending_read().
     617             :      */
     618     1135228 :     if (SmgrIsTemp(smgr))
     619       13478 :         max_possible_buffer_limit = GetLocalPinLimit();
     620             :     else
     621     1121750 :         max_possible_buffer_limit = GetPinLimit();
     622     1135228 :     max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
     623             : 
     624             :     /*
     625             :      * The limit might be zero on a system configured with too few buffers for
     626             :      * the number of connections.  We need at least one to make progress.
     627             :      */
     628     1135228 :     max_pinned_buffers = Max(1, max_pinned_buffers);
     629             : 
     630             :     /*
     631             :      * We need one extra entry for buffers and per-buffer data, because users
     632             :      * of per-buffer data have access to the object until the next call to
     633             :      * read_stream_next_buffer(), so we need a gap between the head and tail
     634             :      * of the queue so that we don't clobber it.
     635             :      */
     636     1135228 :     queue_size = max_pinned_buffers + 1;
     637             : 
     638             :     /*
     639             :      * Allocate the object, the buffers, the ios and per_buffer_data space in
     640             :      * one big chunk.  Though we have queue_size buffers, we want to be able
     641             :      * to assume that all the buffers for a single read are contiguous (i.e.
     642             :      * don't wrap around halfway through), so we allow temporary overflows of
     643             :      * up to the maximum possible overflow size.
                     :      *
                     :      * Only the fixed-size part of the struct (everything before buffers) is
                     :      * zeroed by the memset(); the trailing arrays are not cleared here.  The
                     :      * MAXIMUM_ALIGNOF * 2 of slack covers the two MAXALIGN() round-ups used
                     :      * to place ios and per_buffer_data.  At least one I/O slot is always
                     :      * allocated, even if max_ios is zero.
     644             :      */
     645     1135228 :     size = offsetof(ReadStream, buffers);
     646     1135228 :     size += sizeof(Buffer) * (queue_size + queue_overflow);
     647     1135228 :     size += sizeof(InProgressIO) * Max(1, max_ios);
     648     1135228 :     size += per_buffer_data_size * queue_size;
     649     1135228 :     size += MAXIMUM_ALIGNOF * 2;
     650     1135228 :     stream = (ReadStream *) palloc(size);
     651     1135228 :     memset(stream, 0, offsetof(ReadStream, buffers));
     652     1135228 :     stream->ios = (InProgressIO *)
     653     1135228 :         MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
     654     1135228 :     if (per_buffer_data_size > 0)
     655      259552 :         stream->per_buffer_data = (void *)
     656      259552 :             MAXALIGN(&stream->ios[Max(1, max_ios)]);
     657             : 
     658     1135228 :     stream->sync_mode = io_method == IOMETHOD_SYNC;
     659     1135228 :     stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
     660             : 
     661             : #ifdef USE_PREFETCH
     662             : 
     663             :     /*
     664             :      * Read-ahead advice simulating asynchronous I/O with synchronous calls.
     665             :      * Issue advice only if AIO is not used, direct I/O isn't enabled, the
     666             :      * caller hasn't promised sequential access (overriding our detection
     667             :      * heuristics), and max_ios hasn't been set to zero.
     668             :      */
     669     1135228 :     if (stream->sync_mode &&
     670        5774 :         (io_direct_flags & IO_DIRECT_DATA) == 0 &&
     671        5774 :         (flags & READ_STREAM_SEQUENTIAL) == 0 &&
     672             :         max_ios > 0)
     673        1384 :         stream->advice_enabled = true;
     674             : #endif
     675             : 
     676             :     /*
     677             :      * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
     678             :      * we still need to allocate space to combine and run one I/O.  Bump it up
     679             :      * to one, and remember to ask for synchronous I/O only.
                     :      *
                     :      * This must come after the advice decision above, which tests the
                     :      * original value of max_ios.
     680             :      */
     681     1135228 :     if (max_ios == 0)
     682             :     {
     683          14 :         max_ios = 1;
     684          14 :         stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
     685             :     }
     686             : 
     687             :     /*
     688             :      * Capture stable values for these two GUC-derived numbers for the
     689             :      * lifetime of this stream, so we don't have to worry about the GUCs
     690             :      * changing underneath us beyond this point.
     691             :      */
     692     1135228 :     stream->max_ios = max_ios;
     693     1135228 :     stream->io_combine_limit = io_combine_limit;
     694             : 
     695     1135228 :     stream->per_buffer_data_size = per_buffer_data_size;
     696     1135228 :     stream->max_pinned_buffers = max_pinned_buffers;
     697     1135228 :     stream->queue_size = queue_size;
     698     1135228 :     stream->callback = callback;
     699     1135228 :     stream->callback_private_data = callback_private_data;
     700     1135228 :     stream->buffered_blocknum = InvalidBlockNumber;
     701     1135228 :     stream->seq_blocknum = InvalidBlockNumber;
     702     1135228 :     stream->seq_until_processed = InvalidBlockNumber;
     703     1135228 :     stream->temporary = SmgrIsTemp(smgr);
     704             : 
     705             :     /*
     706             :      * Skip the initial ramp-up phase if the caller says we're going to be
     707             :      * reading the whole relation.  This way we start out assuming we'll be
     708             :      * doing full io_combine_limit sized reads.
     709             :      */
     710     1135228 :     if (flags & READ_STREAM_FULL)
     711      131778 :         stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
     712             :     else
     713     1003450 :         stream->distance = 1;
     714             : 
     715             :     /*
     716             :      * Since we always access the same relation, we can initialize parts of
     717             :      * the ReadBuffersOperation objects and leave them that way, to avoid
     718             :      * wasting CPU cycles writing to them for each read.
                     :      *
                     :      * max_ios is at least one by now, so this covers every I/O slot that
                     :      * was allocated above.
     719             :      */
     720    19341710 :     for (int i = 0; i < max_ios; ++i)
     721             :     {
     722    18206482 :         stream->ios[i].op.rel = rel;
     723    18206482 :         stream->ios[i].op.smgr = smgr;
     724    18206482 :         stream->ios[i].op.persistence = persistence;
     725    18206482 :         stream->ios[i].op.forknum = forknum;
     726    18206482 :         stream->ios[i].op.strategy = strategy;
     727             :     }
     728             : 
     729     1135228 :     return stream;
     730             : }
     731             : 
     732             : /*
     733             :  * Create a new read stream for reading a relation.
     734             :  * See read_stream_begin_impl() for the detailed explanation.
                     :  *
                     :  * The SMgrRelation and the persistence are taken from rel itself, via
                     :  * RelationGetSmgr() and rel->rd_rel->relpersistence, so rel must not be
                     :  * NULL.  Every other argument is passed through unchanged.
     735             :  */
     736             : ReadStream *
     737     1014638 : read_stream_begin_relation(int flags,
     738             :                            BufferAccessStrategy strategy,
     739             :                            Relation rel,
     740             :                            ForkNumber forknum,
     741             :                            ReadStreamBlockNumberCB callback,
     742             :                            void *callback_private_data,
     743             :                            size_t per_buffer_data_size)
     744             : {
     745     1014638 :     return read_stream_begin_impl(flags,
     746             :                                   strategy,
     747             :                                   rel,
     748             :                                   RelationGetSmgr(rel),
     749     1014638 :                                   rel->rd_rel->relpersistence,
     750             :                                   forknum,
     751             :                                   callback,
     752             :                                   callback_private_data,
     753             :                                   per_buffer_data_size);
     754             : }
     755             : 
     756             : /*
     757             :  * Create a new read stream for reading a SMgr relation.
     758             :  * See read_stream_begin_impl() for the detailed explanation.
                     :  *
                     :  * No Relation is passed down (rel is NULL), so the caller supplies the
                     :  * persistence explicitly as smgr_persistence.  Every other argument is
                     :  * passed through unchanged.
     759             :  */
     760             : ReadStream *
     761      120590 : read_stream_begin_smgr_relation(int flags,
     762             :                                 BufferAccessStrategy strategy,
     763             :                                 SMgrRelation smgr,
     764             :                                 char smgr_persistence,
     765             :                                 ForkNumber forknum,
     766             :                                 ReadStreamBlockNumberCB callback,
     767             :                                 void *callback_private_data,
     768             :                                 size_t per_buffer_data_size)
     769             : {
     770      120590 :     return read_stream_begin_impl(flags,
     771             :                                   strategy,
     772             :                                   NULL,
     773             :                                   smgr,
     774             :                                   smgr_persistence,
     775             :                                   forknum,
     776             :                                   callback,
     777             :                                   callback_private_data,
     778             :                                   per_buffer_data_size);
     779             : }
     780             : 
     781             : /*
     782             :  * Pull one pinned buffer out of a stream.  Each call returns successive
     783             :  * blocks in the order specified by the callback.  If per_buffer_data_size was
     784             :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
     785             :  * per-buffer data that the callback had a chance to populate, which remains
     786             :  * valid until the next call to read_stream_next_buffer().  When the stream
     787             :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
     788             :  * the stream early at any time by calling read_stream_end().
                     :  *
                     :  * per_buffer_data may be NULL if the caller does not want the pointer.
                     :  * On the fast path (which is only taken when per_buffer_data_size is
                     :  * zero) and when InvalidBuffer is returned, *per_buffer_data is not
                     :  * written at all.
                     :  *
                     :  * The pin on the returned buffer is transferred to the caller; the stream
                     :  * stops counting it in pinned_buffers.
     789             :  */
     790             : Buffer
     791    12109440 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
     792             : {
     793             :     Buffer      buffer;
     794             :     int16       oldest_buffer_index;
     795             : 
     796             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     797             : 
     798             :     /*
     799             :      * A fast path for all-cached scans.  This is the same as the usual
     800             :      * algorithm, but it is specialized for no I/O and no per-buffer data, so
     801             :      * we can skip the queue management code, stay in the same buffer slot and
     802             :      * use singular StartReadBuffer().
     803             :      */
     804    12109440 :     if (likely(stream->fast_path))
     805             :     {
     806             :         BlockNumber next_blocknum;
     807             : 
     808             :         /* Fast path assumptions. */
     809             :         Assert(stream->ios_in_progress == 0);
     810             :         Assert(stream->forwarded_buffers == 0);
     811             :         Assert(stream->pinned_buffers == 1);
     812             :         Assert(stream->distance == 1);
     813             :         Assert(stream->pending_read_nblocks == 0);
     814             :         Assert(stream->per_buffer_data_size == 0);
     815             :         Assert(stream->initialized_buffers > stream->oldest_buffer_index);
     816             : 
     817             :         /* We're going to return the buffer we pinned last time. */
     818     4142578 :         oldest_buffer_index = stream->oldest_buffer_index;
     819             :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
     820             :                stream->next_buffer_index);
     821     4142578 :         buffer = stream->buffers[oldest_buffer_index];
     822             :         Assert(buffer != InvalidBuffer);
     823             : 
     824             :         /* Choose the next block to pin. */
     825     4142578 :         next_blocknum = read_stream_get_block(stream, NULL);
     826             : 
     827     4142578 :         if (likely(next_blocknum != InvalidBlockNumber))
     828             :         {
     829     3971234 :             int         flags = stream->read_buffers_flags;
     830             : 
     831     3971234 :             if (stream->advice_enabled)
     832        1058 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     833             : 
     834             :             /*
     835             :              * Pin a buffer for the next call.  Same buffer entry, and
     836             :              * arbitrary I/O entry (they're all free).  We don't have to
     837             :              * adjust pinned_buffers because we're transferring one to caller
     838             :              * but pinning one more.
     839             :              *
     840             :              * In the fast path we don't need to check the pin limit.  We're
     841             :              * always allowed at least one pin so that progress can be made,
     842             :              * and that's all we need here.  Although two pins are momentarily
     843             :              * held at the same time, the model used here is that the stream
     844             :              * holds only one, and the other now belongs to the caller.
     845             :              */
     846     3971234 :             if (likely(!StartReadBuffer(&stream->ios[0].op,
     847             :                                         &stream->buffers[oldest_buffer_index],
     848             :                                         next_blocknum,
     849             :                                         flags)))
     850             :             {
     851             :                 /* Fast return. */
     852     3944656 :                 return buffer;
     853             :             }
     854             : 
     855             :             /*
                     :              * Next call must wait for I/O for the newly pinned buffer.  All
                     :              * I/O entries were free, so entry 0 becomes the only one in
                     :              * progress, and next_io_index moves past it (wrapping straight
                     :              * back to 0 if there is only one entry).
                     :              */
     856       26578 :             stream->oldest_io_index = 0;
     857       26578 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
     858       26578 :             stream->ios_in_progress = 1;
     859       26578 :             stream->ios[0].buffer_index = oldest_buffer_index;
     860       26578 :             stream->seq_blocknum = next_blocknum + 1;
     861             :         }
     862             :         else
     863             :         {
     864             :             /*
                     :              * No more blocks, end of stream.  The one buffer we held goes
                     :              * to the caller, so nothing remains pinned by the stream.
                     :              */
     865      171344 :             stream->distance = 0;
     866      171344 :             stream->oldest_buffer_index = stream->next_buffer_index;
     867      171344 :             stream->pinned_buffers = 0;
     868      171344 :             stream->buffers[oldest_buffer_index] = InvalidBuffer;
     869             :         }
     870             : 
     871      197922 :         stream->fast_path = false;
     872      197922 :         return buffer;
     873             :     }
     874             : #endif
     875             : 
     876     7966862 :     if (unlikely(stream->pinned_buffers == 0))
     877             :     {
     878             :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
     879             : 
     880             :         /* End of stream reached?  */
     881     5444524 :         if (stream->distance == 0)
     882     3115718 :             return InvalidBuffer;
     883             : 
     884             :         /*
     885             :          * The usual order of operations is that we look ahead at the bottom
     886             :          * of this function after potentially finishing an I/O and making
     887             :          * space for more, but if we're just starting up we'll need to crank
     888             :          * the handle to get started.
     889             :          */
     890     2328806 :         read_stream_look_ahead(stream);
     891             : 
     892             :         /* End of stream reached? */
     893     2328806 :         if (stream->pinned_buffers == 0)
     894             :         {
     895             :             Assert(stream->distance == 0);
     896     1087240 :             return InvalidBuffer;
     897             :         }
     898             :     }
     899             : 
     900             :     /* Grab the oldest pinned buffer and associated per-buffer data. */
     901             :     Assert(stream->pinned_buffers > 0);
     902     3763904 :     oldest_buffer_index = stream->oldest_buffer_index;
     903             :     Assert(oldest_buffer_index >= 0 &&
     904             :            oldest_buffer_index < stream->queue_size);
     905     3763904 :     buffer = stream->buffers[oldest_buffer_index];
     906     3763904 :     if (per_buffer_data)
     907     1294814 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
     908             : 
     909             :     Assert(BufferIsValid(buffer));
     910             : 
     911             :     /*
                     :      * Do we have to wait for an associated I/O first?  That is the case
                     :      * only if the oldest in-progress I/O starts at the buffer we are about
                     :      * to return.
                     :      */
     912     3763904 :     if (stream->ios_in_progress > 0 &&
     913     1247494 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
     914             :     {
     915     1079936 :         int16       io_index = stream->oldest_io_index;
     916             :         int32       distance;   /* wider temporary value, clamped below */
     917             : 
     918             :         /* Sanity check that we still agree on the buffers. */
     919             :         Assert(stream->ios[io_index].op.buffers ==
     920             :                &stream->buffers[oldest_buffer_index]);
     921             : 
     922     1079936 :         WaitReadBuffers(&stream->ios[io_index].op);
     923             : 
     924             :         Assert(stream->ios_in_progress > 0);
     925     1079894 :         stream->ios_in_progress--;
     926     1079894 :         if (++stream->oldest_io_index == stream->max_ios)
     927       42240 :             stream->oldest_io_index = 0;
     928             : 
     929             :         /*
                     :          * Look-ahead distance ramps up rapidly after we do I/O: it doubles
                     :          * each time, capped at max_pinned_buffers.
                     :          */
     930     1079894 :         distance = stream->distance * 2;
     931     1079894 :         distance = Min(distance, stream->max_pinned_buffers);
     932     1079894 :         stream->distance = distance;
     933             : 
     934             :         /*
     935             :          * If we've reached the first block of a sequential region we're
     936             :          * issuing advice for, cancel that until the next jump.  The kernel
     937             :          * will see the sequential preadv() pattern starting here.
     938             :          */
     939     1079894 :         if (stream->advice_enabled &&
     940         544 :             stream->ios[io_index].op.blocknum == stream->seq_until_processed)
     941         502 :             stream->seq_until_processed = InvalidBlockNumber;
     942             :     }
     943             : 
     944             :     /*
     945             :      * We must zap this queue entry, or else it would appear as a forwarded
     946             :      * buffer.  If it's potentially in the overflow zone (ie from a
     947             :      * multi-block I/O that wrapped around the queue), also zap the copy.
                     :      *
                     :      * The overflow zone starts at index queue_size and holds at most
                     :      * io_combine_limit - 1 entries, mirroring the first entries of the
                     :      * queue.
     948             :      */
     949     3763862 :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
     950     3763862 :     if (oldest_buffer_index < stream->io_combine_limit - 1)
     951     2772814 :         stream->buffers[stream->queue_size + oldest_buffer_index] =
     952             :             InvalidBuffer;
     953             : 
     954             : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
     955             : 
     956             :     /*
     957             :      * The caller will get access to the per-buffer data, until the next call.
     958             :      * We wipe the one before, which is never occupied because queue_size
     959             :      * allowed one extra element.  This will hopefully trip up client code
     960             :      * that is holding a dangling pointer to it.
     961             :      */
     962             :     if (stream->per_buffer_data)
     963             :     {
     964             :         void       *per_buffer_data;
     965             : 
     966             :         per_buffer_data = get_per_buffer_data(stream,
     967             :                                               oldest_buffer_index == 0 ?
     968             :                                               stream->queue_size - 1 :
     969             :                                               oldest_buffer_index - 1);
     970             : 
     971             : #if defined(CLOBBER_FREED_MEMORY)
     972             :         /* This also tells Valgrind the memory is "noaccess". */
     973             :         wipe_mem(per_buffer_data, stream->per_buffer_data_size);
     974             : #elif defined(USE_VALGRIND)
     975             :         /* Tell it ourselves. */
     976             :         VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
     977             :                                    stream->per_buffer_data_size);
     978             : #endif
     979             :     }
     980             : #endif
     981             : 
     982             :     /* Pin transferred to caller. */
     983             :     Assert(stream->pinned_buffers > 0);
     984     3763862 :     stream->pinned_buffers--;
     985             : 
     986             :     /* Advance oldest buffer, with wrap-around. */
     987     3763862 :     stream->oldest_buffer_index++;
     988     3763862 :     if (stream->oldest_buffer_index == stream->queue_size)
     989      528640 :         stream->oldest_buffer_index = 0;
     990             : 
     991             :     /* Prepare for the next call. */
     992     3763862 :     read_stream_look_ahead(stream);
     993             : 
     994             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     995             :     /*
                     :      * See if we can take the fast path for all-cached scans next time.
                     :      * These conditions are the same ones the fast path asserts on entry.
                     :      */
     996     3763850 :     if (stream->ios_in_progress == 0 &&
     997     2683612 :         stream->forwarded_buffers == 0 &&
     998     2682612 :         stream->pinned_buffers == 1 &&
     999     1500720 :         stream->distance == 1 &&
    1000     1364336 :         stream->pending_read_nblocks == 0 &&
    1001     1361722 :         stream->per_buffer_data_size == 0)
    1002             :     {
    1003             :         /*
    1004             :          * The fast path spins on one buffer entry repeatedly instead of
    1005             :          * rotating through the whole queue and clearing the entries behind
    1006             :          * it.  If the buffer it starts with happened to be forwarded between
    1007             :          * StartReadBuffers() calls and also wrapped around the circular queue
    1008             :          * partway through, then a copy also exists in the overflow zone, and
    1009             :          * it won't clear it out as the regular path would.  Do that now, so
    1010             :          * it doesn't need code for that.
    1011             :          */
    1012      388408 :         if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
    1013      385942 :             stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
    1014             :                 InvalidBuffer;
    1015             : 
    1016      388408 :         stream->fast_path = true;
    1017             :     }
    1018             : #endif
    1019             : 
    1020     3763850 :     return buffer;
    1021             : }
    1022             : 
    1023             : /*
    1024             :  * Transitional support for code that would like to perform or skip reads
    1025             :  * itself, without using the stream.  Returns, and consumes, the next block
    1026             :  * number that would be read by the stream's look-ahead algorithm, or
    1027             :  * InvalidBlockNumber if the end of the stream is reached.  Also reports the
    1028             :  * strategy that would be used to read it; *strategy is always written.
    1029             :  */
    1030             : BlockNumber
    1031           0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
    1032             : {
    1033           0 :     *strategy = stream->ios[0].op.strategy; /* taken from the first I/O slot */
    1034           0 :     return read_stream_get_block(stream, NULL); /* NULL: presumably no per-buffer data wanted -- confirm */
    1035             : }
    1036             : 
    1037             : /*
    1038             :  * Reset a read stream by releasing any queued up buffers, allowing the stream
    1039             :  * to be used again for different blocks.  This can be used to clear an
    1040             :  * end-of-stream condition and start again, or to throw away blocks that were
    1041             :  * speculatively read and read some different blocks instead.
    1042             :  */
    1043             : void
    1044     2330030 : read_stream_reset(ReadStream *stream)
    1045             : {
    1046             :     int16       index;
    1047             :     Buffer      buffer;
    1048             : 
    1049             :     /* Stop looking ahead. */
    1050     2330030 :     stream->distance = 0;
    1051             : 
    1052             :     /* Forget buffered block number and fast path state. */
    1053     2330030 :     stream->buffered_blocknum = InvalidBlockNumber;
    1054     2330030 :     stream->fast_path = false;
    1055             : 
    1056             :     /* Unpin anything that wasn't consumed: drain the stream, releasing each buffer. */
    1057     2573430 :     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
    1058      243400 :         ReleaseBuffer(buffer);
    1059             : 
    1060             :     /* Unpin any unused forwarded buffers, stopping at the first empty slot. */
    1061     2330030 :     index = stream->next_buffer_index;
    1062     2330030 :     while (index < stream->initialized_buffers &&
    1063      437442 :            (buffer = stream->buffers[index]) != InvalidBuffer)
    1064             :     {
    1065             :         Assert(stream->forwarded_buffers > 0);
    1066           0 :         stream->forwarded_buffers--;
    1067           0 :         ReleaseBuffer(buffer);
    1068             : 
    1069           0 :         stream->buffers[index] = InvalidBuffer;
    1070           0 :         if (index < stream->io_combine_limit - 1)   /* slot also has a copy past the end of the queue */
    1071           0 :             stream->buffers[stream->queue_size + index] = InvalidBuffer;
    1072             : 
    1073           0 :         if (++index == stream->queue_size)  /* wrap around the circular queue */
    1074           0 :             index = 0;
    1075             :     }
    1076             : 
    1077             :     Assert(stream->forwarded_buffers == 0);     /* nothing may remain forwarded, ... */
    1078             :     Assert(stream->pinned_buffers == 0);        /* ... pinned, ... */
    1079             :     Assert(stream->ios_in_progress == 0);       /* ... or in flight after the reset */
    1080             : 
    1081             :     /* Start off assuming data is cached, i.e. look ahead by only one block. */
    1082     2330030 :     stream->distance = 1;
    1083     2330030 : }
    1084             : 
    1085             : /*
    1086             :  * Release and free a read stream.  The stream must not be used afterwards.
    1087             :  */
    1088             : void
    1089     1130672 : read_stream_end(ReadStream *stream)
    1090             : {
    1091     1130672 :     read_stream_reset(stream);  /* releases any buffers the stream still holds */
    1092     1130672 :     pfree(stream);              /* then free the stream object itself */
    1093     1130672 : }

Generated by: LCOV version 1.16