LCOV - code coverage report
Current view: top level - src/backend/storage/aio - read_stream.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 90.5 % 326 295
Test Date: 2026-04-07 14:16:30 Functions: 76.5 % 17 13
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * read_stream.c
       4              :  *    Mechanism for accessing buffered relation data with look-ahead
       5              :  *
       6              :  * Code that needs to access relation data typically pins blocks one at a
       7              :  * time, often in a predictable order that might be sequential or data-driven.
       8              :  * Calling the simple ReadBuffer() function for each block is inefficient,
       9              :  * because blocks that are not yet in the buffer pool require I/O operations
      10              :  * that are small and might stall waiting for storage.  This mechanism looks
      11              :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
      12              :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
      13              :  * distance.
      14              :  *
      15              :  * A user-provided callback generates a stream of block numbers that is used
      16              :  * to form reads of up to io_combine_limit, by attempting to merge them with a
      17              :  * pending read.  When that isn't possible, the existing pending read is sent
      18              :  * to StartReadBuffers() so that a new one can begin to form.
      19              :  *
      20              :  * The algorithm for controlling the look-ahead distance is based on recent
      21              :  * cache / miss history, as well as whether we need to wait for I/O completion
      22              :  * after a miss.  When no I/O is necessary, there is no benefit in looking
      23              :  * ahead more than one block.  This is the default initial assumption.  When
      24              :  * blocks needing I/O are streamed, the combine distance is increased to
      25              :  * benefit from I/O combining and the read-ahead distance is increased
      26              :  * whenever we need to wait for I/O to try to benefit from increased I/O
      27              :  * concurrency. Both are reduced gradually when cached blocks are streamed.
      28              :  *
      29              :  * The main data structure is a circular queue of buffers of size
      30              :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
      31              :  * returned by read_stream_next_buffer().  Each buffer also has an optional
      32              :  * variable sized object that is passed from the callback to the consumer of
      33              :  * buffers.
      34              :  *
      35              :  * Parallel to the queue of buffers, there is a circular queue of in-progress
      36              :  * I/Os that have been started with StartReadBuffers(), and for which
      37              :  * WaitReadBuffers() must be called before returning the buffer.
      38              :  *
      39              :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
      40              :  * successive calls, then these data structures might appear as follows:
      41              :  *
      42              :  *                          buffers buf/data       ios
      43              :  *
      44              :  *                          +----+  +-----+       +--------+
      45              :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
      46              :  *                          +----+  +-----+  |    +--------+
      47              :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
      48              :  *                          +----+  +-----+  | |  +--------+
      49              :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
      50              :  *                          +----+  +-----+    |  +--------+
      51              :  *                          | 43 |  |  ?  |    |  |        |
      52              :  *                          +----+  +-----+    |  +--------+
      53              :  *                          | 44 |  |  ?  |    |  |        |
      54              :  *                          +----+  +-----+    |  +--------+
      55              :  *                          | 60 |  |  ?  |<---+
      56              :  *                          +----+  +-----+
      57              :  *     next_buffer_index -> |    |  |     |
      58              :  *                          +----+  +-----+
      59              :  *
      60              :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
      61              :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
      62              :  * the range 42..44 requires an I/O wait before its buffers are returned, as
      63              :  * does block 60.
      64              :  *
      65              :  *
      66              :  * Portions Copyright (c) 2024-2026, PostgreSQL Global Development Group
      67              :  * Portions Copyright (c) 1994, Regents of the University of California
      68              :  *
      69              :  * IDENTIFICATION
      70              :  *    src/backend/storage/aio/read_stream.c
      71              :  *
      72              :  *-------------------------------------------------------------------------
      73              :  */
      74              : #include "postgres.h"
      75              : 
      76              : #include "miscadmin.h"
      77              : #include "storage/aio.h"
      78              : #include "storage/fd.h"
      79              : #include "storage/smgr.h"
      80              : #include "storage/read_stream.h"
      81              : #include "utils/memdebug.h"
      82              : #include "utils/rel.h"
      83              : #include "utils/spccache.h"
      84              : 
/*
 * One read that has been started with StartReadBuffers() but not yet waited
 * for with WaitReadBuffers().  Entries live in the stream's circular "ios"
 * queue, between oldest_io_index and next_io_index.
 */
typedef struct InProgressIO
{
    /* Index in the stream's circular buffer queue of this read's first buffer. */
    int16       buffer_index;
    /* Operation handle to pass to WaitReadBuffers() before those buffers are returned. */
    ReadBuffersOperation op;
} InProgressIO;
      90              : 
/*
 * State for managing a stream of reads.
 */
struct ReadStream
{
    int16       max_ios;        /* cap on I/Os in progress at once; also the
                                 * size of the ios[] circular queue */
    int16       io_combine_limit;   /* max blocks merged into one read */
    int16       ios_in_progress;    /* I/Os started but not yet waited for */
    int16       queue_size;     /* size of the circular buffer queue */
    int16       max_pinned_buffers; /* cap on buffers pinned by this stream */
    int16       forwarded_buffers;  /* pins handed forward by a previous
                                     * split StartReadBuffers() call; held
                                     * but not counted in pinned_buffers */
    int16       pinned_buffers; /* buffers currently pinned by this stream */

    /*
     * Limit of how far, in blocks, to look-ahead for IO combining and for
     * read-ahead.
     *
     * The limits for read-ahead and combining are handled separately to allow
     * for IO combining even in cases where the I/O subsystem can keep up at a
     * low read-ahead distance, as doing larger IOs is more efficient.
     *
     * Set to 0 when the end of the stream is reached.
     */
    int16       combine_distance;
    int16       readahead_distance;
    uint16      distance_decay_holdoff; /* cache hits remaining before the
                                         * distances above start to decay */
    int16       initialized_buffers;    /* high-water mark of buffers[]
                                         * entries set to a defined value */
    /*
     * NOTE(review): presumably distances saved so they can be restored when a
     * paused stream resumes — they are not referenced in this excerpt;
     * confirm against the rest of the file.
     */
    int16       resume_readahead_distance;
    int16       resume_combine_distance;
    int         read_buffers_flags; /* base flags for StartReadBuffers() */
    bool        sync_mode;      /* using io_method=sync */
    bool        batch_mode;     /* READ_STREAM_USE_BATCHING */
    bool        advice_enabled; /* may add READ_BUFFERS_ISSUE_ADVICE */
    bool        temporary;      /* use local-buffer pin limits (temp relation) */

    /*
     * One-block buffer to support 'ungetting' a block number, to resolve flow
     * control problems when I/Os are split.
     */
    BlockNumber buffered_blocknum;

    /*
     * The callback that will tell us which block numbers to read, and an
     * opaque pointer that will be passed to it for its own purposes.
     */
    ReadStreamBlockNumberCB callback;
    void       *callback_private_data;

    /* Next expected block, for detecting sequential access. */
    BlockNumber seq_blocknum;
    BlockNumber seq_until_processed;

    /* The read operation we are currently preparing. */
    BlockNumber pending_read_blocknum;
    int16       pending_read_nblocks;

    /* Space for buffers and optional per-buffer private data. */
    size_t      per_buffer_data_size;
    void       *per_buffer_data;

    /* Read operations that have been started but not waited for yet. */
    InProgressIO *ios;
    int16       oldest_io_index;
    int16       next_io_index;

    /*
     * Whether the single-buffer fast path is active; entry requires
     * combine_distance == 1 (see the distance-decay comment in
     * read_stream_start_pending_read()).
     */
    bool        fast_path;

    /* Circular queue of buffers. */
    int16       oldest_buffer_index;    /* Next pinned buffer to return */
    int16       next_buffer_index;  /* Index of next buffer to pin */
    Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
};
     163              : 
     164              : /*
     165              :  * Return a pointer to the per-buffer data by index.
     166              :  */
     167              : static inline void *
     168      4286408 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
     169              : {
     170      8572816 :     return (char *) stream->per_buffer_data +
     171      4286408 :         stream->per_buffer_data_size * buffer_index;
     172              : }
     173              : 
     174              : /*
     175              :  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
     176              :  * blocks [current_blocknum, last_exclusive).
     177              :  */
     178              : BlockNumber
     179       423563 : block_range_read_stream_cb(ReadStream *stream,
     180              :                            void *callback_private_data,
     181              :                            void *per_buffer_data)
     182              : {
     183       423563 :     BlockRangeReadStreamPrivate *p = callback_private_data;
     184              : 
     185       423563 :     if (p->current_blocknum < p->last_exclusive)
     186       345991 :         return p->current_blocknum++;
     187              : 
     188        77572 :     return InvalidBlockNumber;
     189              : }
     190              : 
     191              : /*
     192              :  * Ask the callback which block it would like us to read next, with a one block
     193              :  * buffer in front to allow read_stream_unget_block() to work.
     194              :  */
     195              : static inline BlockNumber
     196      6557922 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
     197              : {
     198              :     BlockNumber blocknum;
     199              : 
     200      6557922 :     blocknum = stream->buffered_blocknum;
     201      6557922 :     if (blocknum != InvalidBlockNumber)
     202            0 :         stream->buffered_blocknum = InvalidBlockNumber;
     203              :     else
     204              :     {
     205              :         /*
     206              :          * Tell Valgrind that the per-buffer data is undefined.  That replaces
     207              :          * the "noaccess" state that was set when the consumer moved past this
     208              :          * entry last time around the queue, and should also catch callbacks
     209              :          * that fail to initialize data that the buffer consumer later
     210              :          * accesses.  On the first go around, it is undefined already.
     211              :          */
     212              :         VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
     213              :                                     stream->per_buffer_data_size);
     214      6557922 :         blocknum = stream->callback(stream,
     215              :                                     stream->callback_private_data,
     216              :                                     per_buffer_data);
     217              :     }
     218              : 
     219      6557922 :     return blocknum;
     220              : }
     221              : 
     222              : /*
     223              :  * In order to deal with buffer shortages and I/O limits after short reads, we
     224              :  * sometimes need to defer handling of a block we've already consumed from the
     225              :  * registered callback until later.
     226              :  */
     227              : static inline void
     228            0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
     229              : {
     230              :     /* We shouldn't ever unget more than one block. */
     231              :     Assert(stream->buffered_blocknum == InvalidBlockNumber);
     232              :     Assert(blocknum != InvalidBlockNumber);
     233            0 :     stream->buffered_blocknum = blocknum;
     234            0 : }
     235              : 
/*
 * Start as much of the current pending read as we can.  If we have to split it
 * because of the per-backend buffer limit, or the buffer manager decides to
 * split it, then the pending read is adjusted to hold the remaining portion.
 *
 * We can always start a read of at least size one if we have no progress yet.
 * Otherwise it's possible that we can't start a read at all because of a lack
 * of buffers, and then false is returned.  Buffer shortages also reduce the
 * distance to a level that prevents look-ahead until buffers are released.
 */
static bool
read_stream_start_pending_read(ReadStream *stream)
{
    bool        need_wait;
    int         requested_nblocks;
    int         nblocks;
    int         flags;
    int         forwarded;
    int16       io_index;
    int16       overflow;
    int16       buffer_index;
    int         buffer_limit;

    /* This should only be called with a pending read. */
    Assert(stream->pending_read_nblocks > 0);
    Assert(stream->pending_read_nblocks <= stream->io_combine_limit);

    /* We had better not exceed the per-stream buffer limit with this read. */
    Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
           stream->max_pinned_buffers);

#ifdef USE_ASSERT_CHECKING
    /* We had better not be overwriting an existing pinned buffer. */
    if (stream->pinned_buffers > 0)
        Assert(stream->next_buffer_index != stream->oldest_buffer_index);
    else
        Assert(stream->next_buffer_index == stream->oldest_buffer_index);

    /*
     * Pinned buffers forwarded by a preceding StartReadBuffers() call that
     * had to split the operation should match the leading blocks of this
     * following StartReadBuffers() call.
     */
    Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
    for (int i = 0; i < stream->forwarded_buffers; ++i)
        Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
               stream->pending_read_blocknum + i);

    /*
     * Check that we've cleared the queue/overflow entries corresponding to
     * the rest of the blocks covered by this read, unless it's the first go
     * around and we haven't even initialized them yet.
     */
    for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
        Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
               stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
#endif

    /* Do we need to issue read-ahead advice? */
    flags = stream->read_buffers_flags;
    if (stream->advice_enabled)
    {
        if (stream->pending_read_blocknum == stream->seq_blocknum)
        {
            /*
             * Sequential:  Issue advice until the preadv() calls have caught
             * up with the first advice issued for this sequential region, and
             * then stay out of the way of the kernel's own read-ahead.
             */
            if (stream->seq_until_processed != InvalidBlockNumber)
                flags |= READ_BUFFERS_ISSUE_ADVICE;
        }
        else
        {
            /*
             * Random jump:  Note the starting location of a new potential
             * sequential region and start issuing advice.  Skip it this time
             * if the preadv() follows immediately, eg first block in stream.
             */
            stream->seq_until_processed = stream->pending_read_blocknum;
            if (stream->pinned_buffers > 0)
                flags |= READ_BUFFERS_ISSUE_ADVICE;
        }
    }

    /*
     * How many more buffers is this backend allowed?
     *
     * Forwarded buffers are already pinned and map to the leading blocks of
     * the pending read (the remaining portion of an earlier short read that
     * we're about to continue).  They are not counted in pinned_buffers, but
     * they are counted as pins already held by this backend according to the
     * buffer manager, so they must be added to the limit it grants us.
     */
    if (stream->temporary)
        buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
    else
        buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
    Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);

    /* Re-clamp: adding forwarded pins could push past the int16 range. */
    buffer_limit += stream->forwarded_buffers;
    buffer_limit = Min(buffer_limit, PG_INT16_MAX);

    if (buffer_limit == 0 && stream->pinned_buffers == 0)
        buffer_limit = 1;       /* guarantee progress */

    /* Does the per-backend limit affect this read? */
    nblocks = stream->pending_read_nblocks;
    if (buffer_limit < nblocks)
    {
        int16       new_distance;

        /* Shrink distance: no more look-ahead until buffers are released. */
        new_distance = stream->pinned_buffers + buffer_limit;
        if (stream->readahead_distance > new_distance)
            stream->readahead_distance = new_distance;

        /* Unless we have nothing to give the consumer, stop here. */
        if (stream->pinned_buffers > 0)
            return false;

        /* A short read is required to make progress. */
        nblocks = buffer_limit;
    }

    /*
     * We say how many blocks we want to read, but it may be smaller on return
     * if the buffer manager decides to shorten the read.  Initialize buffers
     * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
     * and keep the original nblocks number so we can check for forwarded
     * buffers as output, below.
     */
    buffer_index = stream->next_buffer_index;
    io_index = stream->next_io_index;
    while (stream->initialized_buffers < buffer_index + nblocks)
        stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
    requested_nblocks = nblocks;
    /* nblocks is in/out: on return it holds how many blocks were started. */
    need_wait = StartReadBuffers(&stream->ios[io_index].op,
                                 &stream->buffers[buffer_index],
                                 stream->pending_read_blocknum,
                                 &nblocks,
                                 flags);
    stream->pinned_buffers += nblocks;

    /* Remember whether we need to wait before returning this buffer. */
    if (!need_wait)
    {
        /*
         * If there currently is no IO in progress, and we have not needed to
         * issue IO recently, decay the look-ahead distance.  We detect if we
         * had to issue IO recently by having a decay holdoff that's set to
         * the max look-ahead distance whenever we need to do IO.  This is
         * important to ensure we eventually reach a high enough distance to
         * perform IO asynchronously when starting out with a small look-ahead
         * distance.
         */
        if (stream->ios_in_progress == 0)
        {
            if (stream->distance_decay_holdoff > 0)
                stream->distance_decay_holdoff--;
            else
            {
                if (stream->readahead_distance > 1)
                    stream->readahead_distance--;

                /*
                 * For now we reduce the IO combine distance after
                 * sufficiently many buffer hits. There is no clear
                 * performance argument for doing so, but at the moment we
                 * need to do so to make the entrance into fast_path work
                 * correctly: We require combine_distance == 1 to enter
                 * fast-path, as without that condition we would wrongly
                 * re-enter fast-path when readahead_distance == 1 and
                 * pinned_buffers == 1, as we would not yet have prepared
                 * another IO in that situation.
                 */
                if (stream->combine_distance > 1)
                    stream->combine_distance--;
            }
        }
    }
    else
    {
        /*
         * Remember to call WaitReadBuffers() before returning head buffer.
         * Look-ahead distance will be adjusted after waiting.
         */
        stream->ios[io_index].buffer_index = buffer_index;
        if (++stream->next_io_index == stream->max_ios)
            stream->next_io_index = 0;
        Assert(stream->ios_in_progress < stream->max_ios);
        stream->ios_in_progress++;
        stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
    }

    /*
     * How many pins were acquired but forwarded to the next call?  These need
     * to be passed to the next StartReadBuffers() call by leaving them
     * exactly where they are in the queue, or released if the stream ends
     * early.  We need the number for accounting purposes, since they are not
     * counted in stream->pinned_buffers but we already hold them.
     */
    forwarded = 0;
    while (nblocks + forwarded < requested_nblocks &&
           stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
        forwarded++;
    stream->forwarded_buffers = forwarded;

    /*
     * We gave a contiguous range of buffer space to StartReadBuffers(), but
     * we want it to wrap around at queue_size.  Copy overflowing buffers to
     * the front of the array where they'll be consumed, but also leave a copy
     * in the overflow zone which the I/O operation has a pointer to (it needs
     * a contiguous array).  Both copies will be cleared when the buffers are
     * handed to the consumer.
     */
    overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
    if (overflow > 0)
    {
        Assert(overflow < stream->queue_size);    /* can't overlap */
        memcpy(&stream->buffers[0],
               &stream->buffers[stream->queue_size],
               sizeof(stream->buffers[0]) * overflow);
    }

    /* Compute location of start of next read, without using % operator. */
    buffer_index += nblocks;
    if (buffer_index >= stream->queue_size)
        buffer_index -= stream->queue_size;
    Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
    stream->next_buffer_index = buffer_index;

    /* Adjust the pending read to cover the remaining portion, if any. */
    stream->pending_read_blocknum += nblocks;
    stream->pending_read_nblocks -= nblocks;

    return true;
}
     474              : 
     475              : /*
     476              :  * Should we continue to perform look ahead?  Looking ahead may allow us to
     477              :  * make the pending IO larger via IO combining or to issue more read-ahead.
     478              :  */
     479              : static inline bool
     480      6276195 : read_stream_should_look_ahead(ReadStream *stream)
     481              : {
     482              :     /* If the callback has signaled end-of-stream, we're done */
     483      6276195 :     if (stream->readahead_distance == 0)
     484       338058 :         return false;
     485              : 
     486              :     /* never start more IOs than our cap */
     487      5938137 :     if (stream->ios_in_progress >= stream->max_ios)
     488            0 :         return false;
     489              : 
     490              :     /*
     491              :      * Allow looking further ahead if we are in the process of building a
     492              :      * larger IO, the IO is not yet big enough, and we don't yet have IO in
     493              :      * flight.
     494              :      *
     495              :      * We do so to allow building larger reads when readahead_distance is
     496              :      * small (e.g. because the I/O subsystem is keeping up or
     497              :      * effective_io_concurrency is small). That's a useful goal because larger
     498              :      * reads are more CPU efficient than smaller reads, even if the system is
     499              :      * not IO bound.
     500              :      *
     501              :      * The reason we do *not* do so when we already have a read prepared (i.e.
     502              :      * why we check for pinned_buffers == 0) is once we are actually reading
     503              :      * ahead, we don't need it:
     504              :      *
     505              :      * - We won't issue unnecessarily small reads as
     506              :      * read_stream_should_issue_now() will return false until the IO is
     507              :      * suitably sized. The issuance of the pending read will be delayed until
     508              :      * enough buffers have been consumed.
     509              :      *
     510              :      * - If we are not reading ahead aggressively enough, future
     511              :      * WaitReadBuffers() calls will return true, leading to readahead_distance
     512              :      * being increased. After that more full-sized IOs can be issued.
     513              :      *
     514              :      * Furthermore, if we did not have the pinned_buffers == 0 condition, we
     515              :      * might end up issuing I/O more aggressively than we need.
     516              :      *
     517              :      * Note that a return of true here can lead to exceeding the read-ahead
     518              :      * limit, but we won't exceed the buffer pin limit (because pinned_buffers
     519              :      * == 0 and combine_distance is capped by max_pinned_buffers).
     520              :      */
     521      5938137 :     if (stream->pending_read_nblocks > 0 &&
     522      2615557 :         stream->pinned_buffers == 0 &&
     523      2473618 :         stream->pending_read_nblocks < stream->combine_distance)
     524       473970 :         return true;
     525              : 
     526              :     /*
     527              :      * Don't start more read-ahead if that'd put us over the distance limit
     528              :      * for doing read-ahead. As stream->readahead_distance is capped by
     529              :      * max_pinned_buffers, this prevents us from looking ahead so far that it
     530              :      * would put us over the pin limit.
     531              :      */
     532      5464167 :     if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
     533      2077822 :         return false;
     534              : 
     535      3386345 :     return true;
     536              : }
     537              : 
     538              : /*
     539              :  * We don't start the pending read just because we've hit the distance limit,
     540              :  * preferring to give it another chance to grow to full io_combine_limit size
     541              :  * once more buffers have been consumed.  But this is not desirable in all
     542              :  * situations - see below.
     543              :  */
     544              : static inline bool
     545      7363553 : read_stream_should_issue_now(ReadStream *stream)
     546              : {
     547      7363553 :     int16       pending_read_nblocks = stream->pending_read_nblocks;
     548              : 
     549              :     /* there is no pending IO that could be issued */
     550      7363553 :     if (pending_read_nblocks == 0)
     551      4880577 :         return false;
     552              : 
     553              :     /* never start more IOs than our cap */
     554      2482976 :     if (stream->ios_in_progress >= stream->max_ios)
     555            0 :         return false;
     556              : 
     557              :     /*
     558              :      * If the callback has signaled end-of-stream, start the pending read
     559              :      * immediately. There is no further potential for IO combining.
     560              :      */
     561      2482976 :     if (stream->readahead_distance == 0)
     562       104404 :         return true;
     563              : 
     564              :     /*
     565              :      * If we've already reached combine_distance, there's no chance of growing
     566              :      * the read further.
     567              :      */
     568      2378572 :     if (pending_read_nblocks >= stream->combine_distance)
     569      2004935 :         return true;
     570              : 
     571              :     /*
     572              :      * If we currently have no reads in flight or prepared, issue the IO once
     573              :      * we are not looking ahead further. This ensures there's always at least
     574              :      * one IO prepared.
     575              :      */
     576       373637 :     if (stream->pinned_buffers == 0 &&
     577       236985 :         !read_stream_should_look_ahead(stream))
     578            0 :         return true;
     579              : 
     580       373637 :     return false;
     581              : }
     582              : 
/*
 * Look ahead in the stream of block numbers, forming and starting reads.
 *
 * While read_stream_should_look_ahead() allows, ask the callback for the
 * next block number and try to merge it into the pending read; when merging
 * isn't possible (or read_stream_should_issue_now() says it's time), start
 * the pending read with read_stream_start_pending_read().  When the stream
 * opted in, the whole process runs inside AIO batch mode so that submission
 * costs can be amortized over multiple IOs.
 */
static void
read_stream_look_ahead(ReadStream *stream)
{
    /*
     * Allow amortizing the cost of submitting IO over multiple IOs. This
     * requires that we don't do any operations that could lead to a deadlock
     * with staged-but-unsubmitted IO. The callback needs to opt-in to being
     * careful.
     */
    if (stream->batch_mode)
        pgaio_enter_batchmode();

    while (read_stream_should_look_ahead(stream))
    {
        BlockNumber blocknum;
        int16       buffer_index;
        void       *per_buffer_data;

        if (read_stream_should_issue_now(stream))
        {
            read_stream_start_pending_read(stream);
            continue;
        }

        /*
         * See which block the callback wants next in the stream.  We need to
         * compute the index of the Nth block of the pending read including
         * wrap-around, but we don't want to use the expensive % operator.
         */
        buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
        if (buffer_index >= stream->queue_size)
            buffer_index -= stream->queue_size;
        Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
        per_buffer_data = get_per_buffer_data(stream, buffer_index);
        blocknum = read_stream_get_block(stream, per_buffer_data);
        if (blocknum == InvalidBlockNumber)
        {
            /* End of stream. */
            stream->readahead_distance = 0;
            stream->combine_distance = 0;
            break;
        }

        /* Can we merge it with the pending read? */
        if (stream->pending_read_nblocks > 0 &&
            stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
        {
            stream->pending_read_nblocks++;
            continue;
        }

        /*
         * We have to start the pending read before we can build another.
         * A single call may start only part of the pending read, leaving
         * pending_read_* adjusted to cover the remaining portion, so loop
         * until nothing is left.
         */
        while (stream->pending_read_nblocks > 0)
        {
            if (!read_stream_start_pending_read(stream) ||
                stream->ios_in_progress == stream->max_ios)
            {
                /* We've hit the buffer or I/O limit.  Rewind and stop here. */
                read_stream_unget_block(stream, blocknum);
                if (stream->batch_mode)
                    pgaio_exit_batchmode();
                return;
            }
        }

        /* This is the start of a new pending read. */
        stream->pending_read_blocknum = blocknum;
        stream->pending_read_nblocks = 1;
    }

    /*
     * Check if the pending read should be issued now, or if we should give it
     * another chance to grow to the full size.
     *
     * Note that the pending read can exceed the distance goal, if the latter
     * was reduced after hitting the per-backend buffer limit.
     */
    if (read_stream_should_issue_now(stream))
        read_stream_start_pending_read(stream);

    /*
     * There should always be something pinned when we leave this function,
     * whether started by this call or not, unless we've hit the end of the
     * stream.  In the worst case we can always make progress one buffer at a
     * time.
     */
    Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);

    if (stream->batch_mode)
        pgaio_exit_batchmode();
}
     674              : 
/*
 * Create a new read stream object that can be used to perform the equivalent
 * of a series of ReadBuffer() calls for one fork of one relation.
 * Internally, it generates larger vectored reads where possible by looking
 * ahead.  The callback should return block numbers or InvalidBlockNumber to
 * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
 * write extra data for each block into the space provided to it.  It will
 * also receive callback_private_data for its own purposes.
 *
 * 'flags' is a bitmask of READ_STREAM_* flags; this function consults
 * READ_STREAM_MAINTENANCE, READ_STREAM_SEQUENTIAL, READ_STREAM_FULL and
 * READ_STREAM_USE_BATCHING.  'rel' may be NULL when the caller only has an
 * SMgrRelation.  'strategy', if given, can further limit how many buffers
 * the stream is allowed to pin.
 */
static ReadStream *
read_stream_begin_impl(int flags,
                       BufferAccessStrategy strategy,
                       Relation rel,
                       SMgrRelation smgr,
                       char persistence,
                       ForkNumber forknum,
                       ReadStreamBlockNumberCB callback,
                       void *callback_private_data,
                       size_t per_buffer_data_size)
{
    ReadStream *stream;
    size_t      size;
    int16       queue_size;
    int16       queue_overflow;
    int         max_ios;
    int         strategy_pin_limit;
    uint32      max_pinned_buffers;
    uint32      max_possible_buffer_limit;
    Oid         tablespace_id;

    /*
     * Decide how many I/Os we will allow to run at the same time.  That
     * currently means advice to the kernel to tell it that we will soon read.
     * This number also affects how far we look ahead for opportunities to
     * start more I/Os.
     */
    tablespace_id = smgr->smgr_rlocator.locator.spcOid;
    if (!OidIsValid(MyDatabaseId) ||
        (rel && IsCatalogRelation(rel)) ||
        IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
    {
        /*
         * Avoid circularity while trying to look up tablespace settings or
         * before spccache.c is ready.
         */
        max_ios = effective_io_concurrency;
    }
    else if (flags & READ_STREAM_MAINTENANCE)
        max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
    else
        max_ios = get_tablespace_io_concurrency(tablespace_id);

    /* Cap to INT16_MAX to avoid overflowing below */
    max_ios = Min(max_ios, PG_INT16_MAX);

    /*
     * If starting a multi-block I/O near the end of the queue, we might
     * temporarily need extra space for overflowing buffers before they are
     * moved to regular circular position.  This is the maximum extra space we
     * could need.
     */
    queue_overflow = io_combine_limit - 1;

    /*
     * Choose the maximum number of buffers we're prepared to pin.  We try to
     * pin fewer if we can, though.  We add one so that we can make progress
     * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
     * this also allows an extra full I/O's worth of buffers: after an I/O
     * finishes we don't want to have to wait for its buffers to be consumed
     * before starting a new one.
     *
     * Be careful not to allow int16 to overflow.  That is possible with the
     * current GUC range limits, so this is an artificial limit of ~32k
     * buffers and we'd need to adjust the types to exceed that.  We also have
     * to allow for the spare entry and the overflow space.
     */
    max_pinned_buffers = (max_ios + 1) * io_combine_limit;
    max_pinned_buffers = Min(max_pinned_buffers,
                             PG_INT16_MAX - queue_overflow - 1);

    /* Give the strategy a chance to limit the number of buffers we pin. */
    strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
    max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);

    /*
     * Also limit our queue to the maximum number of pins we could ever be
     * allowed to acquire according to the buffer manager.  We may not really
     * be able to use them all due to other pins held by this backend, but
     * we'll check that later in read_stream_start_pending_read().
     */
    if (SmgrIsTemp(smgr))
        max_possible_buffer_limit = GetLocalPinLimit();
    else
        max_possible_buffer_limit = GetPinLimit();
    max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);

    /*
     * The limit might be zero on a system configured with too few buffers for
     * the number of connections.  We need at least one to make progress.
     */
    max_pinned_buffers = Max(1, max_pinned_buffers);

    /*
     * We need one extra entry for buffers and per-buffer data, because users
     * of per-buffer data have access to the object until the next call to
     * read_stream_next_buffer(), so we need a gap between the head and tail
     * of the queue so that we don't clobber it.
     */
    queue_size = max_pinned_buffers + 1;

    /*
     * Allocate the object, the buffers, the ios and per_buffer_data space in
     * one big chunk.  Though we have queue_size buffers, we want to be able
     * to assume that all the buffers for a single read are contiguous (i.e.
     * don't wrap around halfway through), so we allow temporary overflows of
     * up to the maximum possible overflow size.
     */
    size = offsetof(ReadStream, buffers);
    size += sizeof(Buffer) * (queue_size + queue_overflow);
    size += sizeof(InProgressIO) * Max(1, max_ios);
    size += per_buffer_data_size * queue_size;
    /* Slack for the two MAXALIGN adjustments performed just below. */
    size += MAXIMUM_ALIGNOF * 2;
    stream = (ReadStream *) palloc(size);
    memset(stream, 0, offsetof(ReadStream, buffers));
    stream->ios = (InProgressIO *)
        MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
    if (per_buffer_data_size > 0)
        stream->per_buffer_data = (void *)
            MAXALIGN(&stream->ios[Max(1, max_ios)]);

    stream->sync_mode = io_method == IOMETHOD_SYNC;
    stream->batch_mode = flags & READ_STREAM_USE_BATCHING;

#ifdef USE_PREFETCH

    /*
     * Read-ahead advice simulating asynchronous I/O with synchronous calls.
     * Issue advice only if AIO is not used, direct I/O isn't enabled, the
     * caller hasn't promised sequential access (overriding our detection
     * heuristics), and max_ios hasn't been set to zero.
     */
    if (stream->sync_mode &&
        (io_direct_flags & IO_DIRECT_DATA) == 0 &&
        (flags & READ_STREAM_SEQUENTIAL) == 0 &&
        max_ios > 0)
        stream->advice_enabled = true;
#endif

    /*
     * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
     * we still need to allocate space to combine and run one I/O.  Bump it up
     * to one, and remember to ask for synchronous I/O only.
     */
    if (max_ios == 0)
    {
        max_ios = 1;
        stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
    }

    /*
     * Capture stable values for these two GUC-derived numbers for the
     * lifetime of this stream, so we don't have to worry about the GUCs
     * changing underneath us beyond this point.
     */
    stream->max_ios = max_ios;
    stream->io_combine_limit = io_combine_limit;

    stream->per_buffer_data_size = per_buffer_data_size;
    stream->max_pinned_buffers = max_pinned_buffers;
    stream->queue_size = queue_size;
    stream->callback = callback;
    stream->callback_private_data = callback_private_data;
    stream->buffered_blocknum = InvalidBlockNumber;
    stream->seq_blocknum = InvalidBlockNumber;
    stream->seq_until_processed = InvalidBlockNumber;
    stream->temporary = SmgrIsTemp(smgr);
    stream->distance_decay_holdoff = 0;

    /*
     * Skip the initial ramp-up phase if the caller says we're going to be
     * reading the whole relation.  This way we start out assuming we'll be
     * doing full io_combine_limit sized reads.
     */
    if (flags & READ_STREAM_FULL)
    {
        stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
        stream->combine_distance = Min(max_pinned_buffers, stream->io_combine_limit);
    }
    else
    {
        stream->readahead_distance = 1;
        stream->combine_distance = 1;
    }
    /*
     * NOTE(review): the resume_* fields appear to snapshot the initial
     * distances so they can be restored later elsewhere in this file —
     * confirm against their readers.
     */
    stream->resume_readahead_distance = stream->readahead_distance;
    stream->resume_combine_distance = stream->combine_distance;

    /*
     * Since we always access the same relation, we can initialize parts of
     * the ReadBuffersOperation objects and leave them that way, to avoid
     * wasting CPU cycles writing to them for each read.
     */
    for (int i = 0; i < max_ios; ++i)
    {
        stream->ios[i].op.rel = rel;
        stream->ios[i].op.smgr = smgr;
        stream->ios[i].op.persistence = persistence;
        stream->ios[i].op.forknum = forknum;
        stream->ios[i].op.strategy = strategy;
    }

    return stream;
}
     887              : 
     888              : /*
     889              :  * Create a new read stream for reading a relation.
     890              :  * See read_stream_begin_impl() for the detailed explanation.
     891              :  */
     892              : ReadStream *
     893       612456 : read_stream_begin_relation(int flags,
     894              :                            BufferAccessStrategy strategy,
     895              :                            Relation rel,
     896              :                            ForkNumber forknum,
     897              :                            ReadStreamBlockNumberCB callback,
     898              :                            void *callback_private_data,
     899              :                            size_t per_buffer_data_size)
     900              : {
     901       612456 :     return read_stream_begin_impl(flags,
     902              :                                   strategy,
     903              :                                   rel,
     904              :                                   RelationGetSmgr(rel),
     905       612456 :                                   rel->rd_rel->relpersistence,
     906              :                                   forknum,
     907              :                                   callback,
     908              :                                   callback_private_data,
     909              :                                   per_buffer_data_size);
     910              : }
     911              : 
     912              : /*
     913              :  * Create a new read stream for reading a SMgr relation.
     914              :  * See read_stream_begin_impl() for the detailed explanation.
     915              :  */
     916              : ReadStream *
     917        71772 : read_stream_begin_smgr_relation(int flags,
     918              :                                 BufferAccessStrategy strategy,
     919              :                                 SMgrRelation smgr,
     920              :                                 char smgr_persistence,
     921              :                                 ForkNumber forknum,
     922              :                                 ReadStreamBlockNumberCB callback,
     923              :                                 void *callback_private_data,
     924              :                                 size_t per_buffer_data_size)
     925              : {
     926        71772 :     return read_stream_begin_impl(flags,
     927              :                                   strategy,
     928              :                                   NULL,
     929              :                                   smgr,
     930              :                                   smgr_persistence,
     931              :                                   forknum,
     932              :                                   callback,
     933              :                                   callback_private_data,
     934              :                                   per_buffer_data_size);
     935              : }
     936              : 
     937              : /*
     938              :  * Pull one pinned buffer out of a stream.  Each call returns successive
     939              :  * blocks in the order specified by the callback.  If per_buffer_data_size was
     940              :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
     941              :  * per-buffer data that the callback had a chance to populate, which remains
     942              :  * valid until the next call to read_stream_next_buffer().  When the stream
     943              :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
     944              :  * the stream early at any time by calling read_stream_end().
     945              :  */
     946              : Buffer
     947      7958465 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
     948              : {
     949              :     Buffer      buffer;
     950              :     int16       oldest_buffer_index;
     951              : 
     952              : #ifndef READ_STREAM_DISABLE_FAST_PATH
     953              : 
     954              :     /*
     955              :      * A fast path for all-cached scans.  This is the same as the usual
     956              :      * algorithm, but it is specialized for no I/O and no per-buffer data, so
     957              :      * we can skip the queue management code, stay in the same buffer slot and
     958              :      * use singular StartReadBuffer().
     959              :      */
     960      7958465 :     if (likely(stream->fast_path))
     961              :     {
     962              :         BlockNumber next_blocknum;
     963              : 
     964              :         /* Fast path assumptions. */
     965              :         Assert(stream->ios_in_progress == 0);
     966              :         Assert(stream->forwarded_buffers == 0);
     967              :         Assert(stream->pinned_buffers == 1);
     968              :         Assert(stream->readahead_distance == 1);
     969              :         Assert(stream->combine_distance == 1);
     970              :         Assert(stream->pending_read_nblocks == 0);
     971              :         Assert(stream->per_buffer_data_size == 0);
     972              :         Assert(stream->initialized_buffers > stream->oldest_buffer_index);
     973              : 
     974              :         /* We're going to return the buffer we pinned last time. */
     975      2936611 :         oldest_buffer_index = stream->oldest_buffer_index;
     976              :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
     977              :                stream->next_buffer_index);
     978      2936611 :         buffer = stream->buffers[oldest_buffer_index];
     979              :         Assert(buffer != InvalidBuffer);
     980              : 
     981              :         /* Choose the next block to pin. */
     982      2936611 :         next_blocknum = read_stream_get_block(stream, NULL);
     983              : 
     984      2936611 :         if (likely(next_blocknum != InvalidBlockNumber))
     985              :         {
     986      2829005 :             int         flags = stream->read_buffers_flags;
     987              : 
     988      2829005 :             if (stream->advice_enabled)
     989          552 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     990              : 
     991              :             /*
     992              :              * While in fast-path, execute any IO that we might encounter
     993              :              * synchronously. Because we are, right now, only looking one
     994              :              * block ahead, dispatching any occasional IO to workers would
     995              :              * have the overhead of dispatching to workers, without any
     996              :              * realistic chance of the IO completing before we need it. We
     997              :              * will switch to non-synchronous IO after this.
     998              :              *
     999              :              * Arguably we should do so only for worker, as there's far less
    1000              :              * dispatch overhead with io_uring. However, tests so far have not
    1001              :              * shown a clear downside and additional io_method awareness here
    1002              :              * seems not great from an abstraction POV.
    1003              :              */
    1004      2829005 :             flags |= READ_BUFFERS_SYNCHRONOUSLY;
    1005              : 
    1006              :             /*
    1007              :              * Pin a buffer for the next call.  Same buffer entry, and
    1008              :              * arbitrary I/O entry (they're all free).  We don't have to
    1009              :              * adjust pinned_buffers because we're transferring one to caller
    1010              :              * but pinning one more.
    1011              :              *
    1012              :              * In the fast path we don't need to check the pin limit.  We're
    1013              :              * always allowed at least one pin so that progress can be made,
    1014              :              * and that's all we need here.  Although two pins are momentarily
    1015              :              * held at the same time, the model used here is that the stream
    1016              :              * holds only one, and the other now belongs to the caller.
    1017              :              */
    1018      2829005 :             if (likely(!StartReadBuffer(&stream->ios[0].op,
    1019              :                                         &stream->buffers[oldest_buffer_index],
    1020              :                                         next_blocknum,
    1021              :                                         flags)))
    1022              :             {
    1023              :                 /* Fast return. */
    1024      2812679 :                 return buffer;
    1025              :             }
    1026              : 
    1027              :             /* Next call must wait for I/O for the newly pinned buffer. */
    1028        16326 :             stream->oldest_io_index = 0;
    1029        16326 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
    1030        16326 :             stream->ios_in_progress = 1;
    1031        16326 :             stream->ios[0].buffer_index = oldest_buffer_index;
    1032        16326 :             stream->seq_blocknum = next_blocknum + 1;
    1033              : 
    1034              :             /*
    1035              :              * XXX: It might be worth triggering additional read-ahead here,
    1036              :              * to avoid having to effectively do another synchronous IO for
    1037              :              * the next block (if it were also a miss).
    1038              :              */
    1039              :         }
    1040              :         else
    1041              :         {
    1042              :             /* No more blocks, end of stream. */
    1043       107606 :             stream->readahead_distance = 0;
    1044       107606 :             stream->combine_distance = 0;
    1045       107606 :             stream->oldest_buffer_index = stream->next_buffer_index;
    1046       107606 :             stream->pinned_buffers = 0;
    1047       107606 :             stream->buffers[oldest_buffer_index] = InvalidBuffer;
    1048              :         }
    1049              : 
    1050       123932 :         stream->fast_path = false;
    1051       123932 :         return buffer;
    1052              :     }
    1053              : #endif
    1054              : 
    1055      5021854 :     if (unlikely(stream->pinned_buffers == 0))
    1056              :     {
    1057              :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
    1058              : 
    1059              :         /* End of stream reached?  */
    1060      3538913 :         if (stream->readahead_distance == 0)
    1061      1986840 :             return InvalidBuffer;
    1062              : 
    1063              :         /*
    1064              :          * The usual order of operations is that we look ahead at the bottom
    1065              :          * of this function after potentially finishing an I/O and making
    1066              :          * space for more, but if we're just starting up we'll need to crank
    1067              :          * the handle to get started.
    1068              :          */
    1069      1552073 :         read_stream_look_ahead(stream);
    1070              : 
    1071              :         /* End of stream reached? */
    1072      1552073 :         if (stream->pinned_buffers == 0)
    1073              :         {
    1074              :             Assert(stream->readahead_distance == 0);
    1075       846842 :             return InvalidBuffer;
    1076              :         }
    1077              :     }
    1078              : 
    1079              :     /* Grab the oldest pinned buffer and associated per-buffer data. */
    1080              :     Assert(stream->pinned_buffers > 0);
    1081      2188172 :     oldest_buffer_index = stream->oldest_buffer_index;
    1082              :     Assert(oldest_buffer_index >= 0 &&
    1083              :            oldest_buffer_index < stream->queue_size);
    1084      2188172 :     buffer = stream->buffers[oldest_buffer_index];
    1085      2188172 :     if (per_buffer_data)
    1086       665097 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
    1087              : 
    1088              :     Assert(BufferIsValid(buffer));
    1089              : 
    1090              :     /* Do we have to wait for an associated I/O first? */
    1091      2188172 :     if (stream->ios_in_progress > 0 &&
    1092       766516 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
    1093              :     {
    1094       704630 :         int16       io_index = stream->oldest_io_index;
    1095              :         bool        needed_wait;
    1096              : 
    1097              :         /* Sanity check that we still agree on the buffers. */
    1098              :         Assert(stream->ios[io_index].op.buffers ==
    1099              :                &stream->buffers[oldest_buffer_index]);
    1100              : 
    1101       704630 :         needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
    1102              : 
    1103              :         Assert(stream->ios_in_progress > 0);
    1104       704608 :         stream->ios_in_progress--;
    1105       704608 :         if (++stream->oldest_io_index == stream->max_ios)
    1106        29203 :             stream->oldest_io_index = 0;
    1107              : 
    1108              :         /*
    1109              :          * If the IO was executed synchronously, we will never see
    1110              :          * WaitReadBuffers() block. Treat it as if it did block. This is
    1111              :          * particularly crucial when effective_io_concurrency=0 is used, as
    1112              :          * all IO will be synchronous.  Without treating synchronous IO as
    1113              :          * having waited, we'd never allow the distance to get large enough to
    1114              :          * allow for IO combining, resulting in bad performance.
    1115              :          */
    1116       704608 :         if (stream->ios[io_index].op.flags & READ_BUFFERS_SYNCHRONOUSLY)
    1117        16818 :             needed_wait = true;
    1118              : 
    1119              :         /*
    1120              :          * Have the read-ahead distance ramp up rapidly after we needed to
    1121              :          * wait for IO. We only increase the read-ahead-distance when we
    1122              :          * needed to wait, to avoid increasing the distance further than
    1123              :          * necessary, as looking ahead too far can be costly, both due to the
    1124              :          * cost of unnecessarily pinning many buffers and due to doing IOs
    1125              :          * that may never be consumed if the stream is ended/reset before
    1126              :          * completion.
    1127              :          *
    1128              :          * If we did not need to wait, the current distance was evidently
    1129              :          * sufficient.
    1130              :          *
    1131              :          * NB: Must not increase the distance if we already reached the end of
    1132              :          * the stream, as stream->readahead_distance == 0 is used to keep
    1133              :          * track of having reached the end.
    1134              :          */
    1135       704608 :         if (stream->readahead_distance > 0 && needed_wait)
    1136              :         {
    1137              :             /* wider temporary value, due to overflow risk */
    1138              :             int32       readahead_distance;
    1139              : 
    1140       313390 :             readahead_distance = stream->readahead_distance * 2;
    1141       313390 :             readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
    1142       313390 :             stream->readahead_distance = readahead_distance;
    1143              :         }
    1144              : 
    1145              :         /*
    1146              :          * As we needed IO, prevent distances from being reduced within our
    1147              :          * maximum look-ahead window. This avoids collapsing distances too
    1148              :          * quickly in workloads where most of the required blocks are cached,
    1149              :          * but where the remaining IOs are a sufficient enough factor to cause
    1150              :          * a substantial slowdown if executed synchronously.
    1151              :          *
    1152              :          * There are valid arguments for preventing decay for max_ios or for
    1153              :          * max_pinned_buffers.  But the argument for max_pinned_buffers seems
    1154              :          * clearer - if we can't see any misses within the maximum look-ahead
    1155              :          * distance, we can't do any useful read-ahead.
    1156              :          */
    1157       704608 :         stream->distance_decay_holdoff = stream->max_pinned_buffers;
    1158              : 
    1159              :         /*
    1160              :          * Whether we needed to wait or not, allow for more IO combining if we
    1161              :          * needed to do IO. The reason to do so independent of needing to wait
    1162              :          * is that when the data is resident in the kernel page cache, IO
    1163              :          * combining reduces the syscall / dispatch overhead, making it
    1164              :          * worthwhile regardless of needing to wait.
    1165              :          *
    1166              :          * It is also important with io_uring as it will never signal the need
    1167              :          * to wait for reads if all the data is in the page cache. There are
    1168              :          * heuristics to deal with that in method_io_uring.c, but they only
    1169              :          * work when the IO gets large enough.
    1170              :          */
    1171       704608 :         if (stream->combine_distance > 0 &&
    1172       653104 :             stream->combine_distance < stream->io_combine_limit)
    1173              :         {
    1174              :             /* wider temporary value, due to overflow risk */
    1175              :             int32       combine_distance;
    1176              : 
    1177       644447 :             combine_distance = stream->combine_distance * 2;
    1178       644447 :             combine_distance = Min(combine_distance, stream->io_combine_limit);
    1179       644447 :             combine_distance = Min(combine_distance, stream->max_pinned_buffers);
    1180       644447 :             stream->combine_distance = combine_distance;
    1181              :         }
    1182              : 
    1183              :         /*
    1184              :          * If we've reached the first block of a sequential region we're
    1185              :          * issuing advice for, cancel that until the next jump.  The kernel
    1186              :          * will see the sequential preadv() pattern starting here.
    1187              :          */
    1188       704608 :         if (stream->advice_enabled &&
    1189          318 :             stream->ios[io_index].op.blocknum == stream->seq_until_processed)
    1190          275 :             stream->seq_until_processed = InvalidBlockNumber;
    1191              :     }
    1192              : 
    1193              :     /*
    1194              :      * We must zap this queue entry, or else it would appear as a forwarded
    1195              :      * buffer.  If it's potentially in the overflow zone (ie from a
    1196              :      * multi-block I/O that wrapped around the queue), also zap the copy.
    1197              :      */
    1198      2188150 :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
    1199      2188150 :     if (oldest_buffer_index < stream->io_combine_limit - 1)
    1200      1660149 :         stream->buffers[stream->queue_size + oldest_buffer_index] =
    1201              :             InvalidBuffer;
    1202              : 
    1203              : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
    1204              : 
    1205              :     /*
    1206              :      * The caller will get access to the per-buffer data, until the next call.
    1207              :      * We wipe the one before, which is never occupied because queue_size
    1208              :      * allowed one extra element.  This will hopefully trip up client code
    1209              :      * that is holding a dangling pointer to it.
    1210              :      */
    1211              :     if (stream->per_buffer_data)
    1212              :     {
    1213              :         void       *per_buffer_data;
    1214              : 
    1215              :         per_buffer_data = get_per_buffer_data(stream,
    1216              :                                               oldest_buffer_index == 0 ?
    1217              :                                               stream->queue_size - 1 :
    1218              :                                               oldest_buffer_index - 1);
    1219              : 
    1220              : #if defined(CLOBBER_FREED_MEMORY)
    1221              :         /* This also tells Valgrind the memory is "noaccess". */
    1222              :         wipe_mem(per_buffer_data, stream->per_buffer_data_size);
    1223              : #elif defined(USE_VALGRIND)
    1224              :         /* Tell it ourselves. */
    1225              :         VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
    1226              :                                    stream->per_buffer_data_size);
    1227              : #endif
    1228              :     }
    1229              : #endif
    1230              : 
    1231              :     /* Pin transferred to caller. */
    1232              :     Assert(stream->pinned_buffers > 0);
    1233      2188150 :     stream->pinned_buffers--;
    1234              : 
    1235              :     /* Advance oldest buffer, with wrap-around. */
    1236      2188150 :     stream->oldest_buffer_index++;
    1237      2188150 :     if (stream->oldest_buffer_index == stream->queue_size)
    1238       339933 :         stream->oldest_buffer_index = 0;
    1239              : 
    1240              :     /* Prepare for the next call. */
    1241      2188150 :     read_stream_look_ahead(stream);
    1242              : 
    1243              : #ifndef READ_STREAM_DISABLE_FAST_PATH
    1244              :     /* See if we can take the fast path for all-cached scans next time. */
    1245      2188142 :     if (stream->ios_in_progress == 0 &&
    1246      1521209 :         stream->forwarded_buffers == 0 &&
    1247      1517970 :         stream->pinned_buffers == 1 &&
    1248       826187 :         stream->readahead_distance == 1 &&
    1249       741195 :         stream->combine_distance == 1 &&
    1250       738357 :         stream->pending_read_nblocks == 0 &&
    1251       737213 :         stream->per_buffer_data_size == 0)
    1252              :     {
    1253              :         /*
    1254              :          * The fast path spins on one buffer entry repeatedly instead of
    1255              :          * rotating through the whole queue and clearing the entries behind
    1256              :          * it.  If the buffer it starts with happened to be forwarded between
    1257              :          * StartReadBuffers() calls and also wrapped around the circular queue
    1258              :          * partway through, then a copy also exists in the overflow zone, and
    1259              :          * it won't clear it out as the regular path would.  Do that now, so
    1260              :          * it doesn't need code for that.
    1261              :          */
    1262       236346 :         if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
    1263       234658 :             stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
    1264              :                 InvalidBuffer;
    1265              : 
    1266       236346 :         stream->fast_path = true;
    1267              :     }
    1268              : #endif
    1269              : 
    1270      2188142 :     return buffer;
    1271              : }
    1272              : 
    1273              : /*
    1274              :  * Transitional support for code that would like to perform or skip reads
    1275              :  * itself, without using the stream.  Returns, and consumes, the next block
    1276              :  * number that would be read by the stream's look-ahead algorithm, or
    1277              :  * InvalidBlockNumber if the end of the stream is reached.  Also reports the
    1278              :  * strategy that would be used to read it.
    1279              :  */
    1280              : BlockNumber
    1281            0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
    1282              : {
    1283            0 :     *strategy = stream->ios[0].op.strategy;
    1284            0 :     return read_stream_get_block(stream, NULL);
    1285              : }
    1286              : 
    1287              : /*
    1288              :  * Temporarily stop consuming block numbers from the block number callback.
    1289              :  * If called inside the block number callback, its return value should be
    1290              :  * returned by the callback.
    1291              :  */
    1292              : BlockNumber
    1293            0 : read_stream_pause(ReadStream *stream)
    1294              : {
    1295            0 :     stream->resume_readahead_distance = stream->readahead_distance;
    1296            0 :     stream->resume_combine_distance = stream->combine_distance;
    1297            0 :     stream->readahead_distance = 0;
    1298            0 :     stream->combine_distance = 0;
    1299            0 :     return InvalidBlockNumber;
    1300              : }
    1301              : 
    1302              : /*
    1303              :  * Resume looking ahead after the block number callback reported
    1304              :  * end-of-stream. This is useful for streams of self-referential blocks, after
    1305              :  * a buffer needed to be consumed and examined to find more block numbers.
    1306              :  */
    1307              : void
    1308            0 : read_stream_resume(ReadStream *stream)
    1309              : {
    1310            0 :     stream->readahead_distance = stream->resume_readahead_distance;
    1311            0 :     stream->combine_distance = stream->resume_combine_distance;
    1312            0 : }
    1313              : 
    1314              : /*
    1315              :  * Reset a read stream by releasing any queued up buffers, allowing the stream
    1316              :  * to be used again for different blocks.  This can be used to clear an
    1317              :  * end-of-stream condition and start again, or to throw away blocks that were
    1318              :  * speculatively read and read some different blocks instead.
    1319              :  */
    1320              : void
    1321      1553562 : read_stream_reset(ReadStream *stream)
    1322              : {
    1323              :     int16       index;
    1324              :     Buffer      buffer;
    1325              : 
    1326              :     /* Stop looking ahead. */
    1327      1553562 :     stream->readahead_distance = 0;
    1328      1553562 :     stream->combine_distance = 0;
    1329              : 
    1330              :     /* Forget buffered block number and fast path state. */
    1331      1553562 :     stream->buffered_blocknum = InvalidBlockNumber;
    1332      1553562 :     stream->fast_path = false;
    1333              : 
    1334              :     /* Unpin anything that wasn't consumed. */
    1335      1693112 :     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
    1336       139550 :         ReleaseBuffer(buffer);
    1337              : 
    1338              :     /* Unpin any unused forwarded buffers. */
    1339      1553562 :     index = stream->next_buffer_index;
    1340      1553562 :     while (index < stream->initialized_buffers &&
    1341       217911 :            (buffer = stream->buffers[index]) != InvalidBuffer)
    1342              :     {
    1343              :         Assert(stream->forwarded_buffers > 0);
    1344            0 :         stream->forwarded_buffers--;
    1345            0 :         ReleaseBuffer(buffer);
    1346              : 
    1347            0 :         stream->buffers[index] = InvalidBuffer;
    1348            0 :         if (index < stream->io_combine_limit - 1)
    1349            0 :             stream->buffers[stream->queue_size + index] = InvalidBuffer;
    1350              : 
    1351            0 :         if (++index == stream->queue_size)
    1352            0 :             index = 0;
    1353              :     }
    1354              : 
    1355              :     Assert(stream->forwarded_buffers == 0);
    1356              :     Assert(stream->pinned_buffers == 0);
    1357              :     Assert(stream->ios_in_progress == 0);
    1358              : 
    1359              :     /* Start off assuming data is cached. */
    1360      1553562 :     stream->readahead_distance = 1;
    1361      1553562 :     stream->combine_distance = 1;
    1362      1553562 :     stream->resume_readahead_distance = stream->readahead_distance;
    1363      1553562 :     stream->resume_combine_distance = stream->combine_distance;
    1364      1553562 :     stream->distance_decay_holdoff = 0;
    1365      1553562 : }
    1366              : 
    1367              : /*
    1368              :  * Release and free a read stream.
    1369              :  */
    1370              : void
    1371       681019 : read_stream_end(ReadStream *stream)
    1372              : {
    1373       681019 :     read_stream_reset(stream);
    1374       681019 :     pfree(stream);
    1375       681019 : }
        

Generated by: LCOV version 2.0-1