LCOV - code coverage report
Current view: top level - src/backend/storage/aio - read_stream.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 240 265 90.6 %
Date: 2025-04-01 15:15:16 Functions: 12 13 92.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * read_stream.c
       4             :  *    Mechanism for accessing buffered relation data with look-ahead
       5             :  *
       6             :  * Code that needs to access relation data typically pins blocks one at a
       7             :  * time, often in a predictable order that might be sequential or data-driven.
       8             :  * Calling the simple ReadBuffer() function for each block is inefficient,
       9             :  * because blocks that are not yet in the buffer pool require I/O operations
      10             :  * that are small and might stall waiting for storage.  This mechanism looks
      11             :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
      12             :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
      13             :  * distance.
      14             :  *
      15             :  * A user-provided callback generates a stream of block numbers that is used
      16             :  * to form reads of up to io_combine_limit, by attempting to merge them with a
      17             :  * pending read.  When that isn't possible, the existing pending read is sent
      18             :  * to StartReadBuffers() so that a new one can begin to form.
      19             :  *
      20             :  * The algorithm for controlling the look-ahead distance is based on recent
      21             :  * cache hit and miss history.  When no I/O is necessary, there is no benefit
      22             :  * in looking ahead more than one block.  This is the default initial
      23             :  * assumption, but when blocks needing I/O are streamed, the distance is
      24             :  * increased rapidly to try to benefit from I/O combining and concurrency.  It
      25             :  * is reduced gradually when cached blocks are streamed.
      26             :  *
      27             :  * The main data structure is a circular queue of buffers of size
      28             :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
      29             :  * returned by read_stream_next_buffer().  Each buffer also has an optional
      30             :  * variable sized object that is passed from the callback to the consumer of
      31             :  * buffers.
      32             :  *
      33             :  * Parallel to the queue of buffers, there is a circular queue of in-progress
      34             :  * I/Os that have been started with StartReadBuffers(), and for which
      35             :  * WaitReadBuffers() must be called before returning the buffer.
      36             :  *
      37             :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
      38             :  * successive calls, then these data structures might appear as follows:
      39             :  *
      40             :  *                          buffers buf/data       ios
      41             :  *
      42             :  *                          +----+  +-----+       +--------+
      43             :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
      44             :  *                          +----+  +-----+  |    +--------+
      45             :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
      46             :  *                          +----+  +-----+  | |  +--------+
      47             :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
      48             :  *                          +----+  +-----+    |  +--------+
      49             :  *                          | 43 |  |  ?  |    |  |        |
      50             :  *                          +----+  +-----+    |  +--------+
      51             :  *                          | 44 |  |  ?  |    |  |        |
      52             :  *                          +----+  +-----+    |  +--------+
      53             :  *                          | 60 |  |  ?  |<---+
      54             :  *                          +----+  +-----+
      55             :  *     next_buffer_index -> |    |  |     |
      56             :  *                          +----+  +-----+
      57             :  *
      58             :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
      59             :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
      60             :  * the range 42..44 requires an I/O wait before its buffers are returned, as
      61             :  * does block 60.
      62             :  *
      63             :  *
      64             :  * Portions Copyright (c) 2024-2025, PostgreSQL Global Development Group
      65             :  * Portions Copyright (c) 1994, Regents of the University of California
      66             :  *
      67             :  * IDENTIFICATION
      68             :  *    src/backend/storage/aio/read_stream.c
      69             :  *
      70             :  *-------------------------------------------------------------------------
      71             :  */
      72             : #include "postgres.h"
      73             : 
      74             : #include "miscadmin.h"
      75             : #include "storage/aio.h"
      76             : #include "storage/fd.h"
      77             : #include "storage/smgr.h"
      78             : #include "storage/read_stream.h"
      79             : #include "utils/memdebug.h"
      80             : #include "utils/rel.h"
      81             : #include "utils/spccache.h"
      82             : 
      83             : typedef struct InProgressIO
      84             : {
      85             :     int16       buffer_index;
      86             :     ReadBuffersOperation op;
      87             : } InProgressIO;
      88             : 
      89             : /*
      90             :  * State for managing a stream of reads.
      91             :  */
      92             : struct ReadStream
      93             : {
      94             :     int16       max_ios;
      95             :     int16       io_combine_limit;
      96             :     int16       ios_in_progress;
      97             :     int16       queue_size;
      98             :     int16       max_pinned_buffers;
      99             :     int16       forwarded_buffers;
     100             :     int16       pinned_buffers;
     101             :     int16       distance;
     102             :     int16       initialized_buffers;
     103             :     int         read_buffers_flags;
     104             :     bool        sync_mode;      /* using io_method=sync */
     105             :     bool        batch_mode;     /* READ_STREAM_USE_BATCHING */
     106             :     bool        advice_enabled;
     107             :     bool        temporary;
     108             : 
     109             :     /*
     110             :      * One-block buffer to support 'ungetting' a block number, to resolve flow
     111             :      * control problems when I/Os are split.
     112             :      */
     113             :     BlockNumber buffered_blocknum;
     114             : 
     115             :     /*
     116             :      * The callback that will tell us which block numbers to read, and an
     117             :      * opaque pointer that will be pass to it for its own purposes.
     118             :      */
     119             :     ReadStreamBlockNumberCB callback;
     120             :     void       *callback_private_data;
     121             : 
     122             :     /* Next expected block, for detecting sequential access. */
     123             :     BlockNumber seq_blocknum;
     124             :     BlockNumber seq_until_processed;
     125             : 
     126             :     /* The read operation we are currently preparing. */
     127             :     BlockNumber pending_read_blocknum;
     128             :     int16       pending_read_nblocks;
     129             : 
     130             :     /* Space for buffers and optional per-buffer private data. */
     131             :     size_t      per_buffer_data_size;
     132             :     void       *per_buffer_data;
     133             : 
     134             :     /* Read operations that have been started but not waited for yet. */
     135             :     InProgressIO *ios;
     136             :     int16       oldest_io_index;
     137             :     int16       next_io_index;
     138             : 
     139             :     bool        fast_path;
     140             : 
     141             :     /* Circular queue of buffers. */
     142             :     int16       oldest_buffer_index;    /* Next pinned buffer to return */
     143             :     int16       next_buffer_index;  /* Index of next buffer to pin */
     144             :     Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
     145             : };
     146             : 
     147             : /*
     148             :  * Return a pointer to the per-buffer data by index.
     149             :  */
     150             : static inline void *
     151     6746210 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
     152             : {
     153    13492420 :     return (char *) stream->per_buffer_data +
     154     6746210 :         stream->per_buffer_data_size * buffer_index;
     155             : }
     156             : 
     157             : /*
     158             :  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
     159             :  * blocks [current_blocknum, last_exclusive).
     160             :  */
     161             : BlockNumber
     162      626106 : block_range_read_stream_cb(ReadStream *stream,
     163             :                            void *callback_private_data,
     164             :                            void *per_buffer_data)
     165             : {
     166      626106 :     BlockRangeReadStreamPrivate *p = callback_private_data;
     167             : 
     168      626106 :     if (p->current_blocknum < p->last_exclusive)
     169      502816 :         return p->current_blocknum++;
     170             : 
     171      123290 :     return InvalidBlockNumber;
     172             : }
     173             : 
     174             : /*
     175             :  * Ask the callback which block it would like us to read next, with a one block
     176             :  * buffer in front to allow read_stream_unget_block() to work.
     177             :  */
     178             : static inline BlockNumber
     179     9327576 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
     180             : {
     181             :     BlockNumber blocknum;
     182             : 
     183     9327576 :     blocknum = stream->buffered_blocknum;
     184     9327576 :     if (blocknum != InvalidBlockNumber)
     185           8 :         stream->buffered_blocknum = InvalidBlockNumber;
     186             :     else
     187             :     {
     188             :         /*
     189             :          * Tell Valgrind that the per-buffer data is undefined.  That replaces
     190             :          * the "noaccess" state that was set when the consumer moved past this
     191             :          * entry last time around the queue, and should also catch callbacks
     192             :          * that fail to initialize data that the buffer consumer later
     193             :          * accesses.  On the first go around, it is undefined already.
     194             :          */
     195             :         VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
     196             :                                     stream->per_buffer_data_size);
     197     9327568 :         blocknum = stream->callback(stream,
     198             :                                     stream->callback_private_data,
     199             :                                     per_buffer_data);
     200             :     }
     201             : 
     202     9327576 :     return blocknum;
     203             : }
     204             : 
     205             : /*
     206             :  * In order to deal with buffer shortages and I/O limits after short reads, we
     207             :  * sometimes need to defer handling of a block we've already consumed from the
     208             :  * registered callback until later.
     209             :  */
     210             : static inline void
     211           8 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
     212             : {
     213             :     /* We shouldn't ever unget more than one block. */
     214             :     Assert(stream->buffered_blocknum == InvalidBlockNumber);
     215             :     Assert(blocknum != InvalidBlockNumber);
     216           8 :     stream->buffered_blocknum = blocknum;
     217           8 : }
     218             : 
     219             : /*
     220             :  * Start as much of the current pending read as we can.  If we have to split it
     221             :  * because of the per-backend buffer limit, or the buffer manager decides to
     222             :  * split it, then the pending read is adjusted to hold the remaining portion.
     223             :  *
     224             :  * We can always start a read of at least size one if we have no progress yet.
     225             :  * Otherwise it's possible that we can't start a read at all because of a lack
     226             :  * of buffers, and then false is returned.  Buffer shortages also reduce the
     227             :  * distance to a level that prevents look-ahead until buffers are released.
     228             :  */
     229             : static bool
     230     3506716 : read_stream_start_pending_read(ReadStream *stream)
     231             : {
     232             :     bool        need_wait;
     233             :     int         requested_nblocks;
     234             :     int         nblocks;
     235             :     int         flags;
     236             :     int         forwarded;
     237             :     int16       io_index;
     238             :     int16       overflow;
     239             :     int16       buffer_index;
     240             :     int16       buffer_limit;
     241             : 
     242             :     /* This should only be called with a pending read. */
     243             :     Assert(stream->pending_read_nblocks > 0);
     244             :     Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
     245             : 
     246             :     /* We had better not exceed the per-stream buffer limit with this read. */
     247             :     Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
     248             :            stream->max_pinned_buffers);
     249             : 
     250             :     /* We had better not be overwriting an existing pinned buffer. */
     251     3506716 :     if (stream->pinned_buffers > 0)
     252             :         Assert(stream->next_buffer_index != stream->oldest_buffer_index);
     253             :     else
     254             :         Assert(stream->next_buffer_index == stream->oldest_buffer_index);
     255             : 
     256             :     /* Do we need to issue read-ahead advice? */
     257     3506716 :     flags = stream->read_buffers_flags;
     258     3506716 :     if (stream->advice_enabled)
     259             :     {
     260           0 :         if (stream->pending_read_blocknum == stream->seq_blocknum)
     261             :         {
     262             :             /*
     263             :              * Sequential:  Issue advice until the preadv() calls have caught
     264             :              * up with the first advice issued for this sequential region, and
     265             :              * then stay of the way of the kernel's own read-ahead.
     266             :              */
     267           0 :             if (stream->seq_until_processed != InvalidBlockNumber)
     268           0 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     269             :         }
     270             :         else
     271             :         {
     272             :             /*
     273             :              * Random jump:  Note the starting location of a new potential
     274             :              * sequential region and start issuing advice.  Skip it this time
     275             :              * if the preadv() follows immediately, eg first block in stream.
     276             :              */
     277           0 :             stream->seq_until_processed = stream->pending_read_blocknum;
     278           0 :             if (stream->pinned_buffers > 0)
     279           0 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     280             :         }
     281             :     }
     282             : 
     283             :     /*
     284             :      * How many more buffers is this backend allowed?
     285             :      *
     286             :      * Forwarded buffers are already pinned and map to the leading blocks of
     287             :      * the pending read (the remaining portion of an earlier short read that
     288             :      * we're about to continue).  They are not counted in pinned_buffers, but
     289             :      * they are counted as pins already held by this backend according to the
     290             :      * buffer manager, so they must be added to the limit it grants us.
     291             :      */
     292     3506716 :     if (stream->temporary)
     293       23550 :         buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
     294             :     else
     295     3483166 :         buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
     296             :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
     297     3506716 :     buffer_limit += stream->forwarded_buffers;
     298     3506716 :     if (buffer_limit == 0 && stream->pinned_buffers == 0)
     299     1262968 :         buffer_limit = 1;       /* guarantee progress */
     300             : 
     301             :     /* Does the per-backend limit affect this read? */
     302     3506716 :     nblocks = stream->pending_read_nblocks;
     303     3506716 :     if (buffer_limit < nblocks)
     304             :     {
     305             :         int16       new_distance;
     306             : 
     307             :         /* Shrink distance: no more look-ahead until buffers are released. */
     308        4962 :         new_distance = stream->pinned_buffers + buffer_limit;
     309        4962 :         if (stream->distance > new_distance)
     310        3464 :             stream->distance = new_distance;
     311             : 
     312             :         /* Unless we have nothing to give the consumer, stop here. */
     313        4962 :         if (stream->pinned_buffers > 0)
     314        2444 :             return false;
     315             : 
     316             :         /* A short read is required to make progress. */
     317        2518 :         nblocks = buffer_limit;
     318             :     }
     319             : 
     320             :     /*
     321             :      * We say how many blocks we want to read, but it may be smaller on return
     322             :      * if the buffer manager decides to shorten the read.  Initialize buffers
     323             :      * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
     324             :      * and keep the original nblocks number so we can check for forwarded
     325             :      * buffers as output, below.
     326             :      */
     327     3504272 :     buffer_index = stream->next_buffer_index;
     328     3504272 :     io_index = stream->next_io_index;
     329     5469398 :     while (stream->initialized_buffers < buffer_index + nblocks)
     330     1965126 :         stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
     331     3504272 :     requested_nblocks = nblocks;
     332     3504272 :     need_wait = StartReadBuffers(&stream->ios[io_index].op,
     333     3504272 :                                  &stream->buffers[buffer_index],
     334             :                                  stream->pending_read_blocknum,
     335             :                                  &nblocks,
     336             :                                  flags);
     337     3504260 :     stream->pinned_buffers += nblocks;
     338             : 
     339             :     /* Remember whether we need to wait before returning this buffer. */
     340     3504260 :     if (!need_wait)
     341             :     {
     342             :         /* Look-ahead distance decays, no I/O necessary. */
     343     2337552 :         if (stream->distance > 1)
     344       32820 :             stream->distance--;
     345             :     }
     346             :     else
     347             :     {
     348             :         /*
     349             :          * Remember to call WaitReadBuffers() before returning head buffer.
     350             :          * Look-ahead distance will be adjusted after waiting.
     351             :          */
     352     1166708 :         stream->ios[io_index].buffer_index = buffer_index;
     353     1166708 :         if (++stream->next_io_index == stream->max_ios)
     354       51306 :             stream->next_io_index = 0;
     355             :         Assert(stream->ios_in_progress < stream->max_ios);
     356     1166708 :         stream->ios_in_progress++;
     357     1166708 :         stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
     358             :     }
     359             : 
     360             :     /*
     361             :      * How many pins were acquired but forwarded to the next call?  These need
     362             :      * to be passed to the next StartReadBuffers() call by leaving them
     363             :      * exactly where they are in the queue, or released if the stream ends
     364             :      * early.  We need the number for accounting purposes, since they are not
     365             :      * counted in stream->pinned_buffers but we already hold them.
     366             :      */
     367     3504260 :     forwarded = 0;
     368     3507784 :     while (nblocks + forwarded < requested_nblocks &&
     369      100894 :            stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
     370        3524 :         forwarded++;
     371     3504260 :     stream->forwarded_buffers = forwarded;
     372             : 
     373             :     /*
     374             :      * We gave a contiguous range of buffer space to StartReadBuffers(), but
     375             :      * we want it to wrap around at queue_size.  Copy overflowing buffers to
     376             :      * the front of the array where they'll be consumed, but also leave a copy
     377             :      * in the overflow zone which the I/O operation has a pointer to (it needs
     378             :      * a contiguous array).  Both copies will be cleared when the buffers are
     379             :      * handed to the consumer.
     380             :      */
     381     3504260 :     overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
     382     3504260 :     if (overflow > 0)
     383             :     {
     384             :         Assert(overflow < stream->queue_size);    /* can't overlap */
     385        2070 :         memcpy(&stream->buffers[0],
     386        2070 :                &stream->buffers[stream->queue_size],
     387             :                sizeof(stream->buffers[0]) * overflow);
     388             :     }
     389             : 
     390             :     /* Compute location of start of next read, without using % operator. */
     391     3504260 :     buffer_index += nblocks;
     392     3504260 :     if (buffer_index >= stream->queue_size)
     393      597020 :         buffer_index -= stream->queue_size;
     394             :     Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     395     3504260 :     stream->next_buffer_index = buffer_index;
     396             : 
     397             :     /* Adjust the pending read to cover the remaining portion, if any. */
     398     3504260 :     stream->pending_read_blocknum += nblocks;
     399     3504260 :     stream->pending_read_nblocks -= nblocks;
     400             : 
     401     3504260 :     return true;
     402             : }
     403             : 
     404             : static void
     405     5801432 : read_stream_look_ahead(ReadStream *stream)
     406             : {
     407             :     /*
     408             :      * Allow amortizing the cost of submitting IO over multiple IOs. This
     409             :      * requires that we don't do any operations that could lead to a deadlock
     410             :      * with staged-but-unsubmitted IO. The callback needs to opt-in to being
     411             :      * careful.
     412             :      */
     413     5801432 :     if (stream->batch_mode)
     414     4553450 :         pgaio_enter_batchmode();
     415             : 
     416     9635544 :     while (stream->ios_in_progress < stream->max_ios &&
     417     9635506 :            stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
     418             :     {
     419             :         BlockNumber blocknum;
     420             :         int16       buffer_index;
     421             :         void       *per_buffer_data;
     422             : 
     423     5628006 :         if (stream->pending_read_nblocks == stream->io_combine_limit)
     424             :         {
     425        8008 :             read_stream_start_pending_read(stream);
     426        8008 :             continue;
     427             :         }
     428             : 
     429             :         /*
     430             :          * See which block the callback wants next in the stream.  We need to
     431             :          * compute the index of the Nth block of the pending read including
     432             :          * wrap-around, but we don't want to use the expensive % operator.
     433             :          */
     434     5619998 :         buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
     435     5619998 :         if (buffer_index >= stream->queue_size)
     436       28000 :             buffer_index -= stream->queue_size;
     437             :         Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     438     5619998 :         per_buffer_data = get_per_buffer_data(stream, buffer_index);
     439     5619998 :         blocknum = read_stream_get_block(stream, per_buffer_data);
     440     5619998 :         if (blocknum == InvalidBlockNumber)
     441             :         {
     442             :             /* End of stream. */
     443     1793886 :             stream->distance = 0;
     444     1793886 :             break;
     445             :         }
     446             : 
     447             :         /* Can we merge it with the pending read? */
     448     3826112 :         if (stream->pending_read_nblocks > 0 &&
     449      422962 :             stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
     450             :         {
     451      422902 :             stream->pending_read_nblocks++;
     452      422902 :             continue;
     453             :         }
     454             : 
     455             :         /* We have to start the pending read before we can build another. */
     456     3403282 :         while (stream->pending_read_nblocks > 0)
     457             :         {
     458          80 :             if (!read_stream_start_pending_read(stream) ||
     459          72 :                 stream->ios_in_progress == stream->max_ios)
     460             :             {
     461             :                 /* We've hit the buffer or I/O limit.  Rewind and stop here. */
     462           8 :                 read_stream_unget_block(stream, blocknum);
     463           8 :                 if (stream->batch_mode)
     464           0 :                     pgaio_exit_batchmode();
     465           8 :                 return;
     466             :             }
     467             :         }
     468             : 
     469             :         /* This is the start of a new pending read. */
     470     3403202 :         stream->pending_read_blocknum = blocknum;
     471     3403202 :         stream->pending_read_nblocks = 1;
     472             :     }
     473             : 
     474             :     /*
     475             :      * We don't start the pending read just because we've hit the distance
     476             :      * limit, preferring to give it another chance to grow to full
     477             :      * io_combine_limit size once more buffers have been consumed.  However,
     478             :      * if we've already reached io_combine_limit, or we've reached the
     479             :      * distance limit and there isn't anything pinned yet, or the callback has
     480             :      * signaled end-of-stream, we start the read immediately.  Note that the
     481             :      * pending read can exceed the distance goal, if the latter was reduced
     482             :      * after hitting the per-backend buffer limit.
     483             :      */
     484     5801424 :     if (stream->pending_read_nblocks > 0 &&
     485     3597584 :         (stream->pending_read_nblocks == stream->io_combine_limit ||
     486     3580736 :          (stream->pending_read_nblocks >= stream->distance &&
     487     3481780 :           stream->pinned_buffers == 0) ||
     488      111880 :          stream->distance == 0) &&
     489     3498628 :         stream->ios_in_progress < stream->max_ios)
     490     3498628 :         read_stream_start_pending_read(stream);
     491             : 
     492             :     /*
     493             :      * There should always be something pinned when we leave this function,
     494             :      * whether started by this call or not, unless we've hit the end of the
     495             :      * stream.  In the worst case we can always make progress one buffer at a
     496             :      * time.
     497             :      */
     498             :     Assert(stream->pinned_buffers > 0 || stream->distance == 0);
     499             : 
     500     5801412 :     if (stream->batch_mode)
     501     4553438 :         pgaio_exit_batchmode();
     502             : }
     503             : 
     504             : /*
     505             :  * Create a new read stream object that can be used to perform the equivalent
     506             :  * of a series of ReadBuffer() calls for one fork of one relation.
     507             :  * Internally, it generates larger vectored reads where possible by looking
     508             :  * ahead.  The callback should return block numbers or InvalidBlockNumber to
     509             :  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
     510             :  * write extra data for each block into the space provided to it.  It will
     511             :  * also receive callback_private_data for its own purposes.
     512             :  */
     513             : static ReadStream *
     514      963792 : read_stream_begin_impl(int flags,
     515             :                        BufferAccessStrategy strategy,
     516             :                        Relation rel,
     517             :                        SMgrRelation smgr,
     518             :                        char persistence,
     519             :                        ForkNumber forknum,
     520             :                        ReadStreamBlockNumberCB callback,
     521             :                        void *callback_private_data,
     522             :                        size_t per_buffer_data_size)
     523             : {
     524             :     ReadStream *stream;
     525             :     size_t      size;
     526             :     int16       queue_size;
     527             :     int16       queue_overflow;
     528             :     int         max_ios;
     529             :     int         strategy_pin_limit;
     530             :     uint32      max_pinned_buffers;
     531             :     uint32      max_possible_buffer_limit;
     532             :     Oid         tablespace_id;
     533             : 
     534             :     /*
     535             :      * Decide how many I/Os we will allow to run at the same time.  That
     536             :      * currently means advice to the kernel to tell it that we will soon read.
     537             :      * This number also affects how far we look ahead for opportunities to
     538             :      * start more I/Os.
     539             :      */
     540      963792 :     tablespace_id = smgr->smgr_rlocator.locator.spcOid;
     541      963792 :     if (!OidIsValid(MyDatabaseId) ||
     542     1117570 :         (rel && IsCatalogRelation(rel)) ||
     543      284134 :         IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
     544             :     {
     545             :         /*
     546             :          * Avoid circularity while trying to look up tablespace settings or
     547             :          * before spccache.c is ready.
     548             :          */
     549      785672 :         max_ios = effective_io_concurrency;
     550             :     }
     551      178120 :     else if (flags & READ_STREAM_MAINTENANCE)
     552       19730 :         max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
     553             :     else
     554      158390 :         max_ios = get_tablespace_io_concurrency(tablespace_id);
     555             : 
     556             :     /* Cap to INT16_MAX to avoid overflowing below */
     557      963792 :     max_ios = Min(max_ios, PG_INT16_MAX);
     558             : 
     559             :     /*
     560             :      * If starting a multi-block I/O near the end of the queue, we might
     561             :      * temporarily need extra space for overflowing buffers before they are
     562             :      * moved to regular circular position.  This is the maximum extra space we
     563             :      * could need.
     564             :      */
     565      963792 :     queue_overflow = io_combine_limit - 1;
     566             : 
     567             :     /*
     568             :      * Choose the maximum number of buffers we're prepared to pin.  We try to
     569             :      * pin fewer if we can, though.  We add one so that we can make progress
     570             :      * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
     571             :      * this also allows an extra full I/O's worth of buffers: after an I/O
     572             :      * finishes we don't want to have to wait for its buffers to be consumed
     573             :      * before starting a new one.
     574             :      *
     575             :      * Be careful not to allow int16 to overflow.  That is possible with the
     576             :      * current GUC range limits, so this is an artificial limit of ~32k
     577             :      * buffers and we'd need to adjust the types to exceed that.  We also have
     578             :      * to allow for the spare entry and the overflow space.
     579             :      */
     580      963792 :     max_pinned_buffers = (max_ios + 1) * io_combine_limit;
     581      963792 :     max_pinned_buffers = Min(max_pinned_buffers,
     582             :                              PG_INT16_MAX - queue_overflow - 1);
     583             : 
     584             :     /* Give the strategy a chance to limit the number of buffers we pin. */
     585      963792 :     strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
     586      963792 :     max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
     587             : 
     588             :     /*
     589             :      * Also limit our queue to the maximum number of pins we could ever be
     590             :      * allowed to acquire according to the buffer manager.  We may not really
     591             :      * be able to use them all due to other pins held by this backend, but
     592             :      * we'll check that later in read_stream_start_pending_read().
     593             :      */
     594      963792 :     if (SmgrIsTemp(smgr))
     595       13368 :         max_possible_buffer_limit = GetLocalPinLimit();
     596             :     else
     597      950424 :         max_possible_buffer_limit = GetPinLimit();
     598      963792 :     max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
     599             : 
     600             :     /*
     601             :      * The limit might be zero on a system configured with too few buffers for
     602             :      * the number of connections.  We need at least one to make progress.
     603             :      */
     604      963792 :     max_pinned_buffers = Max(1, max_pinned_buffers);
     605             : 
     606             :     /*
     607             :      * We need one extra entry for buffers and per-buffer data, because users
     608             :      * of per-buffer data have access to the object until the next call to
     609             :      * read_stream_next_buffer(), so we need a gap between the head and tail
     610             :      * of the queue so that we don't clobber it.
     611             :      */
     612      963792 :     queue_size = max_pinned_buffers + 1;
     613             : 
     614             :     /*
     615             :      * Allocate the object, the buffers, the ios and per_buffer_data space in
     616             :      * one big chunk.  Though we have queue_size buffers, we want to be able
     617             :      * to assume that all the buffers for a single read are contiguous (i.e.
     618             :      * don't wrap around halfway through), so we allow temporary overflows of
     619             :      * up to the maximum possible overflow size.
     620             :      */
     621      963792 :     size = offsetof(ReadStream, buffers);
     622      963792 :     size += sizeof(Buffer) * (queue_size + queue_overflow);
     623      963792 :     size += sizeof(InProgressIO) * Max(1, max_ios);
     624      963792 :     size += per_buffer_data_size * queue_size;
     625      963792 :     size += MAXIMUM_ALIGNOF * 2;
     626      963792 :     stream = (ReadStream *) palloc(size);
     627      963792 :     memset(stream, 0, offsetof(ReadStream, buffers));
     628      963792 :     stream->ios = (InProgressIO *)
     629      963792 :         MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
     630      963792 :     if (per_buffer_data_size > 0)
     631      140272 :         stream->per_buffer_data = (void *)
     632      140272 :             MAXALIGN(&stream->ios[Max(1, max_ios)]);
     633             : 
     634      963792 :     stream->sync_mode = io_method == IOMETHOD_SYNC;
     635      963792 :     stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
     636             : 
     637             : #ifdef USE_PREFETCH
     638             : 
     639             :     /*
     640             :      * Read-ahead advice simulating asynchronous I/O with synchronous calls.
     641             :      * Issue advice only if AIO is not used, direct I/O isn't enabled, the
     642             :      * caller hasn't promised sequential access (overriding our detection
     643             :      * heuristics), and max_ios hasn't been set to zero.
     644             :      */
     645      963792 :     if (stream->sync_mode &&
     646           0 :         (io_direct_flags & IO_DIRECT_DATA) == 0 &&
     647           0 :         (flags & READ_STREAM_SEQUENTIAL) == 0 &&
     648             :         max_ios > 0)
     649           0 :         stream->advice_enabled = true;
     650             : #endif
     651             : 
     652             :     /*
     653             :      * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
     654             :      * we still need to allocate space to combine and run one I/O.  Bump it up
     655             :      * to one, and remember to ask for synchronous I/O only.
     656             :      */
     657      963792 :     if (max_ios == 0)
     658             :     {
     659           0 :         max_ios = 1;
     660           0 :         stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
     661             :     }
     662             : 
     663             :     /*
     664             :      * Capture stable values for these two GUC-derived numbers for the
     665             :      * lifetime of this stream, so we don't have to worry about the GUCs
     666             :      * changing underneath us beyond this point.
     667             :      */
     668      963792 :     stream->max_ios = max_ios;
     669      963792 :     stream->io_combine_limit = io_combine_limit;
     670             : 
     671      963792 :     stream->per_buffer_data_size = per_buffer_data_size;
     672      963792 :     stream->max_pinned_buffers = max_pinned_buffers;
     673      963792 :     stream->queue_size = queue_size;
     674      963792 :     stream->callback = callback;
     675      963792 :     stream->callback_private_data = callback_private_data;
     676      963792 :     stream->buffered_blocknum = InvalidBlockNumber;
     677      963792 :     stream->seq_blocknum = InvalidBlockNumber;
     678      963792 :     stream->seq_until_processed = InvalidBlockNumber;
     679      963792 :     stream->temporary = SmgrIsTemp(smgr);
     680             : 
     681             :     /*
     682             :      * Skip the initial ramp-up phase if the caller says we're going to be
     683             :      * reading the whole relation.  This way we start out assuming we'll be
     684             :      * doing full io_combine_limit sized reads.
     685             :      */
     686      963792 :     if (flags & READ_STREAM_FULL)
     687      124924 :         stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
     688             :     else
     689      838868 :         stream->distance = 1;
     690             : 
     691             :     /*
     692             :      * Since we always access the same relation, we can initialize parts of
     693             :      * the ReadBuffersOperation objects and leave them that way, to avoid
     694             :      * wasting CPU cycles writing to them for each read.
     695             :      */
     696    16427372 :     for (int i = 0; i < max_ios; ++i)
     697             :     {
     698    15463580 :         stream->ios[i].op.rel = rel;
     699    15463580 :         stream->ios[i].op.smgr = smgr;
     700    15463580 :         stream->ios[i].op.persistence = persistence;
     701    15463580 :         stream->ios[i].op.forknum = forknum;
     702    15463580 :         stream->ios[i].op.strategy = strategy;
     703             :     }
     704             : 
     705      963792 :     return stream;
     706             : }
     707             : 
     708             : /*
     709             :  * Create a new read stream for reading a relation.
     710             :  * See read_stream_begin_impl() for the detailed explanation.
     711             :  */
     712             : ReadStream *
     713      850102 : read_stream_begin_relation(int flags,
     714             :                            BufferAccessStrategy strategy,
     715             :                            Relation rel,
     716             :                            ForkNumber forknum,
     717             :                            ReadStreamBlockNumberCB callback,
     718             :                            void *callback_private_data,
     719             :                            size_t per_buffer_data_size)
     720             : {
     721      850102 :     return read_stream_begin_impl(flags,
     722             :                                   strategy,
     723             :                                   rel,
     724             :                                   RelationGetSmgr(rel),
     725      850102 :                                   rel->rd_rel->relpersistence,
     726             :                                   forknum,
     727             :                                   callback,
     728             :                                   callback_private_data,
     729             :                                   per_buffer_data_size);
     730             : }
     731             : 
     732             : /*
     733             :  * Create a new read stream for reading a SMgr relation.
     734             :  * See read_stream_begin_impl() for the detailed explanation.
     735             :  */
     736             : ReadStream *
     737      113690 : read_stream_begin_smgr_relation(int flags,
     738             :                                 BufferAccessStrategy strategy,
     739             :                                 SMgrRelation smgr,
     740             :                                 char smgr_persistence,
     741             :                                 ForkNumber forknum,
     742             :                                 ReadStreamBlockNumberCB callback,
     743             :                                 void *callback_private_data,
     744             :                                 size_t per_buffer_data_size)
     745             : {
     746      113690 :     return read_stream_begin_impl(flags,
     747             :                                   strategy,
     748             :                                   NULL,
     749             :                                   smgr,
     750             :                                   smgr_persistence,
     751             :                                   forknum,
     752             :                                   callback,
     753             :                                   callback_private_data,
     754             :                                   per_buffer_data_size);
     755             : }
     756             : 
     757             : /*
     758             :  * Pull one pinned buffer out of a stream.  Each call returns successive
     759             :  * blocks in the order specified by the callback.  If per_buffer_data_size was
     760             :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
     761             :  * per-buffer data that the callback had a chance to populate, which remains
     762             :  * valid until the next call to read_stream_next_buffer().  When the stream
     763             :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
     764             :  * the stream early at any time by calling read_stream_end().
     765             :  */
     766             : Buffer
     767    11217350 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
     768             : {
     769             :     Buffer      buffer;
     770             :     int16       oldest_buffer_index;
     771             : 
     772             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     773             : 
     774             :     /*
     775             :      * A fast path for all-cached scans.  This is the same as the usual
     776             :      * algorithm, but it is specialized for no I/O and no per-buffer data, so
     777             :      * we can skip the queue management code, stay in the same buffer slot and
     778             :      * use singular StartReadBuffer().
     779             :      */
     780    11217350 :     if (likely(stream->fast_path))
     781             :     {
     782             :         BlockNumber next_blocknum;
     783             : 
     784             :         /* Fast path assumptions. */
     785             :         Assert(stream->ios_in_progress == 0);
     786             :         Assert(stream->forwarded_buffers == 0);
     787             :         Assert(stream->pinned_buffers == 1);
     788             :         Assert(stream->distance == 1);
     789             :         Assert(stream->pending_read_nblocks == 0);
     790             :         Assert(stream->per_buffer_data_size == 0);
     791             :         Assert(stream->initialized_buffers > stream->oldest_buffer_index);
     792             : 
     793             :         /* We're going to return the buffer we pinned last time. */
     794     3707578 :         oldest_buffer_index = stream->oldest_buffer_index;
     795             :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
     796             :                stream->next_buffer_index);
     797     3707578 :         buffer = stream->buffers[oldest_buffer_index];
     798             :         Assert(buffer != InvalidBuffer);
     799             : 
     800             :         /* Choose the next block to pin. */
     801     3707578 :         next_blocknum = read_stream_get_block(stream, NULL);
     802             : 
     803     3707578 :         if (likely(next_blocknum != InvalidBlockNumber))
     804             :         {
     805     3550688 :             int         flags = stream->read_buffers_flags;
     806             : 
     807     3550688 :             if (stream->advice_enabled)
     808           0 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     809             : 
     810             :             /*
     811             :              * Pin a buffer for the next call.  Same buffer entry, and
     812             :              * arbitrary I/O entry (they're all free).  We don't have to
     813             :              * adjust pinned_buffers because we're transferring one to caller
     814             :              * but pinning one more.
     815             :              *
     816             :              * In the fast path we don't need to check the pin limit.  We're
     817             :              * always allowed at least one pin so that progress can be made,
     818             :              * and that's all we need here.  Although two pins are momentarily
     819             :              * held at the same time, the model used here is that the stream
     820             :              * holds only one, and the other now belongs to the caller.
     821             :              */
     822     3550688 :             if (likely(!StartReadBuffer(&stream->ios[0].op,
     823             :                                         &stream->buffers[oldest_buffer_index],
     824             :                                         next_blocknum,
     825             :                                         flags)))
     826             :             {
     827             :                 /* Fast return. */
     828     3527232 :                 return buffer;
     829             :             }
     830             : 
     831             :             /* Next call must wait for I/O for the newly pinned buffer. */
     832       23456 :             stream->oldest_io_index = 0;
     833       23456 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
     834       23456 :             stream->ios_in_progress = 1;
     835       23456 :             stream->ios[0].buffer_index = oldest_buffer_index;
     836       23456 :             stream->seq_blocknum = next_blocknum + 1;
     837             :         }
     838             :         else
     839             :         {
     840             :             /* No more blocks, end of stream. */
     841      156890 :             stream->distance = 0;
     842      156890 :             stream->oldest_buffer_index = stream->next_buffer_index;
     843      156890 :             stream->pinned_buffers = 0;
     844      156890 :             stream->buffers[oldest_buffer_index] = InvalidBuffer;
     845             :         }
     846             : 
     847      180346 :         stream->fast_path = false;
     848      180346 :         return buffer;
     849             :     }
     850             : #endif
     851             : 
     852     7509772 :     if (unlikely(stream->pinned_buffers == 0))
     853             :     {
     854             :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
     855             : 
     856             :         /* End of stream reached?  */
     857     4991748 :         if (stream->distance == 0)
     858     2857752 :             return InvalidBuffer;
     859             : 
     860             :         /*
     861             :          * The usual order of operations is that we look ahead at the bottom
     862             :          * of this function after potentially finishing an I/O and making
     863             :          * space for more, but if we're just starting up we'll need to crank
     864             :          * the handle to get started.
     865             :          */
     866     2133996 :         read_stream_look_ahead(stream);
     867             : 
     868             :         /* End of stream reached? */
     869     2133996 :         if (stream->pinned_buffers == 0)
     870             :         {
     871             :             Assert(stream->distance == 0);
     872      984584 :             return InvalidBuffer;
     873             :         }
     874             :     }
     875             : 
     876             :     /* Grab the oldest pinned buffer and associated per-buffer data. */
     877             :     Assert(stream->pinned_buffers > 0);
     878     3667436 :     oldest_buffer_index = stream->oldest_buffer_index;
     879             :     Assert(oldest_buffer_index >= 0 &&
     880             :            oldest_buffer_index < stream->queue_size);
     881     3667436 :     buffer = stream->buffers[oldest_buffer_index];
     882     3667436 :     if (per_buffer_data)
     883     1126212 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
     884             : 
     885             :     Assert(BufferIsValid(buffer));
     886             : 
     887             :     /* Do we have to wait for an associated I/O first? */
     888     3667436 :     if (stream->ios_in_progress > 0 &&
     889     1360714 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
     890             :     {
     891     1189570 :         int16       io_index = stream->oldest_io_index;
     892             :         int32       distance;   /* wider temporary value, clamped below */
     893             : 
     894             :         /* Sanity check that we still agree on the buffers. */
     895             :         Assert(stream->ios[io_index].op.buffers ==
     896             :                &stream->buffers[oldest_buffer_index]);
     897             : 
     898     1189570 :         WaitReadBuffers(&stream->ios[io_index].op);
     899             : 
     900             :         Assert(stream->ios_in_progress > 0);
     901     1189570 :         stream->ios_in_progress--;
     902     1189570 :         if (++stream->oldest_io_index == stream->max_ios)
     903       51306 :             stream->oldest_io_index = 0;
     904             : 
     905             :         /* Look-ahead distance ramps up rapidly after we do I/O. */
     906     1189570 :         distance = stream->distance * 2;
     907     1189570 :         distance = Min(distance, stream->max_pinned_buffers);
     908     1189570 :         stream->distance = distance;
     909             : 
     910             :         /*
     911             :          * If we've reached the first block of a sequential region we're
     912             :          * issuing advice for, cancel that until the next jump.  The kernel
     913             :          * will see the sequential preadv() pattern starting here.
     914             :          */
     915     1189570 :         if (stream->advice_enabled &&
     916           0 :             stream->ios[io_index].op.blocknum == stream->seq_until_processed)
     917           0 :             stream->seq_until_processed = InvalidBlockNumber;
     918             :     }
     919             : 
     920             :     /*
     921             :      * We must zap this queue entry, or else it would appear as a forwarded
     922             :      * buffer.  If it's potentially in the overflow zone (ie from a
     923             :      * multi-block I/O that wrapped around the queue), also zap the copy.
     924             :      */
     925     3667436 :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
     926     3667436 :     if (oldest_buffer_index < stream->io_combine_limit - 1)
     927     2642170 :         stream->buffers[stream->queue_size + oldest_buffer_index] =
     928             :             InvalidBuffer;
     929             : 
     930             : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
     931             : 
     932             :     /*
     933             :      * The caller will get access to the per-buffer data, until the next call.
     934             :      * We wipe the one before, which is never occupied because queue_size
     935             :      * allowed one extra element.  This will hopefully trip up client code
     936             :      * that is holding a dangling pointer to it.
     937             :      */
     938             :     if (stream->per_buffer_data)
     939             :     {
     940             :         void       *per_buffer_data;
     941             : 
     942             :         per_buffer_data = get_per_buffer_data(stream,
     943             :                                               oldest_buffer_index == 0 ?
     944             :                                               stream->queue_size - 1 :
     945             :                                               oldest_buffer_index - 1);
     946             : 
     947             : #if defined(CLOBBER_FREED_MEMORY)
     948             :         /* This also tells Valgrind the memory is "noaccess". */
     949             :         wipe_mem(per_buffer_data, stream->per_buffer_data_size);
     950             : #elif defined(USE_VALGRIND)
     951             :         /* Tell it ourselves. */
     952             :         VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
     953             :                                    stream->per_buffer_data_size);
     954             : #endif
     955             :     }
     956             : #endif
     957             : 
     958             :     /* Pin transferred to caller. */
     959             :     Assert(stream->pinned_buffers > 0);
     960     3667436 :     stream->pinned_buffers--;
     961             : 
     962             :     /* Advance oldest buffer, with wrap-around. */
     963     3667436 :     stream->oldest_buffer_index++;
     964     3667436 :     if (stream->oldest_buffer_index == stream->queue_size)
     965      583670 :         stream->oldest_buffer_index = 0;
     966             : 
     967             :     /* Prepare for the next call. */
     968     3667436 :     read_stream_look_ahead(stream);
     969             : 
     970             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     971             :     /* See if we can take the fast path for all-cached scans next time. */
     972     3667424 :     if (stream->ios_in_progress == 0 &&
     973     2464858 :         stream->forwarded_buffers == 0 &&
     974     2464386 :         stream->pinned_buffers == 1 &&
     975     1357960 :         stream->distance == 1 &&
     976     1231506 :         stream->pending_read_nblocks == 0 &&
     977     1229242 :         stream->per_buffer_data_size == 0)
     978             :     {
     979      353480 :         stream->fast_path = true;
     980             :     }
     981             : #endif
     982             : 
     983     3667424 :     return buffer;
     984             : }
     985             : 
     986             : /*
     987             :  * Transitional support for code that would like to perform or skip reads
     988             :  * itself, without using the stream.  Returns, and consumes, the next block
     989             :  * number that would be read by the stream's look-ahead algorithm, or
     990             :  * InvalidBlockNumber if the end of the stream is reached.  Also reports the
     991             :  * strategy that would be used to read it.
     992             :  */
     993             : BlockNumber
     994           0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
     995             : {
     996           0 :     *strategy = stream->ios[0].op.strategy;
     997           0 :     return read_stream_get_block(stream, NULL);
     998             : }
     999             : 
    1000             : /*
    1001             :  * Reset a read stream by releasing any queued up buffers, allowing the stream
    1002             :  * to be used again for different blocks.  This can be used to clear an
    1003             :  * end-of-stream condition and start again, or to throw away blocks that were
    1004             :  * speculatively read and read some different blocks instead.
    1005             :  */
    1006             : void
    1007     2135264 : read_stream_reset(ReadStream *stream)
    1008             : {
    1009             :     int16       index;
    1010             :     Buffer      buffer;
    1011             : 
    1012             :     /* Stop looking ahead. */
    1013     2135264 :     stream->distance = 0;
    1014             : 
    1015             :     /* Forget buffered block number and fast path state. */
    1016     2135264 :     stream->buffered_blocknum = InvalidBlockNumber;
    1017     2135264 :     stream->fast_path = false;
    1018             : 
    1019             :     /* Unpin anything that wasn't consumed. */
    1020     2358658 :     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
    1021      223394 :         ReleaseBuffer(buffer);
    1022             : 
    1023             :     /* Unpin any unused forwarded buffers. */
    1024     2135264 :     index = stream->next_buffer_index;
    1025     2135264 :     while (index < stream->initialized_buffers &&
    1026      435004 :            (buffer = stream->buffers[index]) != InvalidBuffer)
    1027             :     {
    1028             :         Assert(stream->forwarded_buffers > 0);
    1029           0 :         stream->forwarded_buffers--;
    1030           0 :         ReleaseBuffer(buffer);
    1031             : 
    1032           0 :         stream->buffers[index] = InvalidBuffer;
    1033           0 :         if (index < stream->io_combine_limit - 1)
    1034           0 :             stream->buffers[stream->queue_size + index] = InvalidBuffer;
    1035             : 
    1036           0 :         if (++index == stream->queue_size)
    1037           0 :             index = 0;
    1038             :     }
    1039             : 
    1040             :     Assert(stream->forwarded_buffers == 0);
    1041             :     Assert(stream->pinned_buffers == 0);
    1042             :     Assert(stream->ios_in_progress == 0);
    1043             : 
    1044             :     /* Start off assuming data is cached. */
    1045     2135264 :     stream->distance = 1;
    1046     2135264 : }
    1047             : 
    1048             : /*
    1049             :  * Release and free a read stream.
    1050             :  */
    1051             : void
    1052      959336 : read_stream_end(ReadStream *stream)
    1053             : {
    1054      959336 :     read_stream_reset(stream);
    1055      959336 :     pfree(stream);
    1056      959336 : }

Generated by: LCOV version 1.14