LCOV - code coverage report
Current view: top level - src/backend/storage/aio - read_stream.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 93.8 % 276 259
Test Date: 2026-03-14 12:15:02 Functions: 80.0 % 15 12
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * read_stream.c
       4              :  *    Mechanism for accessing buffered relation data with look-ahead
       5              :  *
       6              :  * Code that needs to access relation data typically pins blocks one at a
       7              :  * time, often in a predictable order that might be sequential or data-driven.
       8              :  * Calling the simple ReadBuffer() function for each block is inefficient,
       9              :  * because blocks that are not yet in the buffer pool require I/O operations
      10              :  * that are small and might stall waiting for storage.  This mechanism looks
      11              :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
      12              :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
      13              :  * distance.
      14              :  *
      15              :  * A user-provided callback generates a stream of block numbers that is used
      16              :  * to form reads of up to io_combine_limit, by attempting to merge them with a
      17              :  * pending read.  When that isn't possible, the existing pending read is sent
      18              :  * to StartReadBuffers() so that a new one can begin to form.
      19              :  *
      20              :  * The algorithm for controlling the look-ahead distance is based on recent
      21              :  * cache hit and miss history.  When no I/O is necessary, there is no benefit
      22              :  * in looking ahead more than one block.  This is the default initial
      23              :  * assumption, but when blocks needing I/O are streamed, the distance is
      24              :  * increased rapidly to try to benefit from I/O combining and concurrency.  It
      25              :  * is reduced gradually when cached blocks are streamed.
      26              :  *
      27              :  * The main data structure is a circular queue of buffers of size
      28              :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
      29              :  * returned by read_stream_next_buffer().  Each buffer also has an optional
      30              :  * variable sized object that is passed from the callback to the consumer of
      31              :  * buffers.
      32              :  *
      33              :  * Parallel to the queue of buffers, there is a circular queue of in-progress
      34              :  * I/Os that have been started with StartReadBuffers(), and for which
      35              :  * WaitReadBuffers() must be called before returning the buffer.
      36              :  *
      37              :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
      38              :  * successive calls, then these data structures might appear as follows:
      39              :  *
      40              :  *                          buffers buf/data       ios
      41              :  *
      42              :  *                          +----+  +-----+       +--------+
      43              :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
      44              :  *                          +----+  +-----+  |    +--------+
      45              :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
      46              :  *                          +----+  +-----+  | |  +--------+
      47              :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
      48              :  *                          +----+  +-----+    |  +--------+
      49              :  *                          | 43 |  |  ?  |    |  |        |
      50              :  *                          +----+  +-----+    |  +--------+
      51              :  *                          | 44 |  |  ?  |    |  |        |
      52              :  *                          +----+  +-----+    |  +--------+
      53              :  *                          | 60 |  |  ?  |<---+
      54              :  *                          +----+  +-----+
      55              :  *     next_buffer_index -> |    |  |     |
      56              :  *                          +----+  +-----+
      57              :  *
      58              :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
      59              :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
      60              :  * the range 42..44 requires an I/O wait before its buffers are returned, as
      61              :  * does block 60.
      62              :  *
      63              :  *
      64              :  * Portions Copyright (c) 2024-2026, PostgreSQL Global Development Group
      65              :  * Portions Copyright (c) 1994, Regents of the University of California
      66              :  *
      67              :  * IDENTIFICATION
      68              :  *    src/backend/storage/aio/read_stream.c
      69              :  *
      70              :  *-------------------------------------------------------------------------
      71              :  */
      72              : #include "postgres.h"
      73              : 
      74              : #include "miscadmin.h"
      75              : #include "storage/aio.h"
      76              : #include "storage/fd.h"
      77              : #include "storage/smgr.h"
      78              : #include "storage/read_stream.h"
      79              : #include "utils/memdebug.h"
      80              : #include "utils/rel.h"
      81              : #include "utils/spccache.h"
      82              : 
/*
 * One read that has been started with StartReadBuffers() but not yet
 * completed with WaitReadBuffers().  These live in a circular queue
 * (ReadStream->ios) parallel to the buffer queue.
 */
typedef struct InProgressIO
{
    int16       buffer_index;   /* queue index of the read's first buffer */
    ReadBuffersOperation op;    /* handle to pass to WaitReadBuffers() */
} InProgressIO;
      88              : 
/*
 * State for managing a stream of reads.
 */
struct ReadStream
{
    int16       max_ios;        /* size of ios[]; cap on concurrent I/Os */
    int16       io_combine_limit;   /* max blocks in one combined read */
    int16       ios_in_progress;    /* I/Os started but not yet waited for */
    int16       queue_size;     /* wrap-around point of the buffer queue */
    int16       max_pinned_buffers; /* per-stream cap on pinned buffers */
    int16       forwarded_buffers;  /* pins forwarded by a split read, not
                                     * counted in pinned_buffers */
    int16       pinned_buffers; /* buffers currently pinned in the queue */
    int16       distance;       /* current look-ahead distance, in blocks */
    int16       initialized_buffers;    /* high-water mark of buffers[]
                                         * entries ever initialized */
    int16       resume_distance;    /* NOTE(review): not referenced in this
                                     * portion of the file */
    int         read_buffers_flags; /* base flags for StartReadBuffers() */
    bool        sync_mode;      /* using io_method=sync */
    bool        batch_mode;     /* READ_STREAM_USE_BATCHING */
    bool        advice_enabled; /* may set READ_BUFFERS_ISSUE_ADVICE */
    bool        temporary;      /* temp relation; use local pin limits */

    /*
     * One-block buffer to support 'ungetting' a block number, to resolve flow
     * control problems when I/Os are split.
     */
    BlockNumber buffered_blocknum;

    /*
     * The callback that will tell us which block numbers to read, and an
     * opaque pointer that will be passed to it for its own purposes.
     */
    ReadStreamBlockNumberCB callback;
    void       *callback_private_data;

    /* Next expected block, for detecting sequential access. */
    BlockNumber seq_blocknum;
    BlockNumber seq_until_processed;

    /* The read operation we are currently preparing. */
    BlockNumber pending_read_blocknum;
    int16       pending_read_nblocks;

    /* Space for buffers and optional per-buffer private data. */
    size_t      per_buffer_data_size;
    void       *per_buffer_data;

    /* Read operations that have been started but not waited for yet. */
    InProgressIO *ios;
    int16       oldest_io_index;
    int16       next_io_index;

    /* NOTE(review): fast_path is set/consumed elsewhere in this file. */
    bool        fast_path;

    /* Circular queue of buffers. */
    int16       oldest_buffer_index;    /* Next pinned buffer to return */
    int16       next_buffer_index;  /* Index of next buffer to pin */
    Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
};
     147              : 
     148              : /*
     149              :  * Return a pointer to the per-buffer data by index.
     150              :  */
     151              : static inline void *
     152      4614406 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
     153              : {
     154      9228812 :     return (char *) stream->per_buffer_data +
     155      4614406 :         stream->per_buffer_data_size * buffer_index;
     156              : }
     157              : 
     158              : /*
     159              :  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
     160              :  * blocks [current_blocknum, last_exclusive).
     161              :  */
     162              : BlockNumber
     163       382574 : block_range_read_stream_cb(ReadStream *stream,
     164              :                            void *callback_private_data,
     165              :                            void *per_buffer_data)
     166              : {
     167       382574 :     BlockRangeReadStreamPrivate *p = callback_private_data;
     168              : 
     169       382574 :     if (p->current_blocknum < p->last_exclusive)
     170       313326 :         return p->current_blocknum++;
     171              : 
     172        69248 :     return InvalidBlockNumber;
     173              : }
     174              : 
     175              : /*
     176              :  * Ask the callback which block it would like us to read next, with a one block
     177              :  * buffer in front to allow read_stream_unget_block() to work.
     178              :  */
     179              : static inline BlockNumber
     180      6160602 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
     181              : {
     182              :     BlockNumber blocknum;
     183              : 
     184      6160602 :     blocknum = stream->buffered_blocknum;
     185      6160602 :     if (blocknum != InvalidBlockNumber)
     186            4 :         stream->buffered_blocknum = InvalidBlockNumber;
     187              :     else
     188              :     {
     189              :         /*
     190              :          * Tell Valgrind that the per-buffer data is undefined.  That replaces
     191              :          * the "noaccess" state that was set when the consumer moved past this
     192              :          * entry last time around the queue, and should also catch callbacks
     193              :          * that fail to initialize data that the buffer consumer later
     194              :          * accesses.  On the first go around, it is undefined already.
     195              :          */
     196              :         VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
     197              :                                     stream->per_buffer_data_size);
     198      6160598 :         blocknum = stream->callback(stream,
     199              :                                     stream->callback_private_data,
     200              :                                     per_buffer_data);
     201              :     }
     202              : 
     203      6160602 :     return blocknum;
     204              : }
     205              : 
     206              : /*
     207              :  * In order to deal with buffer shortages and I/O limits after short reads, we
     208              :  * sometimes need to defer handling of a block we've already consumed from the
     209              :  * registered callback until later.
     210              :  */
     211              : static inline void
     212            4 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
     213              : {
     214              :     /* We shouldn't ever unget more than one block. */
     215              :     Assert(stream->buffered_blocknum == InvalidBlockNumber);
     216              :     Assert(blocknum != InvalidBlockNumber);
     217            4 :     stream->buffered_blocknum = blocknum;
     218            4 : }
     219              : 
/*
 * Start as much of the current pending read as we can.  If we have to split it
 * because of the per-backend buffer limit, or the buffer manager decides to
 * split it, then the pending read is adjusted to hold the remaining portion.
 *
 * We can always start a read of at least size one if we have no progress yet.
 * Otherwise it's possible that we can't start a read at all because of a lack
 * of buffers, and then false is returned.  Buffer shortages also reduce the
 * distance to a level that prevents look-ahead until buffers are released.
 *
 * Returns true if some prefix of the pending read was started (the pending
 * read is advanced past the started portion), false if nothing could be
 * started at all.
 */
static bool
read_stream_start_pending_read(ReadStream *stream)
{
    bool        need_wait;
    int         requested_nblocks;
    int         nblocks;
    int         flags;
    int         forwarded;
    int16       io_index;
    int16       overflow;
    int16       buffer_index;
    int         buffer_limit;

    /* This should only be called with a pending read. */
    Assert(stream->pending_read_nblocks > 0);
    Assert(stream->pending_read_nblocks <= stream->io_combine_limit);

    /* We had better not exceed the per-stream buffer limit with this read. */
    Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
           stream->max_pinned_buffers);

#ifdef USE_ASSERT_CHECKING
    /* We had better not be overwriting an existing pinned buffer. */
    if (stream->pinned_buffers > 0)
        Assert(stream->next_buffer_index != stream->oldest_buffer_index);
    else
        Assert(stream->next_buffer_index == stream->oldest_buffer_index);

    /*
     * Pinned buffers forwarded by a preceding StartReadBuffers() call that
     * had to split the operation should match the leading blocks of this
     * following StartReadBuffers() call.
     */
    Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
    for (int i = 0; i < stream->forwarded_buffers; ++i)
        Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
               stream->pending_read_blocknum + i);

    /*
     * Check that we've cleared the queue/overflow entries corresponding to
     * the rest of the blocks covered by this read, unless it's the first go
     * around and we haven't even initialized them yet.
     */
    for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
        Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
               stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
#endif

    /* Do we need to issue read-ahead advice? */
    flags = stream->read_buffers_flags;
    if (stream->advice_enabled)
    {
        if (stream->pending_read_blocknum == stream->seq_blocknum)
        {
            /*
             * Sequential:  Issue advice until the preadv() calls have caught
             * up with the first advice issued for this sequential region, and
             * then stay out of the way of the kernel's own read-ahead.
             */
            if (stream->seq_until_processed != InvalidBlockNumber)
                flags |= READ_BUFFERS_ISSUE_ADVICE;
        }
        else
        {
            /*
             * Random jump:  Note the starting location of a new potential
             * sequential region and start issuing advice.  Skip it this time
             * if the preadv() follows immediately, eg first block in stream.
             */
            stream->seq_until_processed = stream->pending_read_blocknum;
            if (stream->pinned_buffers > 0)
                flags |= READ_BUFFERS_ISSUE_ADVICE;
        }
    }

    /*
     * How many more buffers is this backend allowed?
     *
     * Forwarded buffers are already pinned and map to the leading blocks of
     * the pending read (the remaining portion of an earlier short read that
     * we're about to continue).  They are not counted in pinned_buffers, but
     * they are counted as pins already held by this backend according to the
     * buffer manager, so they must be added to the limit it grants us.
     */
    if (stream->temporary)
        buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
    else
        buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
    Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);

    buffer_limit += stream->forwarded_buffers;
    buffer_limit = Min(buffer_limit, PG_INT16_MAX);

    if (buffer_limit == 0 && stream->pinned_buffers == 0)
        buffer_limit = 1;       /* guarantee progress */

    /* Does the per-backend limit affect this read? */
    nblocks = stream->pending_read_nblocks;
    if (buffer_limit < nblocks)
    {
        int16       new_distance;

        /* Shrink distance: no more look-ahead until buffers are released. */
        new_distance = stream->pinned_buffers + buffer_limit;
        if (stream->distance > new_distance)
            stream->distance = new_distance;

        /* Unless we have nothing to give the consumer, stop here. */
        if (stream->pinned_buffers > 0)
            return false;

        /* A short read is required to make progress. */
        nblocks = buffer_limit;
    }

    /*
     * We say how many blocks we want to read, but it may be smaller on return
     * if the buffer manager decides to shorten the read.  Initialize buffers
     * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
     * and keep the original nblocks number so we can check for forwarded
     * buffers as output, below.
     */
    buffer_index = stream->next_buffer_index;
    io_index = stream->next_io_index;
    while (stream->initialized_buffers < buffer_index + nblocks)
        stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
    requested_nblocks = nblocks;
    need_wait = StartReadBuffers(&stream->ios[io_index].op,
                                 &stream->buffers[buffer_index],
                                 stream->pending_read_blocknum,
                                 &nblocks,
                                 flags);
    stream->pinned_buffers += nblocks;

    /* Remember whether we need to wait before returning this buffer. */
    if (!need_wait)
    {
        /* Look-ahead distance decays, no I/O necessary. */
        if (stream->distance > 1)
            stream->distance--;
    }
    else
    {
        /*
         * Remember to call WaitReadBuffers() before returning head buffer.
         * Look-ahead distance will be adjusted after waiting.
         */
        stream->ios[io_index].buffer_index = buffer_index;
        if (++stream->next_io_index == stream->max_ios)
            stream->next_io_index = 0;
        Assert(stream->ios_in_progress < stream->max_ios);
        stream->ios_in_progress++;
        /* Record where a strictly sequential stream would continue. */
        stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
    }

    /*
     * How many pins were acquired but forwarded to the next call?  These need
     * to be passed to the next StartReadBuffers() call by leaving them
     * exactly where they are in the queue, or released if the stream ends
     * early.  We need the number for accounting purposes, since they are not
     * counted in stream->pinned_buffers but we already hold them.
     */
    forwarded = 0;
    while (nblocks + forwarded < requested_nblocks &&
           stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
        forwarded++;
    stream->forwarded_buffers = forwarded;

    /*
     * We gave a contiguous range of buffer space to StartReadBuffers(), but
     * we want it to wrap around at queue_size.  Copy overflowing buffers to
     * the front of the array where they'll be consumed, but also leave a copy
     * in the overflow zone which the I/O operation has a pointer to (it needs
     * a contiguous array).  Both copies will be cleared when the buffers are
     * handed to the consumer.
     */
    overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
    if (overflow > 0)
    {
        Assert(overflow < stream->queue_size);    /* can't overlap */
        memcpy(&stream->buffers[0],
               &stream->buffers[stream->queue_size],
               sizeof(stream->buffers[0]) * overflow);
    }

    /* Compute location of start of next read, without using % operator. */
    buffer_index += nblocks;
    if (buffer_index >= stream->queue_size)
        buffer_index -= stream->queue_size;
    Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
    stream->next_buffer_index = buffer_index;

    /* Adjust the pending read to cover the remaining portion, if any. */
    stream->pending_read_blocknum += nblocks;
    stream->pending_read_nblocks -= nblocks;

    return true;
}
     428              : 
/*
 * Pull block numbers from the callback and build combined reads, starting
 * them until we reach the look-ahead distance, the in-progress I/O limit, or
 * end-of-stream.  Consecutive block numbers are merged into the pending read
 * up to io_combine_limit.
 */
static void
read_stream_look_ahead(ReadStream *stream)
{
    /*
     * Allow amortizing the cost of submitting IO over multiple IOs. This
     * requires that we don't do any operations that could lead to a deadlock
     * with staged-but-unsubmitted IO. The callback needs to opt-in to being
     * careful.
     */
    if (stream->batch_mode)
        pgaio_enter_batchmode();

    while (stream->ios_in_progress < stream->max_ios &&
           stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
    {
        BlockNumber blocknum;
        int16       buffer_index;
        void       *per_buffer_data;

        /* A full-sized pending read can't grow further; start it now. */
        if (stream->pending_read_nblocks == stream->io_combine_limit)
        {
            read_stream_start_pending_read(stream);
            continue;
        }

        /*
         * See which block the callback wants next in the stream.  We need to
         * compute the index of the Nth block of the pending read including
         * wrap-around, but we don't want to use the expensive % operator.
         */
        buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
        if (buffer_index >= stream->queue_size)
            buffer_index -= stream->queue_size;
        Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
        per_buffer_data = get_per_buffer_data(stream, buffer_index);
        blocknum = read_stream_get_block(stream, per_buffer_data);
        if (blocknum == InvalidBlockNumber)
        {
            /* End of stream. */
            stream->distance = 0;
            break;
        }

        /* Can we merge it with the pending read? */
        if (stream->pending_read_nblocks > 0 &&
            stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
        {
            stream->pending_read_nblocks++;
            continue;
        }

        /* We have to start the pending read before we can build another. */
        while (stream->pending_read_nblocks > 0)
        {
            if (!read_stream_start_pending_read(stream) ||
                stream->ios_in_progress == stream->max_ios)
            {
                /* We've hit the buffer or I/O limit.  Rewind and stop here. */
                read_stream_unget_block(stream, blocknum);
                if (stream->batch_mode)
                    pgaio_exit_batchmode();
                return;
            }
        }

        /* This is the start of a new pending read. */
        stream->pending_read_blocknum = blocknum;
        stream->pending_read_nblocks = 1;
    }

    /*
     * We don't start the pending read just because we've hit the distance
     * limit, preferring to give it another chance to grow to full
     * io_combine_limit size once more buffers have been consumed.  However,
     * if we've already reached io_combine_limit, or we've reached the
     * distance limit and there isn't anything pinned yet, or the callback has
     * signaled end-of-stream, we start the read immediately.  Note that the
     * pending read can exceed the distance goal, if the latter was reduced
     * after hitting the per-backend buffer limit.
     */
    if (stream->pending_read_nblocks > 0 &&
        (stream->pending_read_nblocks == stream->io_combine_limit ||
         (stream->pending_read_nblocks >= stream->distance &&
          stream->pinned_buffers == 0) ||
         stream->distance == 0) &&
        stream->ios_in_progress < stream->max_ios)
        read_stream_start_pending_read(stream);

    /*
     * There should always be something pinned when we leave this function,
     * whether started by this call or not, unless we've hit the end of the
     * stream.  In the worst case we can always make progress one buffer at a
     * time.
     */
    Assert(stream->pinned_buffers > 0 || stream->distance == 0);

    if (stream->batch_mode)
        pgaio_exit_batchmode();
}
     528              : 
     529              : /*
     530              :  * Create a new read stream object that can be used to perform the equivalent
     531              :  * of a series of ReadBuffer() calls for one fork of one relation.
     532              :  * Internally, it generates larger vectored reads where possible by looking
     533              :  * ahead.  The callback should return block numbers or InvalidBlockNumber to
     534              :  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
     535              :  * write extra data for each block into the space provided to it.  It will
     536              :  * also receive callback_private_data for its own purposes.
     537              :  */
     538              : static ReadStream *
     539       893088 : read_stream_begin_impl(int flags,
     540              :                        BufferAccessStrategy strategy,
     541              :                        Relation rel,
     542              :                        SMgrRelation smgr,
     543              :                        char persistence,
     544              :                        ForkNumber forknum,
     545              :                        ReadStreamBlockNumberCB callback,
     546              :                        void *callback_private_data,
     547              :                        size_t per_buffer_data_size)
     548              : {
     549              :     ReadStream *stream;
     550              :     size_t      size;
     551              :     int16       queue_size;
     552              :     int16       queue_overflow;
     553              :     int         max_ios;
     554              :     int         strategy_pin_limit;
     555              :     uint32      max_pinned_buffers;
     556              :     uint32      max_possible_buffer_limit;
     557              :     Oid         tablespace_id;
     558              : 
     559              :     /*
     560              :      * Decide how many I/Os we will allow to run at the same time.  That
     561              :      * currently means advice to the kernel to tell it that we will soon read.
     562              :      * This number also affects how far we look ahead for opportunities to
     563              :      * start more I/Os.
     564              :      */
     565       893088 :     tablespace_id = smgr->smgr_rlocator.locator.spcOid;
     566       893088 :     if (!OidIsValid(MyDatabaseId) ||
     567      1281343 :         (rel && IsCatalogRelation(rel)) ||
     568       462561 :         IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
     569              :     {
     570              :         /*
     571              :          * Avoid circularity while trying to look up tablespace settings or
     572              :          * before spccache.c is ready.
     573              :          */
     574       490204 :         max_ios = effective_io_concurrency;
     575              :     }
     576       402884 :     else if (flags & READ_STREAM_MAINTENANCE)
     577        15977 :         max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
     578              :     else
     579       386907 :         max_ios = get_tablespace_io_concurrency(tablespace_id);
     580              : 
     581              :     /* Cap to INT16_MAX to avoid overflowing below */
     582       893088 :     max_ios = Min(max_ios, PG_INT16_MAX);
     583              : 
     584              :     /*
     585              :      * If starting a multi-block I/O near the end of the queue, we might
     586              :      * temporarily need extra space for overflowing buffers before they are
     587              :      * moved to regular circular position.  This is the maximum extra space we
     588              :      * could need.
     589              :      */
     590       893088 :     queue_overflow = io_combine_limit - 1;
     591              : 
     592              :     /*
     593              :      * Choose the maximum number of buffers we're prepared to pin.  We try to
     594              :      * pin fewer if we can, though.  We add one so that we can make progress
     595              :      * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
     596              :      * this also allows an extra full I/O's worth of buffers: after an I/O
     597              :      * finishes we don't want to have to wait for its buffers to be consumed
     598              :      * before starting a new one.
     599              :      *
     600              :      * Be careful not to allow int16 to overflow.  That is possible with the
     601              :      * current GUC range limits, so this is an artificial limit of ~32k
     602              :      * buffers and we'd need to adjust the types to exceed that.  We also have
     603              :      * to allow for the spare entry and the overflow space.
     604              :      */
     605       893088 :     max_pinned_buffers = (max_ios + 1) * io_combine_limit;
     606       893088 :     max_pinned_buffers = Min(max_pinned_buffers,
     607              :                              PG_INT16_MAX - queue_overflow - 1);
     608              : 
     609              :     /* Give the strategy a chance to limit the number of buffers we pin. */
     610       893088 :     strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
     611       893088 :     max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
     612              : 
     613              :     /*
     614              :      * Also limit our queue to the maximum number of pins we could ever be
     615              :      * allowed to acquire according to the buffer manager.  We may not really
     616              :      * be able to use them all due to other pins held by this backend, but
     617              :      * we'll check that later in read_stream_start_pending_read().
     618              :      */
     619       893088 :     if (SmgrIsTemp(smgr))
     620         6965 :         max_possible_buffer_limit = GetLocalPinLimit();
     621              :     else
     622       886123 :         max_possible_buffer_limit = GetPinLimit();
     623       893088 :     max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
     624              : 
     625              :     /*
     626              :      * The limit might be zero on a system configured with too few buffers for
     627              :      * the number of connections.  We need at least one to make progress.
     628              :      */
     629       893088 :     max_pinned_buffers = Max(1, max_pinned_buffers);
     630              : 
     631              :     /*
     632              :      * We need one extra entry for buffers and per-buffer data, because users
     633              :      * of per-buffer data have access to the object until the next call to
     634              :      * read_stream_next_buffer(), so we need a gap between the head and tail
     635              :      * of the queue so that we don't clobber it.
     636              :      */
     637       893088 :     queue_size = max_pinned_buffers + 1;
     638              : 
     639              :     /*
     640              :      * Allocate the object, the buffers, the ios and per_buffer_data space in
     641              :      * one big chunk.  Though we have queue_size buffers, we want to be able
     642              :      * to assume that all the buffers for a single read are contiguous (i.e.
     643              :      * don't wrap around halfway through), so we allow temporary overflows of
     644              :      * up to the maximum possible overflow size.
     645              :      */
     646       893088 :     size = offsetof(ReadStream, buffers);
     647       893088 :     size += sizeof(Buffer) * (queue_size + queue_overflow);
     648       893088 :     size += sizeof(InProgressIO) * Max(1, max_ios);
     649       893088 :     size += per_buffer_data_size * queue_size;
     650       893088 :     size += MAXIMUM_ALIGNOF * 2;
     651       893088 :     stream = (ReadStream *) palloc(size);
     652       893088 :     memset(stream, 0, offsetof(ReadStream, buffers));
     653       893088 :     stream->ios = (InProgressIO *)
     654       893088 :         MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
     655       893088 :     if (per_buffer_data_size > 0)
     656       124696 :         stream->per_buffer_data = (void *)
     657       124696 :             MAXALIGN(&stream->ios[Max(1, max_ios)]);
     658              : 
     659       893088 :     stream->sync_mode = io_method == IOMETHOD_SYNC;
     660       893088 :     stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
     661              : 
     662              : #ifdef USE_PREFETCH
     663              : 
     664              :     /*
     665              :      * Read-ahead advice simulating asynchronous I/O with synchronous calls.
     666              :      * Issue advice only if AIO is not used, direct I/O isn't enabled, the
     667              :      * caller hasn't promised sequential access (overriding our detection
     668              :      * heuristics), and max_ios hasn't been set to zero.
     669              :      */
     670       893088 :     if (stream->sync_mode &&
     671         2886 :         (io_direct_flags & IO_DIRECT_DATA) == 0 &&
     672         2886 :         (flags & READ_STREAM_SEQUENTIAL) == 0 &&
     673              :         max_ios > 0)
     674          692 :         stream->advice_enabled = true;
     675              : #endif
     676              : 
     677              :     /*
     678              :      * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
     679              :      * we still need to allocate space to combine and run one I/O.  Bump it up
     680              :      * to one, and remember to ask for synchronous I/O only.
     681              :      */
     682       893088 :     if (max_ios == 0)
     683              :     {
     684            7 :         max_ios = 1;
     685            7 :         stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
     686              :     }
     687              : 
     688              :     /*
     689              :      * Capture stable values for these two GUC-derived numbers for the
     690              :      * lifetime of this stream, so we don't have to worry about the GUCs
     691              :      * changing underneath us beyond this point.
     692              :      */
     693       893088 :     stream->max_ios = max_ios;
     694       893088 :     stream->io_combine_limit = io_combine_limit;
     695              : 
     696       893088 :     stream->per_buffer_data_size = per_buffer_data_size;
     697       893088 :     stream->max_pinned_buffers = max_pinned_buffers;
     698       893088 :     stream->queue_size = queue_size;
     699       893088 :     stream->callback = callback;
     700       893088 :     stream->callback_private_data = callback_private_data;
     701       893088 :     stream->buffered_blocknum = InvalidBlockNumber;
     702       893088 :     stream->seq_blocknum = InvalidBlockNumber;
     703       893088 :     stream->seq_until_processed = InvalidBlockNumber;
     704       893088 :     stream->temporary = SmgrIsTemp(smgr);
     705              : 
     706              :     /*
     707              :      * Skip the initial ramp-up phase if the caller says we're going to be
     708              :      * reading the whole relation.  This way we start out assuming we'll be
     709              :      * doing full io_combine_limit sized reads.
     710              :      */
     711       893088 :     if (flags & READ_STREAM_FULL)
     712        70075 :         stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
     713              :     else
     714       823013 :         stream->distance = 1;
     715       893088 :     stream->resume_distance = stream->distance;
     716              : 
     717              :     /*
     718              :      * Since we always access the same relation, we can initialize parts of
     719              :      * the ReadBuffersOperation objects and leave them that way, to avoid
     720              :      * wasting CPU cycles writing to them for each read.
     721              :      */
     722     15203845 :     for (int i = 0; i < max_ios; ++i)
     723              :     {
     724     14310757 :         stream->ios[i].op.rel = rel;
     725     14310757 :         stream->ios[i].op.smgr = smgr;
     726     14310757 :         stream->ios[i].op.persistence = persistence;
     727     14310757 :         stream->ios[i].op.forknum = forknum;
     728     14310757 :         stream->ios[i].op.strategy = strategy;
     729              :     }
     730              : 
     731       893088 :     return stream;
     732              : }
     733              : 
     734              : /*
     735              :  * Create a new read stream for reading a relation.
     736              :  * See read_stream_begin_impl() for the detailed explanation.
     737              :  */
     738              : ReadStream *
     739       829099 : read_stream_begin_relation(int flags,
     740              :                            BufferAccessStrategy strategy,
     741              :                            Relation rel,
     742              :                            ForkNumber forknum,
     743              :                            ReadStreamBlockNumberCB callback,
     744              :                            void *callback_private_data,
     745              :                            size_t per_buffer_data_size)
     746              : {
     747       829099 :     return read_stream_begin_impl(flags,
     748              :                                   strategy,
     749              :                                   rel,
     750              :                                   RelationGetSmgr(rel),
     751       829099 :                                   rel->rd_rel->relpersistence,
     752              :                                   forknum,
     753              :                                   callback,
     754              :                                   callback_private_data,
     755              :                                   per_buffer_data_size);
     756              : }
     757              : 
     758              : /*
     759              :  * Create a new read stream for reading a SMgr relation.
     760              :  * See read_stream_begin_impl() for the detailed explanation.
     761              :  */
     762              : ReadStream *
     763        63989 : read_stream_begin_smgr_relation(int flags,
     764              :                                 BufferAccessStrategy strategy,
     765              :                                 SMgrRelation smgr,
     766              :                                 char smgr_persistence,
     767              :                                 ForkNumber forknum,
     768              :                                 ReadStreamBlockNumberCB callback,
     769              :                                 void *callback_private_data,
     770              :                                 size_t per_buffer_data_size)
     771              : {
     772        63989 :     return read_stream_begin_impl(flags,
     773              :                                   strategy,
     774              :                                   NULL,
     775              :                                   smgr,
     776              :                                   smgr_persistence,
     777              :                                   forknum,
     778              :                                   callback,
     779              :                                   callback_private_data,
     780              :                                   per_buffer_data_size);
     781              : }
     782              : 
     783              : /*
     784              :  * Pull one pinned buffer out of a stream.  Each call returns successive
     785              :  * blocks in the order specified by the callback.  If per_buffer_data_size was
     786              :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
     787              :  * per-buffer data that the callback had a chance to populate, which remains
     788              :  * valid until the next call to read_stream_next_buffer().  When the stream
     789              :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
     790              :  * the stream early at any time by calling read_stream_end().
     791              :  */
     792              : Buffer
     793      7470413 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
     794              : {
     795              :     Buffer      buffer;
     796              :     int16       oldest_buffer_index;
     797              : 
     798              : #ifndef READ_STREAM_DISABLE_FAST_PATH
     799              : 
     800              :     /*
     801              :      * A fast path for all-cached scans.  This is the same as the usual
     802              :      * algorithm, but it is specialized for no I/O and no per-buffer data, so
     803              :      * we can skip the queue management code, stay in the same buffer slot and
     804              :      * use singular StartReadBuffer().
     805              :      */
     806      7470413 :     if (likely(stream->fast_path))
     807              :     {
     808              :         BlockNumber next_blocknum;
     809              : 
     810              :         /* Fast path assumptions. */
     811              :         Assert(stream->ios_in_progress == 0);
     812              :         Assert(stream->forwarded_buffers == 0);
     813              :         Assert(stream->pinned_buffers == 1);
     814              :         Assert(stream->distance == 1);
     815              :         Assert(stream->pending_read_nblocks == 0);
     816              :         Assert(stream->per_buffer_data_size == 0);
     817              :         Assert(stream->initialized_buffers > stream->oldest_buffer_index);
     818              : 
     819              :         /* We're going to return the buffer we pinned last time. */
     820      2209048 :         oldest_buffer_index = stream->oldest_buffer_index;
     821              :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
     822              :                stream->next_buffer_index);
     823      2209048 :         buffer = stream->buffers[oldest_buffer_index];
     824              :         Assert(buffer != InvalidBuffer);
     825              : 
     826              :         /* Choose the next block to pin. */
     827      2209048 :         next_blocknum = read_stream_get_block(stream, NULL);
     828              : 
     829      2209048 :         if (likely(next_blocknum != InvalidBlockNumber))
     830              :         {
     831      2117474 :             int         flags = stream->read_buffers_flags;
     832              : 
     833      2117474 :             if (stream->advice_enabled)
     834          524 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     835              : 
     836              :             /*
     837              :              * Pin a buffer for the next call.  Same buffer entry, and
     838              :              * arbitrary I/O entry (they're all free).  We don't have to
     839              :              * adjust pinned_buffers because we're transferring one to caller
     840              :              * but pinning one more.
     841              :              *
     842              :              * In the fast path we don't need to check the pin limit.  We're
     843              :              * always allowed at least one pin so that progress can be made,
     844              :              * and that's all we need here.  Although two pins are momentarily
     845              :              * held at the same time, the model used here is that the stream
     846              :              * holds only one, and the other now belongs to the caller.
     847              :              */
     848      2117474 :             if (likely(!StartReadBuffer(&stream->ios[0].op,
     849              :                                         &stream->buffers[oldest_buffer_index],
     850              :                                         next_blocknum,
     851              :                                         flags)))
     852              :             {
     853              :                 /* Fast return. */
     854      2102734 :                 return buffer;
     855              :             }
     856              : 
     857              :             /* Next call must wait for I/O for the newly pinned buffer. */
     858        14740 :             stream->oldest_io_index = 0;
     859        14740 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
     860        14740 :             stream->ios_in_progress = 1;
     861        14740 :             stream->ios[0].buffer_index = oldest_buffer_index;
     862        14740 :             stream->seq_blocknum = next_blocknum + 1;
     863              :         }
     864              :         else
     865              :         {
     866              :             /* No more blocks, end of stream. */
     867        91574 :             stream->distance = 0;
     868        91574 :             stream->oldest_buffer_index = stream->next_buffer_index;
     869        91574 :             stream->pinned_buffers = 0;
     870        91574 :             stream->buffers[oldest_buffer_index] = InvalidBuffer;
     871              :         }
     872              : 
     873       106314 :         stream->fast_path = false;
     874       106314 :         return buffer;
     875              :     }
     876              : #endif
     877              : 
     878      5261365 :     if (unlikely(stream->pinned_buffers == 0))
     879              :     {
     880              :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
     881              : 
     882              :         /* End of stream reached?  */
     883      3885820 :         if (stream->distance == 0)
     884      2141077 :             return InvalidBuffer;
     885              : 
     886              :         /*
     887              :          * The usual order of operations is that we look ahead at the bottom
     888              :          * of this function after potentially finishing an I/O and making
     889              :          * space for more, but if we're just starting up we'll need to crank
     890              :          * the handle to get started.
     891              :          */
     892      1744743 :         read_stream_look_ahead(stream);
     893              : 
     894              :         /* End of stream reached? */
     895      1744743 :         if (stream->pinned_buffers == 0)
     896              :         {
     897              :             Assert(stream->distance == 0);
     898       811085 :             return InvalidBuffer;
     899              :         }
     900              :     }
     901              : 
     902              :     /* Grab the oldest pinned buffer and associated per-buffer data. */
     903              :     Assert(stream->pinned_buffers > 0);
     904      2309203 :     oldest_buffer_index = stream->oldest_buffer_index;
     905              :     Assert(oldest_buffer_index >= 0 &&
     906              :            oldest_buffer_index < stream->queue_size);
     907      2309203 :     buffer = stream->buffers[oldest_buffer_index];
     908      2309203 :     if (per_buffer_data)
     909       662852 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
     910              : 
     911              :     Assert(BufferIsValid(buffer));
     912              : 
     913              :     /* Do we have to wait for an associated I/O first? */
     914      2309203 :     if (stream->ios_in_progress > 0 &&
     915       717758 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
     916              :     {
     917       627194 :         int16       io_index = stream->oldest_io_index;
     918              :         int32       distance;   /* wider temporary value, clamped below */
     919              : 
     920              :         /* Sanity check that we still agree on the buffers. */
     921              :         Assert(stream->ios[io_index].op.buffers ==
     922              :                &stream->buffers[oldest_buffer_index]);
     923              : 
     924       627194 :         WaitReadBuffers(&stream->ios[io_index].op);
     925              : 
     926              :         Assert(stream->ios_in_progress > 0);
     927       627172 :         stream->ios_in_progress--;
     928       627172 :         if (++stream->oldest_io_index == stream->max_ios)
     929        25411 :             stream->oldest_io_index = 0;
     930              : 
     931              :         /* Look-ahead distance ramps up rapidly after we do I/O. */
     932       627172 :         distance = stream->distance * 2;
     933       627172 :         distance = Min(distance, stream->max_pinned_buffers);
     934       627172 :         stream->distance = distance;
     935              : 
     936              :         /*
     937              :          * If we've reached the first block of a sequential region we're
     938              :          * issuing advice for, cancel that until the next jump.  The kernel
     939              :          * will see the sequential preadv() pattern starting here.
     940              :          */
     941       627172 :         if (stream->advice_enabled &&
     942          273 :             stream->ios[io_index].op.blocknum == stream->seq_until_processed)
     943          252 :             stream->seq_until_processed = InvalidBlockNumber;
     944              :     }
     945              : 
     946              :     /*
     947              :      * We must zap this queue entry, or else it would appear as a forwarded
     948              :      * buffer.  If it's potentially in the overflow zone (ie from a
     949              :      * multi-block I/O that wrapped around the queue), also zap the copy.
     950              :      */
     951      2309181 :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
     952      2309181 :     if (oldest_buffer_index < stream->io_combine_limit - 1)
     953      1791329 :         stream->buffers[stream->queue_size + oldest_buffer_index] =
     954              :             InvalidBuffer;
     955              : 
     956              : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
     957              : 
     958              :     /*
     959              :      * The caller will get access to the per-buffer data, until the next call.
     960              :      * We wipe the one before, which is never occupied because queue_size
     961              :      * allowed one extra element.  This will hopefully trip up client code
     962              :      * that is holding a dangling pointer to it.
     963              :      */
     964              :     if (stream->per_buffer_data)
     965              :     {
     966              :         void       *per_buffer_data;
     967              : 
     968              :         per_buffer_data = get_per_buffer_data(stream,
     969              :                                               oldest_buffer_index == 0 ?
     970              :                                               stream->queue_size - 1 :
     971              :                                               oldest_buffer_index - 1);
     972              : 
     973              : #if defined(CLOBBER_FREED_MEMORY)
     974              :         /* This also tells Valgrind the memory is "noaccess". */
     975              :         wipe_mem(per_buffer_data, stream->per_buffer_data_size);
     976              : #elif defined(USE_VALGRIND)
     977              :         /* Tell it ourselves. */
     978              :         VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
     979              :                                    stream->per_buffer_data_size);
     980              : #endif
     981              :     }
     982              : #endif
     983              : 
     984              :     /* Pin transferred to caller. */
     985              :     Assert(stream->pinned_buffers > 0);
     986      2309181 :     stream->pinned_buffers--;
     987              : 
     988              :     /* Advance oldest buffer, with wrap-around. */
     989      2309181 :     stream->oldest_buffer_index++;
     990      2309181 :     if (stream->oldest_buffer_index == stream->queue_size)
     991       305057 :         stream->oldest_buffer_index = 0;
     992              : 
     993              :     /* Prepare for the next call. */
     994      2309181 :     read_stream_look_ahead(stream);
     995              : 
     996              : #ifndef READ_STREAM_DISABLE_FAST_PATH
     997              :     /* See if we can take the fast path for all-cached scans next time. */
     998      2309175 :     if (stream->ios_in_progress == 0 &&
     999      1682159 :         stream->forwarded_buffers == 0 &&
    1000      1681791 :         stream->pinned_buffers == 1 &&
    1001       780344 :         stream->distance == 1 &&
    1002       708012 :         stream->pending_read_nblocks == 0 &&
    1003       706585 :         stream->per_buffer_data_size == 0)
    1004              :     {
    1005              :         /*
    1006              :          * The fast path spins on one buffer entry repeatedly instead of
    1007              :          * rotating through the whole queue and clearing the entries behind
    1008              :          * it.  If the buffer it starts with happened to be forwarded between
    1009              :          * StartReadBuffers() calls and also wrapped around the circular queue
    1010              :          * partway through, then a copy also exists in the overflow zone, and
    1011              :          * it won't clear it out as the regular path would.  Do that now, so
    1012              :          * it doesn't need code for that.
    1013              :          */
    1014       203097 :         if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
    1015       201734 :             stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
    1016              :                 InvalidBuffer;
    1017              : 
    1018       203097 :         stream->fast_path = true;
    1019              :     }
    1020              : #endif
    1021              : 
    1022      2309175 :     return buffer;
    1023              : }
    1024              : 
    1025              : /*
    1026              :  * Transitional support for code that would like to perform or skip reads
    1027              :  * itself, without using the stream.  Returns, and consumes, the next block
    1028              :  * number that would be read by the stream's look-ahead algorithm, or
    1029              :  * InvalidBlockNumber if the end of the stream is reached.  Also reports the
    1030              :  * strategy that would be used to read it.
    1031              :  */
    1032              : BlockNumber
    1033            0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
    1034              : {
    1035            0 :     *strategy = stream->ios[0].op.strategy;
    1036            0 :     return read_stream_get_block(stream, NULL);
    1037              : }
    1038              : 
    1039              : /*
    1040              :  * Temporarily stop consuming block numbers from the block number callback.
    1041              :  * If called inside the block number callback, its return value should be
    1042              :  * returned by the callback.
    1043              :  */
    1044              : BlockNumber
    1045            0 : read_stream_pause(ReadStream *stream)
    1046              : {
    1047            0 :     stream->resume_distance = stream->distance;
    1048            0 :     stream->distance = 0;
    1049            0 :     return InvalidBlockNumber;
    1050              : }
    1051              : 
/*
 * Resume looking ahead after the block number callback reported
 * end-of-stream. This is useful for streams of self-referential blocks, after
 * a buffer needed to be consumed and examined to find more block numbers.
 */
void
read_stream_resume(ReadStream *stream)
{
	/* Restore the look-ahead distance saved by read_stream_pause(). */
	stream->distance = stream->resume_distance;
}
    1062              : 
/*
 * Reset a read stream by releasing any queued up buffers, allowing the stream
 * to be used again for different blocks.  This can be used to clear an
 * end-of-stream condition and start again, or to throw away blocks that were
 * speculatively read and read some different blocks instead.
 */
void
read_stream_reset(ReadStream *stream)
{
	int16		index;
	Buffer		buffer;

	/*
	 * Stop looking ahead.  With distance zero, draining the stream below
	 * won't trigger any further look-ahead or I/O.
	 */
	stream->distance = 0;

	/* Forget buffered block number and fast path state. */
	stream->buffered_blocknum = InvalidBlockNumber;
	stream->fast_path = false;

	/*
	 * Unpin anything that wasn't consumed.  Draining via
	 * read_stream_next_buffer() also waits out any I/Os still in progress,
	 * so the ios_in_progress assertion below can hold.
	 */
	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
		ReleaseBuffer(buffer);

	/*
	 * Unpin any unused forwarded buffers.  These sit in the queue at and
	 * after next_buffer_index, holding pins that were transferred to the
	 * stream but never handed to the caller.
	 */
	index = stream->next_buffer_index;
	while (index < stream->initialized_buffers &&
		   (buffer = stream->buffers[index]) != InvalidBuffer)
	{
		Assert(stream->forwarded_buffers > 0);
		stream->forwarded_buffers--;
		ReleaseBuffer(buffer);

		/*
		 * Clear the entry, and if it falls in the first io_combine_limit - 1
		 * slots, also clear its copy in the overflow zone past queue_size
		 * (entries near the head are mirrored there to support wrap-around).
		 */
		stream->buffers[index] = InvalidBuffer;
		if (index < stream->io_combine_limit - 1)
			stream->buffers[stream->queue_size + index] = InvalidBuffer;

		/* Advance with wrap-around, matching the circular queue. */
		if (++index == stream->queue_size)
			index = 0;
	}

	/* The stream should now hold no pins and have no I/O in flight. */
	Assert(stream->forwarded_buffers == 0);
	Assert(stream->pinned_buffers == 0);
	Assert(stream->ios_in_progress == 0);

	/* Start off assuming data is cached. */
	stream->distance = 1;
	stream->resume_distance = stream->distance;
}
    1111              : 
/*
 * Release and free a read stream.
 */
void
read_stream_end(ReadStream *stream)
{
	/* Drop all remaining pins and quiesce the stream before freeing it. */
	read_stream_reset(stream);
	pfree(stream);
}
        

Generated by: LCOV version 2.0-1