LCOV - code coverage report
Current view: top level - src/backend/storage/aio - read_stream.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 254 266 95.5 %
Date: 2025-04-24 13:15:39 Functions: 12 13 92.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * read_stream.c
       4             :  *    Mechanism for accessing buffered relation data with look-ahead
       5             :  *
       6             :  * Code that needs to access relation data typically pins blocks one at a
       7             :  * time, often in a predictable order that might be sequential or data-driven.
       8             :  * Calling the simple ReadBuffer() function for each block is inefficient,
       9             :  * because blocks that are not yet in the buffer pool require I/O operations
      10             :  * that are small and might stall waiting for storage.  This mechanism looks
      11             :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
      12             :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
      13             :  * distance.
      14             :  *
      15             :  * A user-provided callback generates a stream of block numbers that is used
      16             :  * to form reads of up to io_combine_limit, by attempting to merge them with a
      17             :  * pending read.  When that isn't possible, the existing pending read is sent
      18             :  * to StartReadBuffers() so that a new one can begin to form.
      19             :  *
      20             :  * The algorithm for controlling the look-ahead distance is based on recent
      21             :  * cache hit and miss history.  When no I/O is necessary, there is no benefit
      22             :  * in looking ahead more than one block.  This is the default initial
      23             :  * assumption, but when blocks needing I/O are streamed, the distance is
      24             :  * increased rapidly to try to benefit from I/O combining and concurrency.  It
      25             :  * is reduced gradually when cached blocks are streamed.
      26             :  *
      27             :  * The main data structure is a circular queue of buffers of size
      28             :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
      29             :  * returned by read_stream_next_buffer().  Each buffer also has an optional
      30             :  * variable sized object that is passed from the callback to the consumer of
      31             :  * buffers.
      32             :  *
      33             :  * Parallel to the queue of buffers, there is a circular queue of in-progress
      34             :  * I/Os that have been started with StartReadBuffers(), and for which
      35             :  * WaitReadBuffers() must be called before returning the buffer.
      36             :  *
      37             :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
      38             :  * successive calls, then these data structures might appear as follows:
      39             :  *
      40             :  *                          buffers buf/data       ios
      41             :  *
      42             :  *                          +----+  +-----+       +--------+
      43             :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
      44             :  *                          +----+  +-----+  |    +--------+
      45             :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
      46             :  *                          +----+  +-----+  | |  +--------+
      47             :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
      48             :  *                          +----+  +-----+    |  +--------+
      49             :  *                          | 43 |  |  ?  |    |  |        |
      50             :  *                          +----+  +-----+    |  +--------+
      51             :  *                          | 44 |  |  ?  |    |  |        |
      52             :  *                          +----+  +-----+    |  +--------+
      53             :  *                          | 60 |  |  ?  |<---+
      54             :  *                          +----+  +-----+
      55             :  *     next_buffer_index -> |    |  |     |
      56             :  *                          +----+  +-----+
      57             :  *
      58             :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
      59             :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
      60             :  * the range 42..44 requires an I/O wait before its buffers are returned, as
      61             :  * does block 60.
      62             :  *
      63             :  *
      64             :  * Portions Copyright (c) 2024-2025, PostgreSQL Global Development Group
      65             :  * Portions Copyright (c) 1994, Regents of the University of California
      66             :  *
      67             :  * IDENTIFICATION
      68             :  *    src/backend/storage/aio/read_stream.c
      69             :  *
      70             :  *-------------------------------------------------------------------------
      71             :  */
      72             : #include "postgres.h"
      73             : 
      74             : #include "miscadmin.h"
      75             : #include "storage/aio.h"
      76             : #include "storage/fd.h"
      77             : #include "storage/smgr.h"
      78             : #include "storage/read_stream.h"
      79             : #include "utils/memdebug.h"
      80             : #include "utils/rel.h"
      81             : #include "utils/spccache.h"
      82             : 
/*
 * One read that has been started with StartReadBuffers() and must still be
 * completed with WaitReadBuffers() before its buffers are returned to the
 * consumer.  Entries live in ReadStream.ios, used as a circular queue.
 */
typedef struct InProgressIO
{
    int16       buffer_index;   /* queue index of the read's first buffer */
    ReadBuffersOperation op;    /* operation state given to StartReadBuffers() */
} InProgressIO;
      88             : 
/*
 * State for managing a stream of reads.
 */
struct ReadStream
{
    int16       max_ios;        /* cap on ios_in_progress; ios[] wraps here */
    int16       io_combine_limit;   /* max blocks merged into one pending read */
    int16       ios_in_progress;    /* reads started but not yet waited for */
    int16       queue_size;     /* wrap-around point of buffers[] queue */
    int16       max_pinned_buffers; /* cap on pinned + pending buffers */

    /*
     * Pins already acquired by a previous StartReadBuffers() call for the
     * leading blocks of the pending read, but not counted in pinned_buffers.
     */
    int16       forwarded_buffers;
    int16       pinned_buffers; /* pinned buffers currently in the queue */

    /* Current look-ahead distance in buffers; 0 once end-of-stream is seen. */
    int16       distance;

    /* How many buffers[] entries have been set to InvalidBuffer at least once. */
    int16       initialized_buffers;
    int         read_buffers_flags; /* base flags for StartReadBuffers() */
    bool        sync_mode;      /* using io_method=sync */
    bool        batch_mode;     /* READ_STREAM_USE_BATCHING */
    bool        advice_enabled; /* may add READ_BUFFERS_ISSUE_ADVICE */
    bool        temporary;      /* use the local (temp-buffer) pin limit */

    /*
     * One-block buffer to support 'ungetting' a block number, to resolve flow
     * control problems when I/Os are split.
     */
    BlockNumber buffered_blocknum;

    /*
     * The callback that will tell us which block numbers to read, and an
     * opaque pointer that will be passed to it for its own purposes.
     */
    ReadStreamBlockNumberCB callback;
    void       *callback_private_data;

    /* Next expected block, for detecting sequential access. */
    BlockNumber seq_blocknum;
    /* First block of the current sequential region advice was issued for. */
    BlockNumber seq_until_processed;

    /* The read operation we are currently preparing. */
    BlockNumber pending_read_blocknum;
    int16       pending_read_nblocks;

    /* Space for buffers and optional per-buffer private data. */
    size_t      per_buffer_data_size;
    void       *per_buffer_data;

    /* Read operations that have been started but not waited for yet. */
    InProgressIO *ios;
    int16       oldest_io_index;
    int16       next_io_index;

    /*
     * NOTE(review): not referenced by the code in this section; presumably
     * enables a shortcut in the buffer-returning path -- confirm.
     */
    bool        fast_path;

    /* Circular queue of buffers. */
    int16       oldest_buffer_index;    /* Next pinned buffer to return */
    int16       next_buffer_index;  /* Index of next buffer to pin */

    /*
     * Entries at and beyond queue_size form an overflow zone, so that
     * StartReadBuffers() can always be given a contiguous array; overflowing
     * entries are also copied to the front of the queue.
     */
    Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
};
     146             : 
     147             : /*
     148             :  * Return a pointer to the per-buffer data by index.
     149             :  */
     150             : static inline void *
     151     7379668 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
     152             : {
     153    14759336 :     return (char *) stream->per_buffer_data +
     154     7379668 :         stream->per_buffer_data_size * buffer_index;
     155             : }
     156             : 
     157             : /*
     158             :  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
     159             :  * blocks [current_blocknum, last_exclusive).
     160             :  */
     161             : BlockNumber
     162      658238 : block_range_read_stream_cb(ReadStream *stream,
     163             :                            void *callback_private_data,
     164             :                            void *per_buffer_data)
     165             : {
     166      658238 :     BlockRangeReadStreamPrivate *p = callback_private_data;
     167             : 
     168      658238 :     if (p->current_blocknum < p->last_exclusive)
     169      529072 :         return p->current_blocknum++;
     170             : 
     171      129166 :     return InvalidBlockNumber;
     172             : }
     173             : 
     174             : /*
     175             :  * Ask the callback which block it would like us to read next, with a one block
     176             :  * buffer in front to allow read_stream_unget_block() to work.
     177             :  */
     178             : static inline BlockNumber
     179    10119786 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
     180             : {
     181             :     BlockNumber blocknum;
     182             : 
     183    10119786 :     blocknum = stream->buffered_blocknum;
     184    10119786 :     if (blocknum != InvalidBlockNumber)
     185           8 :         stream->buffered_blocknum = InvalidBlockNumber;
     186             :     else
     187             :     {
     188             :         /*
     189             :          * Tell Valgrind that the per-buffer data is undefined.  That replaces
     190             :          * the "noaccess" state that was set when the consumer moved past this
     191             :          * entry last time around the queue, and should also catch callbacks
     192             :          * that fail to initialize data that the buffer consumer later
     193             :          * accesses.  On the first go around, it is undefined already.
     194             :          */
     195             :         VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
     196             :                                     stream->per_buffer_data_size);
     197    10119778 :         blocknum = stream->callback(stream,
     198             :                                     stream->callback_private_data,
     199             :                                     per_buffer_data);
     200             :     }
     201             : 
     202    10119786 :     return blocknum;
     203             : }
     204             : 
     205             : /*
     206             :  * In order to deal with buffer shortages and I/O limits after short reads, we
     207             :  * sometimes need to defer handling of a block we've already consumed from the
     208             :  * registered callback until later.
     209             :  */
     210             : static inline void
     211           8 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
     212             : {
     213             :     /* We shouldn't ever unget more than one block. */
     214             :     Assert(stream->buffered_blocknum == InvalidBlockNumber);
     215             :     Assert(blocknum != InvalidBlockNumber);
     216           8 :     stream->buffered_blocknum = blocknum;
     217           8 : }
     218             : 
/*
 * Start as much of the current pending read as we can.  If we have to split it
 * because of the per-backend buffer limit, or the buffer manager decides to
 * split it, then the pending read is adjusted to hold the remaining portion.
 *
 * We can always start a read of at least size one if we have no progress yet.
 * Otherwise it's possible that we can't start a read at all because of a lack
 * of buffers, and then false is returned.  Buffer shortages also reduce the
 * distance to a level that prevents look-ahead until buffers are released.
 *
 * Returns true when StartReadBuffers() was called (for some or all of the
 * pending read), false when nothing was started.  On a false return, distance
 * has been capped at pinned_buffers plus the remaining pin allowance.
 */
static bool
read_stream_start_pending_read(ReadStream *stream)
{
    bool        need_wait;
    int         requested_nblocks;
    int         nblocks;
    int         flags;
    int         forwarded;
    int16       io_index;
    int16       overflow;
    int16       buffer_index;
    int         buffer_limit;

    /* This should only be called with a pending read. */
    Assert(stream->pending_read_nblocks > 0);
    Assert(stream->pending_read_nblocks <= stream->io_combine_limit);

    /* We had better not exceed the per-stream buffer limit with this read. */
    Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
           stream->max_pinned_buffers);

    /* We had better not be overwriting an existing pinned buffer. */
    if (stream->pinned_buffers > 0)
        Assert(stream->next_buffer_index != stream->oldest_buffer_index);
    else
        Assert(stream->next_buffer_index == stream->oldest_buffer_index);

    /* Do we need to issue read-ahead advice? */
    flags = stream->read_buffers_flags;
    if (stream->advice_enabled)
    {
        if (stream->pending_read_blocknum == stream->seq_blocknum)
        {
            /*
             * Sequential:  Issue advice until the preadv() calls have caught
             * up with the first advice issued for this sequential region, and
             * then stay out of the way of the kernel's own read-ahead.
             */
            if (stream->seq_until_processed != InvalidBlockNumber)
                flags |= READ_BUFFERS_ISSUE_ADVICE;
        }
        else
        {
            /*
             * Random jump:  Note the starting location of a new potential
             * sequential region and start issuing advice.  Skip it this time
             * if the preadv() follows immediately, eg first block in stream.
             */
            stream->seq_until_processed = stream->pending_read_blocknum;
            if (stream->pinned_buffers > 0)
                flags |= READ_BUFFERS_ISSUE_ADVICE;
        }
    }

    /*
     * How many more buffers is this backend allowed?
     *
     * Forwarded buffers are already pinned and map to the leading blocks of
     * the pending read (the remaining portion of an earlier short read that
     * we're about to continue).  They are not counted in pinned_buffers, but
     * they are counted as pins already held by this backend according to the
     * buffer manager, so they must be added to the limit it grants us.
     */
    if (stream->temporary)
        buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
    else
        buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
    Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);

    buffer_limit += stream->forwarded_buffers;
    /* Clamp again: all counters in the stream are int16. */
    buffer_limit = Min(buffer_limit, PG_INT16_MAX);

    if (buffer_limit == 0 && stream->pinned_buffers == 0)
        buffer_limit = 1;       /* guarantee progress */

    /* Does the per-backend limit affect this read? */
    nblocks = stream->pending_read_nblocks;
    if (buffer_limit < nblocks)
    {
        int16       new_distance;

        /* Shrink distance: no more look-ahead until buffers are released. */
        new_distance = stream->pinned_buffers + buffer_limit;
        if (stream->distance > new_distance)
            stream->distance = new_distance;

        /* Unless we have nothing to give the consumer, stop here. */
        if (stream->pinned_buffers > 0)
            return false;

        /* A short read is required to make progress. */
        nblocks = buffer_limit;
    }

    /*
     * We say how many blocks we want to read, but it may be smaller on return
     * if the buffer manager decides to shorten the read.  Initialize buffers
     * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
     * and keep the original nblocks number so we can check for forwarded
     * buffers as output, below.
     */
    buffer_index = stream->next_buffer_index;
    io_index = stream->next_io_index;
    while (stream->initialized_buffers < buffer_index + nblocks)
        stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
    requested_nblocks = nblocks;
    need_wait = StartReadBuffers(&stream->ios[io_index].op,
                                 &stream->buffers[buffer_index],
                                 stream->pending_read_blocknum,
                                 &nblocks,
                                 flags);
    /* From here on, nblocks is the number of blocks actually started. */
    stream->pinned_buffers += nblocks;

    /* Remember whether we need to wait before returning this buffer. */
    if (!need_wait)
    {
        /* Look-ahead distance decays, no I/O necessary. */
        if (stream->distance > 1)
            stream->distance--;
    }
    else
    {
        /*
         * Remember to call WaitReadBuffers() before returning head buffer.
         * Look-ahead distance will be adjusted after waiting.
         */
        stream->ios[io_index].buffer_index = buffer_index;
        /* Advance the circular ios[] queue, wrapping at max_ios. */
        if (++stream->next_io_index == stream->max_ios)
            stream->next_io_index = 0;
        Assert(stream->ios_in_progress < stream->max_ios);
        stream->ios_in_progress++;
        /* Only reads that really need I/O update sequential detection. */
        stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
    }

    /*
     * How many pins were acquired but forwarded to the next call?  These need
     * to be passed to the next StartReadBuffers() call by leaving them
     * exactly where they are in the queue, or released if the stream ends
     * early.  We need the number for accounting purposes, since they are not
     * counted in stream->pinned_buffers but we already hold them.
     */
    forwarded = 0;
    while (nblocks + forwarded < requested_nblocks &&
           stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
        forwarded++;
    stream->forwarded_buffers = forwarded;

    /*
     * We gave a contiguous range of buffer space to StartReadBuffers(), but
     * we want it to wrap around at queue_size.  Copy overflowing buffers to
     * the front of the array where they'll be consumed, but also leave a copy
     * in the overflow zone which the I/O operation has a pointer to (it needs
     * a contiguous array).  Both copies will be cleared when the buffers are
     * handed to the consumer.
     */
    overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
    if (overflow > 0)
    {
        Assert(overflow < stream->queue_size);    /* can't overlap */
        memcpy(&stream->buffers[0],
               &stream->buffers[stream->queue_size],
               sizeof(stream->buffers[0]) * overflow);
    }

    /* Compute location of start of next read, without using % operator. */
    buffer_index += nblocks;
    if (buffer_index >= stream->queue_size)
        buffer_index -= stream->queue_size;
    Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
    stream->next_buffer_index = buffer_index;

    /* Adjust the pending read to cover the remaining portion, if any. */
    stream->pending_read_blocknum += nblocks;
    stream->pending_read_nblocks -= nblocks;

    return true;
}
     406             : 
/*
 * Pull block numbers from the callback, merging consecutive blocks into a
 * pending read and starting reads, until the look-ahead distance is covered,
 * max_ios reads are in progress, a buffer shortage stops us, or the callback
 * signals end-of-stream (which sets distance to 0).
 */
static void
read_stream_look_ahead(ReadStream *stream)
{
    /*
     * Allow amortizing the cost of submitting IO over multiple IOs. This
     * requires that we don't do any operations that could lead to a deadlock
     * with staged-but-unsubmitted IO. The callback needs to opt-in to being
     * careful.
     */
    if (stream->batch_mode)
        pgaio_enter_batchmode();

    while (stream->ios_in_progress < stream->max_ios &&
           stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
    {
        BlockNumber blocknum;
        int16       buffer_index;
        void       *per_buffer_data;

        if (stream->pending_read_nblocks == stream->io_combine_limit)
        {
            /*
             * The pending read can't grow any further, so start it.  The
             * return value can be ignored: if nothing could be started, the
             * callee has left distance no greater than pinned_buffers plus
             * the remaining pin allowance, which is below pinned_buffers +
             * pending_read_nblocks, so the loop condition terminates.
             */
            read_stream_start_pending_read(stream);
            continue;
        }

        /*
         * See which block the callback wants next in the stream.  We need to
         * compute the index of the Nth block of the pending read including
         * wrap-around, but we don't want to use the expensive % operator.
         */
        buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
        if (buffer_index >= stream->queue_size)
            buffer_index -= stream->queue_size;
        Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
        per_buffer_data = get_per_buffer_data(stream, buffer_index);
        blocknum = read_stream_get_block(stream, per_buffer_data);
        if (blocknum == InvalidBlockNumber)
        {
            /* End of stream. */
            stream->distance = 0;
            break;
        }

        /* Can we merge it with the pending read? */
        if (stream->pending_read_nblocks > 0 &&
            stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
        {
            stream->pending_read_nblocks++;
            continue;
        }

        /* We have to start the pending read before we can build another. */
        while (stream->pending_read_nblocks > 0)
        {
            if (!read_stream_start_pending_read(stream) ||
                stream->ios_in_progress == stream->max_ios)
            {
                /* We've hit the buffer or I/O limit.  Rewind and stop here. */
                read_stream_unget_block(stream, blocknum);
                /* Early return: keep batch mode enter/exit balanced. */
                if (stream->batch_mode)
                    pgaio_exit_batchmode();
                return;
            }
        }

        /* This is the start of a new pending read. */
        stream->pending_read_blocknum = blocknum;
        stream->pending_read_nblocks = 1;
    }

    /*
     * We don't start the pending read just because we've hit the distance
     * limit, preferring to give it another chance to grow to full
     * io_combine_limit size once more buffers have been consumed.  However,
     * if we've already reached io_combine_limit, or we've reached the
     * distance limit and there isn't anything pinned yet, or the callback has
     * signaled end-of-stream, we start the read immediately.  Note that the
     * pending read can exceed the distance goal, if the latter was reduced
     * after hitting the per-backend buffer limit.
     */
    if (stream->pending_read_nblocks > 0 &&
        (stream->pending_read_nblocks == stream->io_combine_limit ||
         (stream->pending_read_nblocks >= stream->distance &&
          stream->pinned_buffers == 0) ||
         stream->distance == 0) &&
        stream->ios_in_progress < stream->max_ios)
        read_stream_start_pending_read(stream);

    /*
     * There should always be something pinned when we leave this function,
     * whether started by this call or not, unless we've hit the end of the
     * stream.  In the worst case we can always make progress one buffer at a
     * time.
     */
    Assert(stream->pinned_buffers > 0 || stream->distance == 0);

    if (stream->batch_mode)
        pgaio_exit_batchmode();
}
     506             : 
     507             : /*
     508             :  * Create a new read stream object that can be used to perform the equivalent
     509             :  * of a series of ReadBuffer() calls for one fork of one relation.
     510             :  * Internally, it generates larger vectored reads where possible by looking
     511             :  * ahead.  The callback should return block numbers or InvalidBlockNumber to
     512             :  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
     513             :  * write extra data for each block into the space provided to it.  It will
     514             :  * also receive callback_private_data for its own purposes.
     515             :  */
     516             : static ReadStream *
     517     1013946 : read_stream_begin_impl(int flags,
     518             :                        BufferAccessStrategy strategy,
     519             :                        Relation rel,
     520             :                        SMgrRelation smgr,
     521             :                        char persistence,
     522             :                        ForkNumber forknum,
     523             :                        ReadStreamBlockNumberCB callback,
     524             :                        void *callback_private_data,
     525             :                        size_t per_buffer_data_size)
     526             : {
     527             :     ReadStream *stream;
     528             :     size_t      size;
     529             :     int16       queue_size;
     530             :     int16       queue_overflow;
     531             :     int         max_ios;
     532             :     int         strategy_pin_limit;
     533             :     uint32      max_pinned_buffers;
     534             :     uint32      max_possible_buffer_limit;
     535             :     Oid         tablespace_id;
     536             : 
     537             :     /*
     538             :      * Decide how many I/Os we will allow to run at the same time.  That
     539             :      * currently means advice to the kernel to tell it that we will soon read.
     540             :      * This number also affects how far we look ahead for opportunities to
     541             :      * start more I/Os.
     542             :      */
     543     1013946 :     tablespace_id = smgr->smgr_rlocator.locator.spcOid;
     544     1013946 :     if (!OidIsValid(MyDatabaseId) ||
     545     1175208 :         (rel && IsCatalogRelation(rel)) ||
     546      297894 :         IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
     547             :     {
     548             :         /*
     549             :          * Avoid circularity while trying to look up tablespace settings or
     550             :          * before spccache.c is ready.
     551             :          */
     552      827586 :         max_ios = effective_io_concurrency;
     553             :     }
     554      186360 :     else if (flags & READ_STREAM_MAINTENANCE)
     555       21016 :         max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
     556             :     else
     557      165344 :         max_ios = get_tablespace_io_concurrency(tablespace_id);
     558             : 
     559             :     /* Cap to INT16_MAX to avoid overflowing below */
     560     1013946 :     max_ios = Min(max_ios, PG_INT16_MAX);
     561             : 
     562             :     /*
     563             :      * If starting a multi-block I/O near the end of the queue, we might
     564             :      * temporarily need extra space for overflowing buffers before they are
     565             :      * moved to regular circular position.  This is the maximum extra space we
     566             :      * could need.
     567             :      */
     568     1013946 :     queue_overflow = io_combine_limit - 1;
     569             : 
     570             :     /*
     571             :      * Choose the maximum number of buffers we're prepared to pin.  We try to
     572             :      * pin fewer if we can, though.  We add one so that we can make progress
     573             :      * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
     574             :      * this also allows an extra full I/O's worth of buffers: after an I/O
     575             :      * finishes we don't want to have to wait for its buffers to be consumed
     576             :      * before starting a new one.
     577             :      *
     578             :      * Be careful not to allow int16 to overflow.  That is possible with the
     579             :      * current GUC range limits, so this is an artificial limit of ~32k
     580             :      * buffers and we'd need to adjust the types to exceed that.  We also have
     581             :      * to allow for the spare entry and the overflow space.
     582             :      */
     583     1013946 :     max_pinned_buffers = (max_ios + 1) * io_combine_limit;
     584     1013946 :     max_pinned_buffers = Min(max_pinned_buffers,
     585             :                              PG_INT16_MAX - queue_overflow - 1);
     586             : 
     587             :     /* Give the strategy a chance to limit the number of buffers we pin. */
     588     1013946 :     strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
     589     1013946 :     max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
     590             : 
     591             :     /*
     592             :      * Also limit our queue to the maximum number of pins we could ever be
     593             :      * allowed to acquire according to the buffer manager.  We may not really
     594             :      * be able to use them all due to other pins held by this backend, but
     595             :      * we'll check that later in read_stream_start_pending_read().
     596             :      */
     597     1013946 :     if (SmgrIsTemp(smgr))
     598       13454 :         max_possible_buffer_limit = GetLocalPinLimit();
     599             :     else
     600     1000492 :         max_possible_buffer_limit = GetPinLimit();
     601     1013946 :     max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
     602             : 
     603             :     /*
     604             :      * The limit might be zero on a system configured with too few buffers for
     605             :      * the number of connections.  We need at least one to make progress.
     606             :      */
     607     1013946 :     max_pinned_buffers = Max(1, max_pinned_buffers);
     608             : 
     609             :     /*
     610             :      * We need one extra entry for buffers and per-buffer data, because users
     611             :      * of per-buffer data have access to the object until the next call to
     612             :      * read_stream_next_buffer(), so we need a gap between the head and tail
     613             :      * of the queue so that we don't clobber it.
     614             :      */
     615     1013946 :     queue_size = max_pinned_buffers + 1;
     616             : 
     617             :     /*
     618             :      * Allocate the object, the buffers, the ios and per_buffer_data space in
     619             :      * one big chunk.  Though we have queue_size buffers, we want to be able
     620             :      * to assume that all the buffers for a single read are contiguous (i.e.
     621             :      * don't wrap around halfway through), so we allow temporary overflows of
     622             :      * up to the maximum possible overflow size.
     623             :      */
     624     1013946 :     size = offsetof(ReadStream, buffers);
     625     1013946 :     size += sizeof(Buffer) * (queue_size + queue_overflow);
     626     1013946 :     size += sizeof(InProgressIO) * Max(1, max_ios);
     627     1013946 :     size += per_buffer_data_size * queue_size;
     628     1013946 :     size += MAXIMUM_ALIGNOF * 2;
     629     1013946 :     stream = (ReadStream *) palloc(size);
     630     1013946 :     memset(stream, 0, offsetof(ReadStream, buffers));
     631     1013946 :     stream->ios = (InProgressIO *)
     632     1013946 :         MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
     633     1013946 :     if (per_buffer_data_size > 0)
     634      146044 :         stream->per_buffer_data = (void *)
     635      146044 :             MAXALIGN(&stream->ios[Max(1, max_ios)]);
     636             : 
     637     1013946 :     stream->sync_mode = io_method == IOMETHOD_SYNC;
     638     1013946 :     stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
     639             : 
     640             : #ifdef USE_PREFETCH
     641             : 
     642             :     /*
     643             :      * Read-ahead advice simulating asynchronous I/O with synchronous calls.
     644             :      * Issue advice only if AIO is not used, direct I/O isn't enabled, the
     645             :      * caller hasn't promised sequential access (overriding our detection
     646             :      * heuristics), and max_ios hasn't been set to zero.
     647             :      */
     648     1013946 :     if (stream->sync_mode &&
     649        5796 :         (io_direct_flags & IO_DIRECT_DATA) == 0 &&
     650        5796 :         (flags & READ_STREAM_SEQUENTIAL) == 0 &&
     651             :         max_ios > 0)
     652        1386 :         stream->advice_enabled = true;
     653             : #endif
     654             : 
     655             :     /*
     656             :      * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
     657             :      * we still need to allocate space to combine and run one I/O.  Bump it up
     658             :      * to one, and remember to ask for synchronous I/O only.
     659             :      */
     660     1013946 :     if (max_ios == 0)
     661             :     {
     662           0 :         max_ios = 1;
     663           0 :         stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
     664             :     }
     665             : 
     666             :     /*
     667             :      * Capture stable values for these two GUC-derived numbers for the
     668             :      * lifetime of this stream, so we don't have to worry about the GUCs
     669             :      * changing underneath us beyond this point.
     670             :      */
     671     1013946 :     stream->max_ios = max_ios;
     672     1013946 :     stream->io_combine_limit = io_combine_limit;
     673             : 
     674     1013946 :     stream->per_buffer_data_size = per_buffer_data_size;
     675     1013946 :     stream->max_pinned_buffers = max_pinned_buffers;
     676     1013946 :     stream->queue_size = queue_size;
     677     1013946 :     stream->callback = callback;
     678     1013946 :     stream->callback_private_data = callback_private_data;
     679     1013946 :     stream->buffered_blocknum = InvalidBlockNumber;
     680     1013946 :     stream->seq_blocknum = InvalidBlockNumber;
     681     1013946 :     stream->seq_until_processed = InvalidBlockNumber;
     682     1013946 :     stream->temporary = SmgrIsTemp(smgr);
     683             : 
     684             :     /*
     685             :      * Skip the initial ramp-up phase if the caller says we're going to be
     686             :      * reading the whole relation.  This way we start out assuming we'll be
     687             :      * doing full io_combine_limit sized reads.
     688             :      */
     689     1013946 :     if (flags & READ_STREAM_FULL)
     690      130798 :         stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
     691             :     else
     692      883148 :         stream->distance = 1;
     693             : 
     694             :     /*
     695             :      * Since we always access the same relation, we can initialize parts of
     696             :      * the ReadBuffersOperation objects and leave them that way, to avoid
     697             :      * wasting CPU cycles writing to them for each read.
     698             :      */
     699    17280126 :     for (int i = 0; i < max_ios; ++i)
     700             :     {
     701    16266180 :         stream->ios[i].op.rel = rel;
     702    16266180 :         stream->ios[i].op.smgr = smgr;
     703    16266180 :         stream->ios[i].op.persistence = persistence;
     704    16266180 :         stream->ios[i].op.forknum = forknum;
     705    16266180 :         stream->ios[i].op.strategy = strategy;
     706             :     }
     707             : 
     708     1013946 :     return stream;
     709             : }
     710             : 
     711             : /*
     712             :  * Create a new read stream for reading a relation.
     713             :  * See read_stream_begin_impl() for the detailed explanation.
     714             :  */
     715             : ReadStream *
     716      894340 : read_stream_begin_relation(int flags,
     717             :                            BufferAccessStrategy strategy,
     718             :                            Relation rel,
     719             :                            ForkNumber forknum,
     720             :                            ReadStreamBlockNumberCB callback,
     721             :                            void *callback_private_data,
     722             :                            size_t per_buffer_data_size)
     723             : {
     724      894340 :     return read_stream_begin_impl(flags,
     725             :                                   strategy,
     726             :                                   rel,
     727             :                                   RelationGetSmgr(rel),
     728      894340 :                                   rel->rd_rel->relpersistence,
     729             :                                   forknum,
     730             :                                   callback,
     731             :                                   callback_private_data,
     732             :                                   per_buffer_data_size);
     733             : }
     734             : 
     735             : /*
     736             :  * Create a new read stream for reading a SMgr relation.
     737             :  * See read_stream_begin_impl() for the detailed explanation.
     738             :  */
     739             : ReadStream *
     740      119606 : read_stream_begin_smgr_relation(int flags,
     741             :                                 BufferAccessStrategy strategy,
     742             :                                 SMgrRelation smgr,
     743             :                                 char smgr_persistence,
     744             :                                 ForkNumber forknum,
     745             :                                 ReadStreamBlockNumberCB callback,
     746             :                                 void *callback_private_data,
     747             :                                 size_t per_buffer_data_size)
     748             : {
     749      119606 :     return read_stream_begin_impl(flags,
     750             :                                   strategy,
     751             :                                   NULL,
     752             :                                   smgr,
     753             :                                   smgr_persistence,
     754             :                                   forknum,
     755             :                                   callback,
     756             :                                   callback_private_data,
     757             :                                   per_buffer_data_size);
     758             : }
     759             : 
     760             : /*
     761             :  * Pull one pinned buffer out of a stream.  Each call returns successive
     762             :  * blocks in the order specified by the callback.  If per_buffer_data_size was
     763             :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
     764             :  * per-buffer data that the callback had a chance to populate, which remains
     765             :  * valid until the next call to read_stream_next_buffer().  When the stream
     766             :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
     767             :  * the stream early at any time by calling read_stream_end().
     768             :  */
     769             : Buffer
     770    12418326 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
     771             : {
     772             :     Buffer      buffer;
     773             :     int16       oldest_buffer_index;
     774             : 
     775             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     776             : 
     777             :     /*
     778             :      * A fast path for all-cached scans.  This is the same as the usual
     779             :      * algorithm, but it is specialized for no I/O and no per-buffer data, so
     780             :      * we can skip the queue management code, stay in the same buffer slot and
     781             :      * use singular StartReadBuffer().
     782             :      */
     783    12418326 :     if (likely(stream->fast_path))
     784             :     {
     785             :         BlockNumber next_blocknum;
     786             : 
     787             :         /* Fast path assumptions. */
     788             :         Assert(stream->ios_in_progress == 0);
     789             :         Assert(stream->forwarded_buffers == 0);
     790             :         Assert(stream->pinned_buffers == 1);
     791             :         Assert(stream->distance == 1);
     792             :         Assert(stream->pending_read_nblocks == 0);
     793             :         Assert(stream->per_buffer_data_size == 0);
     794             :         Assert(stream->initialized_buffers > stream->oldest_buffer_index);
     795             : 
     796             :         /* We're going to return the buffer we pinned last time. */
     797     3967652 :         oldest_buffer_index = stream->oldest_buffer_index;
     798             :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
     799             :                stream->next_buffer_index);
     800     3967652 :         buffer = stream->buffers[oldest_buffer_index];
     801             :         Assert(buffer != InvalidBuffer);
     802             : 
     803             :         /* Choose the next block to pin. */
     804     3967652 :         next_blocknum = read_stream_get_block(stream, NULL);
     805             : 
     806     3967652 :         if (likely(next_blocknum != InvalidBlockNumber))
     807             :         {
     808     3797098 :             int         flags = stream->read_buffers_flags;
     809             : 
     810     3797098 :             if (stream->advice_enabled)
     811        1058 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     812             : 
     813             :             /*
     814             :              * Pin a buffer for the next call.  Same buffer entry, and
     815             :              * arbitrary I/O entry (they're all free).  We don't have to
     816             :              * adjust pinned_buffers because we're transferring one to caller
     817             :              * but pinning one more.
     818             :              *
     819             :              * In the fast path we don't need to check the pin limit.  We're
     820             :              * always allowed at least one pin so that progress can be made,
     821             :              * and that's all we need here.  Although two pins are momentarily
     822             :              * held at the same time, the model used here is that the stream
     823             :              * holds only one, and the other now belongs to the caller.
     824             :              */
     825     3797098 :             if (likely(!StartReadBuffer(&stream->ios[0].op,
     826             :                                         &stream->buffers[oldest_buffer_index],
     827             :                                         next_blocknum,
     828             :                                         flags)))
     829             :             {
     830             :                 /* Fast return. */
     831     3772988 :                 return buffer;
     832             :             }
     833             : 
     834             :             /* Next call must wait for I/O for the newly pinned buffer. */
     835       24110 :             stream->oldest_io_index = 0;
     836       24110 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
     837       24110 :             stream->ios_in_progress = 1;
     838       24110 :             stream->ios[0].buffer_index = oldest_buffer_index;
     839       24110 :             stream->seq_blocknum = next_blocknum + 1;
     840             :         }
     841             :         else
     842             :         {
     843             :             /* No more blocks, end of stream. */
     844      170554 :             stream->distance = 0;
     845      170554 :             stream->oldest_buffer_index = stream->next_buffer_index;
     846      170554 :             stream->pinned_buffers = 0;
     847      170554 :             stream->buffers[oldest_buffer_index] = InvalidBuffer;
     848             :         }
     849             : 
     850      194664 :         stream->fast_path = false;
     851      194664 :         return buffer;
     852             :     }
     853             : #endif
     854             : 
     855     8450674 :     if (unlikely(stream->pinned_buffers == 0))
     856             :     {
     857             :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
     858             : 
     859             :         /* End of stream reached?  */
     860     5874150 :         if (stream->distance == 0)
     861     3321952 :             return InvalidBuffer;
     862             : 
     863             :         /*
     864             :          * The usual order of operations is that we look ahead at the bottom
     865             :          * of this function after potentially finishing an I/O and making
     866             :          * space for more, but if we're just starting up we'll need to crank
     867             :          * the handle to get started.
     868             :          */
     869     2552198 :         read_stream_look_ahead(stream);
     870             : 
     871             :         /* End of stream reached? */
     872     2552198 :         if (stream->pinned_buffers == 0)
     873             :         {
     874             :             Assert(stream->distance == 0);
     875     1341070 :             return InvalidBuffer;
     876             :         }
     877             :     }
     878             : 
     879             :     /* Grab the oldest pinned buffer and associated per-buffer data. */
     880             :     Assert(stream->pinned_buffers > 0);
     881     3787652 :     oldest_buffer_index = stream->oldest_buffer_index;
     882             :     Assert(oldest_buffer_index >= 0 &&
     883             :            oldest_buffer_index < stream->queue_size);
     884     3787652 :     buffer = stream->buffers[oldest_buffer_index];
     885     3787652 :     if (per_buffer_data)
     886     1227534 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
     887             : 
     888             :     Assert(BufferIsValid(buffer));
     889             : 
     890             :     /* Do we have to wait for an associated I/O first? */
     891     3787652 :     if (stream->ios_in_progress > 0 &&
     892     1328764 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
     893             :     {
     894     1160192 :         int16       io_index = stream->oldest_io_index;
     895             :         int32       distance;   /* wider temporary value, clamped below */
     896             : 
     897             :         /* Sanity check that we still agree on the buffers. */
     898             :         Assert(stream->ios[io_index].op.buffers ==
     899             :                &stream->buffers[oldest_buffer_index]);
     900             : 
     901     1160192 :         WaitReadBuffers(&stream->ios[io_index].op);
     902             : 
     903             :         Assert(stream->ios_in_progress > 0);
     904     1160150 :         stream->ios_in_progress--;
     905     1160150 :         if (++stream->oldest_io_index == stream->max_ios)
     906       48104 :             stream->oldest_io_index = 0;
     907             : 
     908             :         /* Look-ahead distance ramps up rapidly after we do I/O. */
     909     1160150 :         distance = stream->distance * 2;
     910     1160150 :         distance = Min(distance, stream->max_pinned_buffers);
     911     1160150 :         stream->distance = distance;
     912             : 
     913             :         /*
     914             :          * If we've reached the first block of a sequential region we're
     915             :          * issuing advice for, cancel that until the next jump.  The kernel
     916             :          * will see the sequential preadv() pattern starting here.
     917             :          */
     918     1160150 :         if (stream->advice_enabled &&
     919         544 :             stream->ios[io_index].op.blocknum == stream->seq_until_processed)
     920         502 :             stream->seq_until_processed = InvalidBlockNumber;
     921             :     }
     922             : 
     923             :     /*
     924             :      * We must zap this queue entry, or else it would appear as a forwarded
     925             :      * buffer.  If it's potentially in the overflow zone (ie from a
     926             :      * multi-block I/O that wrapped around the queue), also zap the copy.
     927             :      */
     928     3787610 :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
     929     3787610 :     if (oldest_buffer_index < stream->io_combine_limit - 1)
     930     2673400 :         stream->buffers[stream->queue_size + oldest_buffer_index] =
     931             :             InvalidBuffer;
     932             : 
     933             : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
     934             : 
     935             :     /*
     936             :      * The caller will get access to the per-buffer data, until the next call.
     937             :      * We wipe the one before, which is never occupied because queue_size
     938             :      * allowed one extra element.  This will hopefully trip up client code
     939             :      * that is holding a dangling pointer to it.
     940             :      */
     941             :     if (stream->per_buffer_data)
     942             :     {
     943             :         void       *per_buffer_data;
     944             : 
     945             :         per_buffer_data = get_per_buffer_data(stream,
     946             :                                               oldest_buffer_index == 0 ?
     947             :                                               stream->queue_size - 1 :
     948             :                                               oldest_buffer_index - 1);
     949             : 
     950             : #if defined(CLOBBER_FREED_MEMORY)
     951             :         /* This also tells Valgrind the memory is "noaccess". */
     952             :         wipe_mem(per_buffer_data, stream->per_buffer_data_size);
     953             : #elif defined(USE_VALGRIND)
     954             :         /* Tell it ourselves. */
     955             :         VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
     956             :                                    stream->per_buffer_data_size);
     957             : #endif
     958             :     }
     959             : #endif
     960             : 
     961             :     /* Pin transferred to caller. */
     962             :     Assert(stream->pinned_buffers > 0);
     963     3787610 :     stream->pinned_buffers--;
     964             : 
     965             :     /* Advance oldest buffer, with wrap-around. */
     966     3787610 :     stream->oldest_buffer_index++;
     967     3787610 :     if (stream->oldest_buffer_index == stream->queue_size)
     968      566456 :         stream->oldest_buffer_index = 0;
     969             : 
     970             :     /* Prepare for the next call. */
     971     3787610 :     read_stream_look_ahead(stream);
     972             : 
     973             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     974             :     /* See if we can take the fast path for all-cached scans next time. */
     975     3787598 :     if (stream->ios_in_progress == 0 &&
     976     2620704 :         stream->forwarded_buffers == 0 &&
     977     2619694 :         stream->pinned_buffers == 1 &&
     978     1468862 :         stream->distance == 1 &&
     979     1332206 :         stream->pending_read_nblocks == 0 &&
     980     1329450 :         stream->per_buffer_data_size == 0)
     981             :     {
     982      373904 :         stream->fast_path = true;
     983             :     }
     984             : #endif
     985             : 
     986     3787598 :     return buffer;
     987             : }
     988             : 
     989             : /*
     990             :  * Transitional support for code that would like to perform or skip reads
     991             :  * itself, without using the stream.  Returns, and consumes, the next block
     992             :  * number that would be read by the stream's look-ahead algorithm, or
     993             :  * InvalidBlockNumber if the end of the stream is reached.  Also reports the
     994             :  * strategy that would be used to read it.
     995             :  */
     996             : BlockNumber
     997           0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
     998             : {
     999           0 :     *strategy = stream->ios[0].op.strategy;
    1000           0 :     return read_stream_get_block(stream, NULL);
    1001             : }
    1002             : 
    1003             : /*
    1004             :  * Reset a read stream by releasing any queued up buffers, allowing the stream
    1005             :  * to be used again for different blocks.  This can be used to clear an
    1006             :  * end-of-stream condition and start again, or to throw away blocks that were
    1007             :  * speculatively read and read some different blocks instead.
    1008             :  */
    1009             : void
    1010     2553584 : read_stream_reset(ReadStream *stream)
    1011             : {
    1012             :     int16       index;
    1013             :     Buffer      buffer;
    1014             : 
    1015             :     /* Stop looking ahead. */
    1016     2553584 :     stream->distance = 0;
    1017             : 
    1018             :     /* Forget buffered block number and fast path state. */
    1019     2553584 :     stream->buffered_blocknum = InvalidBlockNumber;
    1020     2553584 :     stream->fast_path = false;
    1021             : 
    1022             :     /* Unpin anything that wasn't consumed. */
    1023     2785266 :     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
    1024      231682 :         ReleaseBuffer(buffer);
    1025             : 
    1026             :     /* Unpin any unused forwarded buffers. */
    1027     2553584 :     index = stream->next_buffer_index;
    1028     2553584 :     while (index < stream->initialized_buffers &&
    1029      456268 :            (buffer = stream->buffers[index]) != InvalidBuffer)
    1030             :     {
    1031             :         Assert(stream->forwarded_buffers > 0);
    1032           0 :         stream->forwarded_buffers--;
    1033           0 :         ReleaseBuffer(buffer);
    1034             : 
    1035           0 :         stream->buffers[index] = InvalidBuffer;
    1036           0 :         if (index < stream->io_combine_limit - 1)
    1037           0 :             stream->buffers[stream->queue_size + index] = InvalidBuffer;
    1038             : 
    1039           0 :         if (++index == stream->queue_size)
    1040           0 :             index = 0;
    1041             :     }
    1042             : 
    1043             :     Assert(stream->forwarded_buffers == 0);
    1044             :     Assert(stream->pinned_buffers == 0);
    1045             :     Assert(stream->ios_in_progress == 0);
    1046             : 
    1047             :     /* Start off assuming data is cached. */
    1048     2553584 :     stream->distance = 1;
    1049     2553584 : }
    1050             : 
    1051             : /*
    1052             :  * Release and free a read stream.
    1053             :  */
    1054             : void
    1055     1009416 : read_stream_end(ReadStream *stream)
    1056             : {
    1057     1009416 :     read_stream_reset(stream);
    1058     1009416 :     pfree(stream);
    1059     1009416 : }

Generated by: LCOV version 1.14