LCOV - code coverage report
Current view: top level - src/backend/storage/aio - read_stream.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 91.3 % 358 327
Test Date: 2026-05-04 19:16:35 Functions: 81.0 % 21 17
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * read_stream.c
       4              :  *    Mechanism for accessing buffered relation data with look-ahead
       5              :  *
       6              :  * Code that needs to access relation data typically pins blocks one at a
       7              :  * time, often in a predictable order that might be sequential or data-driven.
       8              :  * Calling the simple ReadBuffer() function for each block is inefficient,
       9              :  * because blocks that are not yet in the buffer pool require I/O operations
      10              :  * that are small and might stall waiting for storage.  This mechanism looks
      11              :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
      12              :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
      13              :  * distance.
      14              :  *
      15              :  * A user-provided callback generates a stream of block numbers that is used
      16              :  * to form reads of up to io_combine_limit, by attempting to merge them with a
      17              :  * pending read.  When that isn't possible, the existing pending read is sent
      18              :  * to StartReadBuffers() so that a new one can begin to form.
      19              :  *
      20              :  * The algorithm for controlling the look-ahead distance is based on recent
      21              :  * cache / miss history, as well as whether we need to wait for I/O completion
      22              :  * after a miss.  When no I/O is necessary, there is no benefit in looking
      23              :  * ahead more than one block.  This is the default initial assumption.  When
      24              :  * blocks needing I/O are streamed, the combine distance is increased to
      25              :  * benefit from I/O combining and the read-ahead distance is increased
      26              :  * whenever we need to wait for I/O to try to benefit from increased I/O
      27              :  * concurrency. Both are reduced gradually when cached blocks are streamed.
      28              :  *
      29              :  * The main data structure is a circular queue of buffers of size
      30              :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
      31              :  * returned by read_stream_next_buffer().  Each buffer also has an optional
      32              :  * variable sized object that is passed from the callback to the consumer of
      33              :  * buffers.
      34              :  *
      35              :  * Parallel to the queue of buffers, there is a circular queue of in-progress
      36              :  * I/Os that have been started with StartReadBuffers(), and for which
      37              :  * WaitReadBuffers() must be called before returning the buffer.
      38              :  *
      39              :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
      40              :  * successive calls, then these data structures might appear as follows:
      41              :  *
      42              :  *                          buffers buf/data       ios
      43              :  *
      44              :  *                          +----+  +-----+       +--------+
      45              :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
      46              :  *                          +----+  +-----+  |    +--------+
      47              :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
      48              :  *                          +----+  +-----+  | |  +--------+
      49              :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
      50              :  *                          +----+  +-----+    |  +--------+
      51              :  *                          | 43 |  |  ?  |    |  |        |
      52              :  *                          +----+  +-----+    |  +--------+
      53              :  *                          | 44 |  |  ?  |    |  |        |
      54              :  *                          +----+  +-----+    |  +--------+
      55              :  *                          | 60 |  |  ?  |<---+
      56              :  *                          +----+  +-----+
      57              :  *     next_buffer_index -> |    |  |     |
      58              :  *                          +----+  +-----+
      59              :  *
      60              :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
      61              :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
      62              :  * the range 42..44 requires an I/O wait before its buffers are returned, as
      63              :  * does block 60.
      64              :  *
      65              :  *
      66              :  * Portions Copyright (c) 2024-2026, PostgreSQL Global Development Group
      67              :  * Portions Copyright (c) 1994, Regents of the University of California
      68              :  *
      69              :  * IDENTIFICATION
      70              :  *    src/backend/storage/aio/read_stream.c
      71              :  *
      72              :  *-------------------------------------------------------------------------
      73              :  */
      74              : #include "postgres.h"
      75              : 
      76              : #include "miscadmin.h"
      77              : #include "executor/instrument_node.h"
      78              : #include "storage/aio.h"
      79              : #include "storage/fd.h"
      80              : #include "storage/smgr.h"
      81              : #include "storage/read_stream.h"
      82              : #include "utils/memdebug.h"
      83              : #include "utils/rel.h"
      84              : #include "utils/spccache.h"
      85              : 
/*
 * An in-flight read started with StartReadBuffers() and not yet waited for.
 * These live in a circular queue (ReadStream->ios) parallel to the buffer
 * queue.
 */
typedef struct InProgressIO
{
	/* Index into ReadStream->buffers of the first buffer of this read. */
	int16		buffer_index;
	/* Operation state to pass to WaitReadBuffers() when it's time to wait. */
	ReadBuffersOperation op;
} InProgressIO;
      91              : 
      92              : /*
      93              :  * State for managing a stream of reads.
      94              :  */
struct ReadStream
{
	/* Capacity of the 'ios' queue; next_io_index wraps at this value. */
	int16		max_ios;
	/* Upper bound on the number of blocks combined into a single read. */
	int16		io_combine_limit;
	/* Number of I/Os started but not yet waited for. */
	int16		ios_in_progress;
	/* Size of the circular buffer queue; buffer indexes wrap at this value. */
	int16		queue_size;
	/* Cap on the number of buffers this stream may hold pinned at once. */
	int16		max_pinned_buffers;
	/* Pins acquired by a split StartReadBuffers() call and left in place for
	 * the next call; not counted in pinned_buffers. */
	int16		forwarded_buffers;
	/* Buffers currently pinned and queued for the consumer. */
	int16		pinned_buffers;

	/*
	 * Limit of how far, in blocks, to look-ahead for IO combining and for
	 * read-ahead.
	 *
	 * The limits for read-ahead and combining are handled separately to allow
	 * for IO combining even in cases where the I/O subsystem can keep up at a
	 * low read-ahead distance, as doing larger IOs is more efficient.
	 *
	 * Set to 0 when the end of the stream is reached.
	 */
	int16		combine_distance;
	int16		readahead_distance;
	/* Counts down after each I/O; distances only decay when it hits zero, so
	 * a recent I/O keeps the look-ahead distances from shrinking. */
	uint16		distance_decay_holdoff;
	/* High-water mark of entries in buffers[] that have been initialized. */
	int16		initialized_buffers;
	/* NOTE(review): presumably saved distances to restore when a paused
	 * stream resumes — not used in the code visible here; confirm. */
	int16		resume_readahead_distance;
	int16		resume_combine_distance;
	/* Base flags passed to every StartReadBuffers() call. */
	int			read_buffers_flags;
	bool		sync_mode;		/* using io_method=sync */
	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
	/* Whether to issue READ_BUFFERS_ISSUE_ADVICE for random jumps. */
	bool		advice_enabled;
	/* Relation is temporary: use the local (not shared) pin limit. */
	bool		temporary;

	/* scan stats counters; NULL unless read_stream_enable_stats() called */
	IOStats    *stats;

	/*
	 * One-block buffer to support 'ungetting' a block number, to resolve flow
	 * control problems when I/Os are split.
	 */
	BlockNumber buffered_blocknum;

	/*
	 * The callback that will tell us which block numbers to read, and an
	 * opaque pointer that will be pass to it for its own purposes.
	 */
	ReadStreamBlockNumberCB callback;
	void	   *callback_private_data;

	/* Next expected block, for detecting sequential access. */
	BlockNumber seq_blocknum;
	BlockNumber seq_until_processed;

	/* The read operation we are currently preparing. */
	BlockNumber pending_read_blocknum;
	int16		pending_read_nblocks;

	/* Space for buffers and optional per-buffer private data. */
	size_t		per_buffer_data_size;
	void	   *per_buffer_data;

	/* Read operations that have been started but not waited for yet. */
	InProgressIO *ios;
	int16		oldest_io_index;
	int16		next_io_index;

	/* NOTE(review): appears to gate a single-block fast path entered when
	 * combine_distance == 1 (see read_stream_start_pending_read); the code
	 * that sets/tests it is outside this chunk — confirm. */
	bool		fast_path;

	/* Circular queue of buffers. */
	int16		oldest_buffer_index;	/* Next pinned buffer to return */
	int16		next_buffer_index;	/* Index of next buffer to pin */
	Buffer		buffers[FLEXIBLE_ARRAY_MEMBER];
};
     167              : 
     168              : /*
     169              :  * Return a pointer to the per-buffer data by index.
     170              :  */
     171              : static inline void *
     172      4408011 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
     173              : {
     174      8816022 :     return (char *) stream->per_buffer_data +
     175      4408011 :         stream->per_buffer_data_size * buffer_index;
     176              : }
     177              : 
     178              : /*
     179              :  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
     180              :  * blocks [current_blocknum, last_exclusive).
     181              :  */
     182              : BlockNumber
     183       415940 : block_range_read_stream_cb(ReadStream *stream,
     184              :                            void *callback_private_data,
     185              :                            void *per_buffer_data)
     186              : {
     187       415940 :     BlockRangeReadStreamPrivate *p = callback_private_data;
     188              : 
     189       415940 :     if (p->current_blocknum < p->last_exclusive)
     190       339888 :         return p->current_blocknum++;
     191              : 
     192        76052 :     return InvalidBlockNumber;
     193              : }
     194              : 
     195              : /*
     196              :  * Update stream stats with current pinned buffer depth.
     197              :  *
     198              :  * Called once per buffer returned to the consumer in read_stream_next_buffer().
     199              :  * Records the number of pinned buffers at that moment, so we can compute the
     200              :  * average look-ahead depth.
     201              :  */
     202              : static inline void
     203      5067981 : read_stream_count_prefetch(ReadStream *stream)
     204              : {
     205      5067981 :     IOStats    *stats = stream->stats;
     206              : 
     207      5067981 :     if (stats == NULL)
     208      5067973 :         return;
     209              : 
     210            8 :     stats->prefetch_count++;
     211            8 :     stats->distance_sum += stream->pinned_buffers;
     212            8 :     if (stream->pinned_buffers > stats->distance_max)
     213            8 :         stats->distance_max = stream->pinned_buffers;
     214              : }
     215              : 
     216              : /*
     217              :  * Update stream stats about size of I/O requests.
     218              :  *
     219              :  * We count the number of I/O requests, size of requests (counted in blocks)
     220              :  * and number of in-progress I/Os.
     221              :  */
     222              : static inline void
     223       703470 : read_stream_count_io(ReadStream *stream, int nblocks, int in_progress)
     224              : {
     225       703470 :     IOStats    *stats = stream->stats;
     226              : 
     227       703470 :     if (stats == NULL)
     228       703469 :         return;
     229              : 
     230            1 :     stats->io_count++;
     231            1 :     stats->io_nblocks += nblocks;
     232            1 :     stats->io_in_progress += in_progress;
     233              : }
     234              : 
     235              : /*
     236              :  * Update stream stats about waits for I/O when consuming buffers.
     237              :  *
     238              :  * We count the number of I/O waits while pulling buffers out of a stream.
     239              :  */
     240              : static inline void
     241       347149 : read_stream_count_wait(ReadStream *stream)
     242              : {
     243       347149 :     IOStats    *stats = stream->stats;
     244              : 
     245       347149 :     if (stats == NULL)
     246       347148 :         return;
     247              : 
     248            1 :     stats->wait_count++;
     249              : }
     250              : 
     251              : /*
     252              :  * Enable collection of stats into the provided IOStats.
     253              :  */
     254              : void
     255            8 : read_stream_enable_stats(ReadStream *stream, IOStats *stats)
     256              : {
     257            8 :     stream->stats = stats;
     258            8 :     if (stream->stats)
     259            8 :         stream->stats->distance_capacity = stream->max_pinned_buffers;
     260            8 : }
     261              : 
     262              : /*
     263              :  * Ask the callback which block it would like us to read next, with a one block
     264              :  * buffer in front to allow read_stream_unget_block() to work.
     265              :  */
     266              : static inline BlockNumber
     267      6614517 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
     268              : {
     269              :     BlockNumber blocknum;
     270              : 
     271      6614517 :     blocknum = stream->buffered_blocknum;
     272      6614517 :     if (blocknum != InvalidBlockNumber)
     273            0 :         stream->buffered_blocknum = InvalidBlockNumber;
     274              :     else
     275              :     {
     276              :         /*
     277              :          * Tell Valgrind that the per-buffer data is undefined.  That replaces
     278              :          * the "noaccess" state that was set when the consumer moved past this
     279              :          * entry last time around the queue, and should also catch callbacks
     280              :          * that fail to initialize data that the buffer consumer later
     281              :          * accesses.  On the first go around, it is undefined already.
     282              :          */
     283              :         VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
     284              :                                     stream->per_buffer_data_size);
     285      6614517 :         blocknum = stream->callback(stream,
     286              :                                     stream->callback_private_data,
     287              :                                     per_buffer_data);
     288              :     }
     289              : 
     290      6614517 :     return blocknum;
     291              : }
     292              : 
/*
 * In order to deal with buffer shortages and I/O limits after short reads, we
 * sometimes need to defer handling of a block we've already consumed from the
 * registered callback until later.  The stored block is returned by the next
 * read_stream_get_block() call instead of invoking the callback.
 */
static inline void
read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
{
	/* We shouldn't ever unget more than one block. */
	Assert(stream->buffered_blocknum == InvalidBlockNumber);
	Assert(blocknum != InvalidBlockNumber);
	stream->buffered_blocknum = blocknum;
}
     306              : 
     307              : /*
     308              :  * Start as much of the current pending read as we can.  If we have to split it
     309              :  * because of the per-backend buffer limit, or the buffer manager decides to
     310              :  * split it, then the pending read is adjusted to hold the remaining portion.
     311              :  *
     312              :  * We can always start a read of at least size one if we have no progress yet.
     313              :  * Otherwise it's possible that we can't start a read at all because of a lack
     314              :  * of buffers, and then false is returned.  Buffer shortages also reduce the
     315              :  * distance to a level that prevents look-ahead until buffers are released.
     316              :  */
     317              : static bool
     318      2169374 : read_stream_start_pending_read(ReadStream *stream)
     319              : {
     320              :     bool        need_wait;
     321              :     int         requested_nblocks;
     322              :     int         nblocks;
     323              :     int         flags;
     324              :     int         forwarded;
     325              :     int16       io_index;
     326              :     int16       overflow;
     327              :     int16       buffer_index;
     328              :     int         buffer_limit;
     329              : 
     330              :     /* This should only be called with a pending read. */
     331              :     Assert(stream->pending_read_nblocks > 0);
     332              :     Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
     333              : 
     334              :     /* We had better not exceed the per-stream buffer limit with this read. */
     335              :     Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
     336              :            stream->max_pinned_buffers);
     337              : 
     338              : #ifdef USE_ASSERT_CHECKING
     339              :     /* We had better not be overwriting an existing pinned buffer. */
     340              :     if (stream->pinned_buffers > 0)
     341              :         Assert(stream->next_buffer_index != stream->oldest_buffer_index);
     342              :     else
     343              :         Assert(stream->next_buffer_index == stream->oldest_buffer_index);
     344              : 
     345              :     /*
     346              :      * Pinned buffers forwarded by a preceding StartReadBuffers() call that
     347              :      * had to split the operation should match the leading blocks of this
     348              :      * following StartReadBuffers() call.
     349              :      */
     350              :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
     351              :     for (int i = 0; i < stream->forwarded_buffers; ++i)
     352              :         Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
     353              :                stream->pending_read_blocknum + i);
     354              : 
     355              :     /*
     356              :      * Check that we've cleared the queue/overflow entries corresponding to
     357              :      * the rest of the blocks covered by this read, unless it's the first go
     358              :      * around and we haven't even initialized them yet.
     359              :      */
     360              :     for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
     361              :         Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
     362              :                stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
     363              : #endif
     364              : 
     365              :     /* Do we need to issue read-ahead advice? */
     366      2169374 :     flags = stream->read_buffers_flags;
     367      2169374 :     if (stream->advice_enabled)
     368              :     {
     369         1708 :         if (stream->pending_read_blocknum == stream->seq_blocknum)
     370              :         {
     371              :             /*
     372              :              * Sequential:  Issue advice until the preadv() calls have caught
     373              :              * up with the first advice issued for this sequential region, and
     374              :              * then stay out of the way of the kernel's own read-ahead.
     375              :              */
     376           29 :             if (stream->seq_until_processed != InvalidBlockNumber)
     377            1 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     378              :         }
     379              :         else
     380              :         {
     381              :             /*
     382              :              * Random jump:  Note the starting location of a new potential
     383              :              * sequential region and start issuing advice.  Skip it this time
     384              :              * if the preadv() follows immediately, eg first block in stream.
     385              :              */
     386         1679 :             stream->seq_until_processed = stream->pending_read_blocknum;
     387         1679 :             if (stream->pinned_buffers > 0)
     388           44 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
     389              :         }
     390              :     }
     391              : 
     392              :     /*
     393              :      * How many more buffers is this backend allowed?
     394              :      *
     395              :      * Forwarded buffers are already pinned and map to the leading blocks of
     396              :      * the pending read (the remaining portion of an earlier short read that
     397              :      * we're about to continue).  They are not counted in pinned_buffers, but
     398              :      * they are counted as pins already held by this backend according to the
     399              :      * buffer manager, so they must be added to the limit it grants us.
     400              :      */
     401      2169374 :     if (stream->temporary)
     402        16379 :         buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
     403              :     else
     404      2152995 :         buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
     405              :     Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
     406              : 
     407      2169374 :     buffer_limit += stream->forwarded_buffers;
     408      2169374 :     buffer_limit = Min(buffer_limit, PG_INT16_MAX);
     409              : 
     410      2169374 :     if (buffer_limit == 0 && stream->pinned_buffers == 0)
     411       744148 :         buffer_limit = 1;       /* guarantee progress */
     412              : 
     413              :     /* Does the per-backend limit affect this read? */
     414      2169374 :     nblocks = stream->pending_read_nblocks;
     415      2169374 :     if (buffer_limit < nblocks)
     416              :     {
     417              :         int16       new_distance;
     418              : 
     419              :         /* Shrink distance: no more look-ahead until buffers are released. */
     420         1980 :         new_distance = stream->pinned_buffers + buffer_limit;
     421         1980 :         if (stream->readahead_distance > new_distance)
     422          446 :             stream->readahead_distance = new_distance;
     423              : 
     424              :         /* Unless we have nothing to give the consumer, stop here. */
     425         1980 :         if (stream->pinned_buffers > 0)
     426          114 :             return false;
     427              : 
     428              :         /* A short read is required to make progress. */
     429         1866 :         nblocks = buffer_limit;
     430              :     }
     431              : 
     432              :     /*
     433              :      * We say how many blocks we want to read, but it may be smaller on return
     434              :      * if the buffer manager decides to shorten the read.  Initialize buffers
     435              :      * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
     436              :      * and keep the original nblocks number so we can check for forwarded
     437              :      * buffers as output, below.
     438              :      */
     439      2169260 :     buffer_index = stream->next_buffer_index;
     440      2169260 :     io_index = stream->next_io_index;
     441      3527186 :     while (stream->initialized_buffers < buffer_index + nblocks)
     442      1357926 :         stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
     443      2169260 :     requested_nblocks = nblocks;
     444      2169260 :     need_wait = StartReadBuffers(&stream->ios[io_index].op,
     445      2169260 :                                  &stream->buffers[buffer_index],
     446              :                                  stream->pending_read_blocknum,
     447              :                                  &nblocks,
     448              :                                  flags);
     449      2169252 :     stream->pinned_buffers += nblocks;
     450              : 
     451              :     /* Remember whether we need to wait before returning this buffer. */
     452      2169252 :     if (!need_wait)
     453              :     {
     454              :         /*
     455              :          * If there currently is no IO in progress, and we have not needed to
     456              :          * issue IO recently, decay the look-ahead distance.  We detect if we
     457              :          * had to issue IO recently by having a decay holdoff that's set to
     458              :          * the max look-ahead distance whenever we need to do IO.  This is
     459              :          * important to ensure we eventually reach a high enough distance to
     460              :          * perform IO asynchronously when starting out with a small look-ahead
     461              :          * distance.
     462              :          */
     463      1481946 :         if (stream->ios_in_progress == 0)
     464              :         {
     465      1481183 :             if (stream->distance_decay_holdoff > 0)
     466        26774 :                 stream->distance_decay_holdoff--;
     467              :             else
     468              :             {
     469      1454409 :                 if (stream->readahead_distance > 1)
     470        17275 :                     stream->readahead_distance--;
     471              : 
     472              :                 /*
     473              :                  * For now we reduce the IO combine distance after
     474              :                  * sufficiently many buffer hits. There is no clear
     475              :                  * performance argument for doing so, but at the moment we
     476              :                  * need to do so to make the entrance into fast_path work
     477              :                  * correctly: We require combine_distance == 1 to enter
     478              :                  * fast-path, as without that condition we would wrongly
     479              :                  * re-enter fast-path when readahead_distance == 1 and
     480              :                  * pinned_buffers == 1, as we would not yet have prepared
     481              :                  * another IO in that situation.
     482              :                  */
     483      1454409 :                 if (stream->combine_distance > 1)
     484        17385 :                     stream->combine_distance--;
     485              :             }
     486              :         }
     487              :     }
     488              :     else
     489              :     {
     490              :         /*
     491              :          * Remember to call WaitReadBuffers() before returning head buffer.
     492              :          * Look-ahead distance will be adjusted after waiting.
     493              :          */
     494       687306 :         stream->ios[io_index].buffer_index = buffer_index;
     495       687306 :         if (++stream->next_io_index == stream->max_ios)
     496        29284 :             stream->next_io_index = 0;
     497              :         Assert(stream->ios_in_progress < stream->max_ios);
     498       687306 :         stream->ios_in_progress++;
     499       687306 :         stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
     500              : 
     501              :         /* update I/O stats */
     502       687306 :         read_stream_count_io(stream, nblocks, stream->ios_in_progress);
     503              :     }
     504              : 
     505              :     /*
     506              :      * How many pins were acquired but forwarded to the next call?  These need
     507              :      * to be passed to the next StartReadBuffers() call by leaving them
     508              :      * exactly where they are in the queue, or released if the stream ends
     509              :      * early.  We need the number for accounting purposes, since they are not
     510              :      * counted in stream->pinned_buffers but we already hold them.
     511              :      */
     512      2169252 :     forwarded = 0;
     513      2171474 :     while (nblocks + forwarded < requested_nblocks &&
     514        66782 :            stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
     515         2222 :         forwarded++;
     516      2169252 :     stream->forwarded_buffers = forwarded;
     517              : 
     518              :     /*
     519              :      * We gave a contiguous range of buffer space to StartReadBuffers(), but
     520              :      * we want it to wrap around at queue_size.  Copy overflowing buffers to
     521              :      * the front of the array where they'll be consumed, but also leave a copy
     522              :      * in the overflow zone which the I/O operation has a pointer to (it needs
     523              :      * a contiguous array).  Both copies will be cleared when the buffers are
     524              :      * handed to the consumer.
     525              :      */
     526      2169252 :     overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
     527      2169252 :     if (overflow > 0)
     528              :     {
     529              :         Assert(overflow < stream->queue_size);    /* can't overlap */
     530          452 :         memcpy(&stream->buffers[0],
     531          452 :                &stream->buffers[stream->queue_size],
     532              :                sizeof(stream->buffers[0]) * overflow);
     533              :     }
     534              : 
     535              :     /* Compute location of start of next read, without using % operator. */
     536      2169252 :     buffer_index += nblocks;
     537      2169252 :     if (buffer_index >= stream->queue_size)
     538       348505 :         buffer_index -= stream->queue_size;
     539              :     Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     540      2169252 :     stream->next_buffer_index = buffer_index;
     541              : 
     542              :     /* Adjust the pending read to cover the remaining portion, if any. */
     543      2169252 :     stream->pending_read_blocknum += nblocks;
     544      2169252 :     stream->pending_read_nblocks -= nblocks;
     545              : 
     546      2169252 :     return true;
     547              : }
     548              : 
     549              : /*
     550              :  * Should we continue to perform look ahead?  Looking ahead may allow us to
     551              :  * make the pending IO larger via IO combining or to issue more read-ahead.
     552              :  */
     553              : static inline bool
     554      6392126 : read_stream_should_look_ahead(ReadStream *stream)
     555              : {
     556              :     /* If the callback has signaled end-of-stream, we're done */
     557      6392126 :     if (stream->readahead_distance == 0)
     558       338197 :         return false;
     559              : 
     560              :     /* never start more IOs than our cap */
     561      6053929 :     if (stream->ios_in_progress >= stream->max_ios)
     562            0 :         return false;
     563              : 
     564              :     /*
     565              :      * Allow looking further ahead if we are in the process of building a
     566              :      * larger IO, the IO is not yet big enough, and we don't yet have IO in
     567              :      * flight.
     568              :      *
     569              :      * We do so to allow building larger reads when readahead_distance is
     570              :      * small (e.g. because the I/O subsystem is keeping up or
     571              :      * effective_io_concurrency is small). That's a useful goal because larger
     572              :      * reads are more CPU efficient than smaller reads, even if the system is
     573              :      * not IO bound.
     574              :      *
     575              :      * The reason we do *not* do so when we already have a read prepared (i.e.
     576              :      * why we check for pinned_buffers == 0) is once we are actually reading
     577              :      * ahead, we don't need it:
     578              :      *
     579              :      * - We won't issue unnecessarily small reads as
     580              :      * read_stream_should_issue_now() will return false until the IO is
     581              :      * suitably sized. The issuance of the pending read will be delayed until
     582              :      * enough buffers have been consumed.
     583              :      *
     584              :      * - If we are not reading ahead aggressively enough, future
     585              :      * WaitReadBuffers() calls will return true, leading to readahead_distance
     586              :      * being increased. After that more full-sized IOs can be issued.
     587              :      *
     588              :      * Furthermore, if we did not have the pinned_buffers == 0 condition, we
     589              :      * might end up issuing I/O more aggressively than we need.
     590              :      *
     591              :      * Note that a return of true here can lead to exceeding the read-ahead
     592              :      * limit, but we won't exceed the buffer pin limit (because pinned_buffers
     593              :      * == 0 and combine_distance is capped by max_pinned_buffers).
     594              :      */
     595      6053929 :     if (stream->pending_read_nblocks > 0 &&
     596      2668495 :         stream->pinned_buffers == 0 &&
     597      2525167 :         stream->pending_read_nblocks < stream->combine_distance)
     598       464658 :         return true;
     599              : 
     600              :     /*
     601              :      * Don't start more read-ahead if that'd put us over the distance limit
     602              :      * for doing read-ahead. As stream->readahead_distance is capped by
     603              :      * max_pinned_buffers, this prevents us from looking ahead so far that it
     604              :      * would put us over the pin limit.
     605              :      */
     606      5589271 :     if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
     607      2135871 :         return false;
     608              : 
     609      3453400 :     return true;
     610              : }
     611              : 
     612              : /*
     613              :  * We don't start the pending read just because we've hit the distance limit,
     614              :  * preferring to give it another chance to grow to full io_combine_limit size
     615              :  * once more buffers have been consumed.  But this is not desirable in all
     616              :  * situations - see below.
     617              :  */
     618              : static inline bool
     619      7488737 : read_stream_should_issue_now(ReadStream *stream)
     620              : {
     621      7488737 :     int16       pending_read_nblocks = stream->pending_read_nblocks;
     622              : 
     623              :     /* there is no pending IO that could be issued */
     624      7488737 :     if (pending_read_nblocks == 0)
     625      4949060 :         return false;
     626              : 
     627              :     /* never start more IOs than our cap */
     628      2539677 :     if (stream->ios_in_progress >= stream->max_ios)
     629            0 :         return false;
     630              : 
     631              :     /*
     632              :      * If the callback has signaled end-of-stream, start the pending read
     633              :      * immediately. There is no further potential for IO combining.
     634              :      */
     635      2539677 :     if (stream->readahead_distance == 0)
     636       103511 :         return true;
     637              : 
     638              :     /*
     639              :      * If we've already reached combine_distance, there's no chance of growing
     640              :      * the read further.
     641              :      */
     642      2436166 :     if (pending_read_nblocks >= stream->combine_distance)
     643      2065800 :         return true;
     644              : 
     645              :     /*
     646              :      * If we currently have no reads in flight or prepared, issue the IO once
     647              :      * we are not looking ahead further. This ensures there's always at least
     648              :      * one IO prepared.
     649              :      */
     650       370366 :     if (stream->pinned_buffers == 0 &&
     651       232329 :         !read_stream_should_look_ahead(stream))
     652            0 :         return true;
     653              : 
     654       370366 :     return false;
     655              : }
     656              : 
/*
 * Drive the look-ahead machinery: pull block numbers from the callback to
 * grow the pending read and/or start new reads, until we've looked far
 * enough ahead or hit a limit (pin budget, IO cap, or end-of-stream).
 */
static void
read_stream_look_ahead(ReadStream *stream)
{
	/*
	 * Allow amortizing the cost of submitting IO over multiple IOs. This
	 * requires that we don't do any operations that could lead to a deadlock
	 * with staged-but-unsubmitted IO. The callback needs to opt-in to being
	 * careful.
	 */
	if (stream->batch_mode)
		pgaio_enter_batchmode();

	while (read_stream_should_look_ahead(stream))
	{
		BlockNumber blocknum;
		int16		buffer_index;
		void	   *per_buffer_data;

		/* Start the pending read now if it can't usefully grow further. */
		if (read_stream_should_issue_now(stream))
		{
			read_stream_start_pending_read(stream);
			continue;
		}

		/*
		 * See which block the callback wants next in the stream.  We need to
		 * compute the index of the Nth block of the pending read including
		 * wrap-around, but we don't want to use the expensive % operator.
		 */
		buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
		if (buffer_index >= stream->queue_size)
			buffer_index -= stream->queue_size;
		Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
		per_buffer_data = get_per_buffer_data(stream, buffer_index);
		blocknum = read_stream_get_block(stream, per_buffer_data);
		if (blocknum == InvalidBlockNumber)
		{
			/* End of stream.  Zero distances so we never look ahead again. */
			stream->readahead_distance = 0;
			stream->combine_distance = 0;
			break;
		}

		/* Can we merge it with the pending read? */
		if (stream->pending_read_nblocks > 0 &&
			stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
		{
			stream->pending_read_nblocks++;
			continue;
		}

		/* We have to start the pending read before we can build another. */
		while (stream->pending_read_nblocks > 0)
		{
			if (!read_stream_start_pending_read(stream) ||
				stream->ios_in_progress == stream->max_ios)
			{
				/* We've hit the buffer or I/O limit.  Rewind and stop here. */
				read_stream_unget_block(stream, blocknum);
				if (stream->batch_mode)
					pgaio_exit_batchmode();
				return;
			}
		}

		/* This is the start of a new pending read. */
		stream->pending_read_blocknum = blocknum;
		stream->pending_read_nblocks = 1;
	}

	/*
	 * Check if the pending read should be issued now, or if we should give it
	 * another chance to grow to the full size.
	 *
	 * Note that the pending read can exceed the distance goal, if the latter
	 * was reduced after hitting the per-backend buffer limit.
	 */
	if (read_stream_should_issue_now(stream))
		read_stream_start_pending_read(stream);

	/*
	 * There should always be something pinned when we leave this function,
	 * whether started by this call or not, unless we've hit the end of the
	 * stream.  In the worst case we can always make progress one buffer at a
	 * time.
	 */
	Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);

	if (stream->batch_mode)
		pgaio_exit_batchmode();
}
     748              : 
/*
 * Create a new read stream object that can be used to perform the equivalent
 * of a series of ReadBuffer() calls for one fork of one relation.
 * Internally, it generates larger vectored reads where possible by looking
 * ahead.  The callback should return block numbers or InvalidBlockNumber to
 * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
 * write extra data for each block into the space provided to it.  It will
 * also receive callback_private_data for its own purposes.
 *
 * 'rel' may be NULL when streaming via an SMgrRelation directly (see
 * read_stream_begin_smgr_relation()), in which case 'smgr' and 'persistence'
 * describe the target.  The stream, its circular buffer queue, its
 * InProgressIO array and any per-buffer data are all allocated as one
 * contiguous palloc'd chunk.
 */
static ReadStream *
read_stream_begin_impl(int flags,
					   BufferAccessStrategy strategy,
					   Relation rel,
					   SMgrRelation smgr,
					   char persistence,
					   ForkNumber forknum,
					   ReadStreamBlockNumberCB callback,
					   void *callback_private_data,
					   size_t per_buffer_data_size)
{
	ReadStream *stream;
	size_t		size;
	int16		queue_size;
	int16		queue_overflow;
	int			max_ios;
	int			strategy_pin_limit;
	uint32		max_pinned_buffers;
	uint32		max_possible_buffer_limit;
	Oid			tablespace_id;

	/*
	 * Decide how many I/Os we will allow to run at the same time.  This
	 * number also affects how far we look ahead for opportunities to start
	 * more I/Os.
	 */
	tablespace_id = smgr->smgr_rlocator.locator.spcOid;
	if (!OidIsValid(MyDatabaseId) ||
		(rel && IsCatalogRelation(rel)) ||
		IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
	{
		/*
		 * Avoid circularity while trying to look up tablespace settings or
		 * before spccache.c is ready.
		 */
		max_ios = effective_io_concurrency;
	}
	else if (flags & READ_STREAM_MAINTENANCE)
		max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
	else
		max_ios = get_tablespace_io_concurrency(tablespace_id);

	/* Cap to INT16_MAX to avoid overflowing below */
	max_ios = Min(max_ios, PG_INT16_MAX);

	/*
	 * If starting a multi-block I/O near the end of the queue, we might
	 * temporarily need extra space for overflowing buffers before they are
	 * moved to regular circular position.  This is the maximum extra space we
	 * could need.
	 */
	queue_overflow = io_combine_limit - 1;

	/*
	 * Choose the maximum number of buffers we're prepared to pin.  We try to
	 * pin fewer if we can, though.  We add one so that we can make progress
	 * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
	 * this also allows an extra full I/O's worth of buffers: after an I/O
	 * finishes we don't want to have to wait for its buffers to be consumed
	 * before starting a new one.
	 *
	 * Be careful not to allow int16 to overflow.  That is possible with the
	 * current GUC range limits, so this is an artificial limit of ~32k
	 * buffers and we'd need to adjust the types to exceed that.  We also have
	 * to allow for the spare entry and the overflow space.
	 */
	max_pinned_buffers = (max_ios + 1) * io_combine_limit;
	max_pinned_buffers = Min(max_pinned_buffers,
							 PG_INT16_MAX - queue_overflow - 1);

	/* Give the strategy a chance to limit the number of buffers we pin. */
	strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
	max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);

	/*
	 * Also limit our queue to the maximum number of pins we could ever be
	 * allowed to acquire according to the buffer manager.  We may not really
	 * be able to use them all due to other pins held by this backend, but
	 * we'll check that later in read_stream_start_pending_read().
	 */
	if (SmgrIsTemp(smgr))
		max_possible_buffer_limit = GetLocalPinLimit();
	else
		max_possible_buffer_limit = GetPinLimit();
	max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);

	/*
	 * The limit might be zero on a system configured with too few buffers for
	 * the number of connections.  We need at least one to make progress.
	 */
	max_pinned_buffers = Max(1, max_pinned_buffers);

	/*
	 * We need one extra entry for buffers and per-buffer data, because users
	 * of per-buffer data have access to the object until the next call to
	 * read_stream_next_buffer(), so we need a gap between the head and tail
	 * of the queue so that we don't clobber it.
	 */
	queue_size = max_pinned_buffers + 1;

	/*
	 * Allocate the object, the buffers, the ios and per_buffer_data space in
	 * one big chunk.  Though we have queue_size buffers, we want to be able
	 * to assume that all the buffers for a single read are contiguous (i.e.
	 * don't wrap around halfway through), so we allow temporary overflows of
	 * up to the maximum possible overflow size.
	 */
	size = offsetof(ReadStream, buffers);
	size += sizeof(Buffer) * (queue_size + queue_overflow);
	size += sizeof(InProgressIO) * Max(1, max_ios);
	size += per_buffer_data_size * queue_size;
	size += MAXIMUM_ALIGNOF * 2;
	stream = (ReadStream *) palloc(size);
	memset(stream, 0, offsetof(ReadStream, buffers));
	stream->ios = (InProgressIO *)
		MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
	if (per_buffer_data_size > 0)
		stream->per_buffer_data = (void *)
			MAXALIGN(&stream->ios[Max(1, max_ios)]);

	stream->sync_mode = io_method == IOMETHOD_SYNC;
	stream->batch_mode = flags & READ_STREAM_USE_BATCHING;

#ifdef USE_PREFETCH

	/*
	 * Read-ahead advice simulating asynchronous I/O with synchronous calls.
	 * Issue advice only if AIO is not used, direct I/O isn't enabled, the
	 * caller hasn't promised sequential access (overriding our detection
	 * heuristics), and max_ios hasn't been set to zero.
	 */
	if (stream->sync_mode &&
		(io_direct_flags & IO_DIRECT_DATA) == 0 &&
		(flags & READ_STREAM_SEQUENTIAL) == 0 &&
		max_ios > 0)
		stream->advice_enabled = true;
#endif

	/*
	 * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
	 * we still need to allocate space to combine and run one I/O.  Bump it up
	 * to one, and remember to ask for synchronous I/O only.
	 */
	if (max_ios == 0)
	{
		max_ios = 1;
		stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
	}

	/*
	 * Capture stable values for these two GUC-derived numbers for the
	 * lifetime of this stream, so we don't have to worry about the GUCs
	 * changing underneath us beyond this point.
	 */
	stream->max_ios = max_ios;
	stream->io_combine_limit = io_combine_limit;

	stream->per_buffer_data_size = per_buffer_data_size;
	stream->max_pinned_buffers = max_pinned_buffers;
	stream->queue_size = queue_size;
	stream->callback = callback;
	stream->callback_private_data = callback_private_data;
	stream->buffered_blocknum = InvalidBlockNumber;
	stream->seq_blocknum = InvalidBlockNumber;
	stream->seq_until_processed = InvalidBlockNumber;
	stream->temporary = SmgrIsTemp(smgr);
	stream->distance_decay_holdoff = 0;

	/*
	 * Skip the initial ramp-up phase if the caller says we're going to be
	 * reading the whole relation.  This way we start out assuming we'll be
	 * doing full io_combine_limit sized reads.
	 */
	if (flags & READ_STREAM_FULL)
	{
		stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
		stream->combine_distance = Min(max_pinned_buffers, stream->io_combine_limit);
	}
	else
	{
		stream->readahead_distance = 1;
		stream->combine_distance = 1;
	}
	/* Remember the starting distances so a paused stream can resume them. */
	stream->resume_readahead_distance = stream->readahead_distance;
	stream->resume_combine_distance = stream->combine_distance;

	/*
	 * Since we always access the same relation, we can initialize parts of
	 * the ReadBuffersOperation objects and leave them that way, to avoid
	 * wasting CPU cycles writing to them for each read.
	 */
	for (int i = 0; i < max_ios; ++i)
	{
		stream->ios[i].op.rel = rel;
		stream->ios[i].op.smgr = smgr;
		stream->ios[i].op.persistence = persistence;
		stream->ios[i].op.forknum = forknum;
		stream->ios[i].op.strategy = strategy;
	}

	return stream;
}
     960              : 
     961              : /*
     962              :  * Create a new read stream for reading a relation.
     963              :  * See read_stream_begin_impl() for the detailed explanation.
     964              :  */
     965              : ReadStream *
     966       627361 : read_stream_begin_relation(int flags,
     967              :                            BufferAccessStrategy strategy,
     968              :                            Relation rel,
     969              :                            ForkNumber forknum,
     970              :                            ReadStreamBlockNumberCB callback,
     971              :                            void *callback_private_data,
     972              :                            size_t per_buffer_data_size)
     973              : {
     974       627361 :     return read_stream_begin_impl(flags,
     975              :                                   strategy,
     976              :                                   rel,
     977              :                                   RelationGetSmgr(rel),
     978       627361 :                                   rel->rd_rel->relpersistence,
     979              :                                   forknum,
     980              :                                   callback,
     981              :                                   callback_private_data,
     982              :                                   per_buffer_data_size);
     983              : }
     984              : 
     985              : /*
     986              :  * Create a new read stream for reading a SMgr relation.
     987              :  * See read_stream_begin_impl() for the detailed explanation.
     988              :  */
     989              : ReadStream *
     990        70218 : read_stream_begin_smgr_relation(int flags,
     991              :                                 BufferAccessStrategy strategy,
     992              :                                 SMgrRelation smgr,
     993              :                                 char smgr_persistence,
     994              :                                 ForkNumber forknum,
     995              :                                 ReadStreamBlockNumberCB callback,
     996              :                                 void *callback_private_data,
     997              :                                 size_t per_buffer_data_size)
     998              : {
     999        70218 :     return read_stream_begin_impl(flags,
    1000              :                                   strategy,
    1001              :                                   NULL,
    1002              :                                   smgr,
    1003              :                                   smgr_persistence,
    1004              :                                   forknum,
    1005              :                                   callback,
    1006              :                                   callback_private_data,
    1007              :                                   per_buffer_data_size);
    1008              : }
    1009              : 
    1010              : /*
    1011              :  * Pull one pinned buffer out of a stream.  Each call returns successive
    1012              :  * blocks in the order specified by the callback.  If per_buffer_data_size was
    1013              :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
    1014              :  * per-buffer data that the callback had a chance to populate, which remains
    1015              :  * valid until the next call to read_stream_next_buffer().  When the stream
    1016              :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
    1017              :  * the stream early at any time by calling read_stream_end().
    1018              :  */
    1019              : Buffer
    1020      8021640 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
    1021              : {
    1022              :     Buffer      buffer;
    1023              :     int16       oldest_buffer_index;
    1024              : 
    1025              : #ifndef READ_STREAM_DISABLE_FAST_PATH
    1026              : 
    1027              :     /*
    1028              :      * A fast path for all-cached scans.  This is the same as the usual
    1029              :      * algorithm, but it is specialized for no I/O and no per-buffer data, so
    1030              :      * we can skip the queue management code, stay in the same buffer slot and
    1031              :      * use singular StartReadBuffer().
    1032              :      */
    1033      8021640 :     if (likely(stream->fast_path))
    1034              :     {
    1035              :         BlockNumber next_blocknum;
    1036              : 
    1037              :         /* Fast path assumptions. */
    1038              :         Assert(stream->ios_in_progress == 0);
    1039              :         Assert(stream->forwarded_buffers == 0);
    1040              :         Assert(stream->pinned_buffers == 1);
    1041              :         Assert(stream->readahead_distance == 1);
    1042              :         Assert(stream->combine_distance == 1);
    1043              :         Assert(stream->pending_read_nblocks == 0);
    1044              :         Assert(stream->per_buffer_data_size == 0);
    1045              :         Assert(stream->initialized_buffers > stream->oldest_buffer_index);
    1046              : 
    1047              :         /* We're going to return the buffer we pinned last time. */
    1048      2930899 :         oldest_buffer_index = stream->oldest_buffer_index;
    1049              :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
    1050              :                stream->next_buffer_index);
    1051      2930899 :         buffer = stream->buffers[oldest_buffer_index];
    1052              :         Assert(buffer != InvalidBuffer);
    1053              : 
    1054              :         /* Choose the next block to pin. */
    1055      2930899 :         next_blocknum = read_stream_get_block(stream, NULL);
    1056              : 
    1057      2930899 :         if (likely(next_blocknum != InvalidBlockNumber))
    1058              :         {
    1059      2822710 :             int         flags = stream->read_buffers_flags;
    1060              : 
    1061      2822710 :             if (stream->advice_enabled)
    1062          548 :                 flags |= READ_BUFFERS_ISSUE_ADVICE;
    1063              : 
    1064              :             /*
    1065              :              * While in fast-path, execute any IO that we might encounter
    1066              :              * synchronously. Because we are, right now, only looking one
    1067              :              * block ahead, dispatching any occasional IO to workers would
    1068              :              * have the overhead of dispatching to workers, without any
    1069              :              * realistic chance of the IO completing before we need it. We
    1070              :              * will switch to non-synchronous IO after this.
    1071              :              *
    1072              :              * Arguably we should do so only for worker, as there's far less
    1073              :              * dispatch overhead with io_uring. However, tests so far have not
    1074              :              * shown a clear downside and additional io_method awareness here
    1075              :              * seems not great from an abstraction POV.
    1076              :              */
    1077      2822710 :             flags |= READ_BUFFERS_SYNCHRONOUSLY;
    1078              : 
    1079              :             /*
    1080              :              * Pin a buffer for the next call.  Same buffer entry, and
    1081              :              * arbitrary I/O entry (they're all free).  We don't have to
    1082              :              * adjust pinned_buffers because we're transferring one to caller
    1083              :              * but pinning one more.
    1084              :              *
    1085              :              * In the fast path we don't need to check the pin limit.  We're
    1086              :              * always allowed at least one pin so that progress can be made,
    1087              :              * and that's all we need here.  Although two pins are momentarily
    1088              :              * held at the same time, the model used here is that the stream
    1089              :              * holds only one, and the other now belongs to the caller.
    1090              :              */
    1091      2822710 :             if (likely(!StartReadBuffer(&stream->ios[0].op,
    1092              :                                         &stream->buffers[oldest_buffer_index],
    1093              :                                         next_blocknum,
    1094              :                                         flags)))
    1095              :             {
    1096              :                 /* Fast return. */
    1097      2806546 :                 read_stream_count_prefetch(stream);
    1098      2806546 :                 return buffer;
    1099              :             }
    1100              : 
    1101              :             /* Next call must wait for I/O for the newly pinned buffer. */
    1102        16164 :             stream->oldest_io_index = 0;
    1103        16164 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
    1104        16164 :             stream->ios_in_progress = 1;
    1105        16164 :             stream->ios[0].buffer_index = oldest_buffer_index;
    1106        16164 :             stream->seq_blocknum = next_blocknum + 1;
    1107              : 
    1108              :             /*
    1109              :              * XXX: It might be worth triggering additional read-ahead here,
    1110              :              * to avoid having to effectively do another synchronous IO for
    1111              :              * the next block (if it were also a miss).
    1112              :              */
    1113              : 
    1114              :             /* update I/O stats */
    1115        16164 :             read_stream_count_io(stream, 1, stream->ios_in_progress);
    1116              : 
    1117              :             /* update prefetch distance */
    1118        16164 :             read_stream_count_prefetch(stream);
    1119              :         }
    1120              :         else
    1121              :         {
    1122              :             /* No more blocks, end of stream. */
    1123       108189 :             stream->readahead_distance = 0;
    1124       108189 :             stream->combine_distance = 0;
    1125       108189 :             stream->oldest_buffer_index = stream->next_buffer_index;
    1126       108189 :             stream->pinned_buffers = 0;
    1127       108189 :             stream->buffers[oldest_buffer_index] = InvalidBuffer;
    1128              :         }
    1129              : 
    1130       124353 :         stream->fast_path = false;
    1131       124353 :         return buffer;
    1132              :     }
    1133              : #endif
    1134              : 
    1135      5090741 :     if (unlikely(stream->pinned_buffers == 0))
    1136              :     {
    1137              :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
    1138              : 
    1139              :         /* End of stream reached?  */
    1140      3554180 :         if (stream->readahead_distance == 0)
    1141      1996443 :             return InvalidBuffer;
    1142              : 
    1143              :         /*
    1144              :          * The usual order of operations is that we look ahead at the bottom
    1145              :          * of this function after potentially finishing an I/O and making
    1146              :          * space for more, but if we're just starting up we'll need to crank
    1147              :          * the handle to get started.
    1148              :          */
    1149      1557737 :         read_stream_look_ahead(stream);
    1150              : 
    1151              :         /* End of stream reached? */
    1152      1557737 :         if (stream->pinned_buffers == 0)
    1153              :         {
    1154              :             Assert(stream->readahead_distance == 0);
    1155       849004 :             return InvalidBuffer;
    1156              :         }
    1157              :     }
    1158              : 
    1159              :     /* Grab the oldest pinned buffer and associated per-buffer data. */
    1160              :     Assert(stream->pinned_buffers > 0);
    1161      2245294 :     oldest_buffer_index = stream->oldest_buffer_index;
    1162              :     Assert(oldest_buffer_index >= 0 &&
    1163              :            oldest_buffer_index < stream->queue_size);
    1164      2245294 :     buffer = stream->buffers[oldest_buffer_index];
    1165      2245294 :     if (per_buffer_data)
    1166       724393 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
    1167              : 
    1168              :     Assert(BufferIsValid(buffer));
    1169              : 
    1170              :     /* Do we have to wait for an associated I/O first? */
    1171      2245294 :     if (stream->ios_in_progress > 0 &&
    1172       766566 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
    1173              :     {
    1174       703072 :         int16       io_index = stream->oldest_io_index;
    1175              :         bool        needed_wait;
    1176              : 
    1177              :         /* Sanity check that we still agree on the buffers. */
    1178              :         Assert(stream->ios[io_index].op.buffers ==
    1179              :                &stream->buffers[oldest_buffer_index]);
    1180              : 
    1181       703072 :         needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
    1182              : 
    1183              :         Assert(stream->ios_in_progress > 0);
    1184       703049 :         stream->ios_in_progress--;
    1185       703049 :         if (++stream->oldest_io_index == stream->max_ios)
    1186        29284 :             stream->oldest_io_index = 0;
    1187              : 
    1188              :         /*
    1189              :          * If the IO was executed synchronously, we will never see
    1190              :          * WaitReadBuffers() block. Treat it as if it did block. This is
    1191              :          * particularly crucial when effective_io_concurrency=0 is used, as
    1192              :          * all IO will be synchronous.  Without treating synchronous IO as
    1193              :          * having waited, we'd never allow the distance to get large enough to
    1194              :          * allow for IO combining, resulting in bad performance.
    1195              :          */
    1196       703049 :         if (stream->ios[io_index].op.flags & READ_BUFFERS_SYNCHRONOUSLY)
    1197        16688 :             needed_wait = true;
    1198              : 
    1199              :         /* Count it as a wait if we need to wait for IO */
    1200       703049 :         if (needed_wait)
    1201       347149 :             read_stream_count_wait(stream);
    1202              : 
    1203              :         /*
    1204              :          * Have the read-ahead distance ramp up rapidly after we needed to
    1205              :          * wait for IO. We only increase the read-ahead-distance when we
    1206              :          * needed to wait, to avoid increasing the distance further than
    1207              :          * necessary, as looking ahead too far can be costly, both due to the
    1208              :          * cost of unnecessarily pinning many buffers and due to doing IOs
    1209              :          * that may never be consumed if the stream is ended/reset before
    1210              :          * completion.
    1211              :          *
    1212              :          * If we did not need to wait, the current distance was evidently
    1213              :          * sufficient.
    1214              :          *
    1215              :          * NB: Must not increase the distance if we already reached the end of
    1216              :          * the stream, as stream->readahead_distance == 0 is used to keep
    1217              :          * track of having reached the end.
    1218              :          */
    1219       703049 :         if (stream->readahead_distance > 0 && needed_wait)
    1220              :         {
    1221              :             /* wider temporary value, due to overflow risk */
    1222              :             int32       readahead_distance;
    1223              : 
    1224       318010 :             readahead_distance = stream->readahead_distance * 2;
    1225       318010 :             readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
    1226       318010 :             stream->readahead_distance = readahead_distance;
    1227              :         }
    1228              : 
    1229              :         /*
    1230              :          * As we needed IO, prevent distances from being reduced within our
    1231              :          * maximum look-ahead window. This avoids collapsing distances too
    1232              :          * quickly in workloads where most of the required blocks are cached,
    1233              :          * but where the remaining IOs are a sufficient enough factor to cause
    1234              :          * a substantial slowdown if executed synchronously.
    1235              :          *
    1236              :          * There are valid arguments for preventing decay for max_ios or for
    1237              :          * max_pinned_buffers.  But the argument for max_pinned_buffers seems
    1238              :          * clearer - if we can't see any misses within the maximum look-ahead
    1239              :          * distance, we can't do any useful read-ahead.
    1240              :          */
    1241       703049 :         stream->distance_decay_holdoff = stream->max_pinned_buffers;
    1242              : 
    1243              :         /*
    1244              :          * Whether we needed to wait or not, allow for more IO combining if we
    1245              :          * needed to do IO. The reason to do so independent of needing to wait
    1246              :          * is that when the data is resident in the kernel page cache, IO
    1247              :          * combining reduces the syscall / dispatch overhead, making it
    1248              :          * worthwhile regardless of needing to wait.
    1249              :          *
    1250              :          * It is also important with io_uring as it will never signal the need
    1251              :          * to wait for reads if all the data is in the page cache. There are
    1252              :          * heuristics to deal with that in method_io_uring.c, but they only
    1253              :          * work when the IO gets large enough.
    1254              :          */
    1255       703049 :         if (stream->combine_distance > 0 &&
    1256       652394 :             stream->combine_distance < stream->io_combine_limit)
    1257              :         {
    1258              :             /* wider temporary value, due to overflow risk */
    1259              :             int32       combine_distance;
    1260              : 
    1261       643946 :             combine_distance = stream->combine_distance * 2;
    1262       643946 :             combine_distance = Min(combine_distance, stream->io_combine_limit);
    1263       643946 :             combine_distance = Min(combine_distance, stream->max_pinned_buffers);
    1264       643946 :             stream->combine_distance = combine_distance;
    1265              :         }
    1266              : 
    1267              :         /*
    1268              :          * If we've reached the first block of a sequential region we're
    1269              :          * issuing advice for, cancel that until the next jump.  The kernel
    1270              :          * will see the sequential preadv() pattern starting here.
    1271              :          */
    1272       703049 :         if (stream->advice_enabled &&
    1273          318 :             stream->ios[io_index].op.blocknum == stream->seq_until_processed)
    1274          275 :             stream->seq_until_processed = InvalidBlockNumber;
    1275              :     }
    1276              : 
    1277              :     /*
    1278              :      * We must zap this queue entry, or else it would appear as a forwarded
    1279              :      * buffer.  If it's potentially in the overflow zone (ie from a
    1280              :      * multi-block I/O that wrapped around the queue), also zap the copy.
    1281              :      */
    1282      2245271 :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
    1283      2245271 :     if (oldest_buffer_index < stream->io_combine_limit - 1)
    1284      1687114 :         stream->buffers[stream->queue_size + oldest_buffer_index] =
    1285              :             InvalidBuffer;
    1286              : 
    1287              : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
    1288              : 
    1289              :     /*
    1290              :      * The caller will get access to the per-buffer data, until the next call.
    1291              :      * We wipe the one before, which is never occupied because queue_size
    1292              :      * allowed one extra element.  This will hopefully trip up client code
    1293              :      * that is holding a dangling pointer to it.
    1294              :      */
    1295              :     if (stream->per_buffer_data)
    1296              :     {
    1297              :         void       *per_buffer_data;
    1298              : 
    1299              :         per_buffer_data = get_per_buffer_data(stream,
    1300              :                                               oldest_buffer_index == 0 ?
    1301              :                                               stream->queue_size - 1 :
    1302              :                                               oldest_buffer_index - 1);
    1303              : 
    1304              : #if defined(CLOBBER_FREED_MEMORY)
    1305              :         /* This also tells Valgrind the memory is "noaccess". */
    1306              :         wipe_mem(per_buffer_data, stream->per_buffer_data_size);
    1307              : #elif defined(USE_VALGRIND)
    1308              :         /* Tell it ourselves. */
    1309              :         VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
    1310              :                                    stream->per_buffer_data_size);
    1311              : #endif
    1312              :     }
    1313              : #endif
    1314              : 
    1315      2245271 :     read_stream_count_prefetch(stream);
    1316              : 
    1317              :     /* Pin transferred to caller. */
    1318              :     Assert(stream->pinned_buffers > 0);
    1319      2245271 :     stream->pinned_buffers--;
    1320              : 
    1321              :     /* Advance oldest buffer, with wrap-around. */
    1322      2245271 :     stream->oldest_buffer_index++;
    1323      2245271 :     if (stream->oldest_buffer_index == stream->queue_size)
    1324       340922 :         stream->oldest_buffer_index = 0;
    1325              : 
    1326              :     /* Prepare for the next call. */
    1327      2245271 :     read_stream_look_ahead(stream);
    1328              : 
    1329              : #ifndef READ_STREAM_DISABLE_FAST_PATH
    1330              :     /* See if we can take the fast path for all-cached scans next time. */
    1331      2245263 :     if (stream->ios_in_progress == 0 &&
    1332      1576250 :         stream->forwarded_buffers == 0 &&
    1333      1573503 :         stream->pinned_buffers == 1 &&
    1334       880966 :         stream->readahead_distance == 1 &&
    1335       796759 :         stream->combine_distance == 1 &&
    1336       794197 :         stream->pending_read_nblocks == 0 &&
    1337       793047 :         stream->per_buffer_data_size == 0)
    1338              :     {
    1339              :         /*
    1340              :          * The fast path spins on one buffer entry repeatedly instead of
    1341              :          * rotating through the whole queue and clearing the entries behind
    1342              :          * it.  If the buffer it starts with happened to be forwarded between
    1343              :          * StartReadBuffers() calls and also wrapped around the circular queue
    1344              :          * partway through, then a copy also exists in the overflow zone, and
    1345              :          * it won't clear it out as the regular path would.  Do that now, so
    1346              :          * it doesn't need code for that.
    1347              :          */
    1348       237198 :         if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
    1349       235499 :             stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
    1350              :                 InvalidBuffer;
    1351              : 
    1352       237198 :         stream->fast_path = true;
    1353              :     }
    1354              : #endif
    1355              : 
    1356      2245263 :     return buffer;
    1357              : }
    1358              : 
    1359              : /*
    1360              :  * Transitional support for code that would like to perform or skip reads
    1361              :  * itself, without using the stream.  Returns, and consumes, the next block
    1362              :  * number that would be read by the stream's look-ahead algorithm, or
    1363              :  * InvalidBlockNumber if the end of the stream is reached.  Also reports the
    1364              :  * strategy that would be used to read it.
    1365              :  */
    1366              : BlockNumber
    1367            0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
    1368              : {
    1369            0 :     *strategy = stream->ios[0].op.strategy;
    1370            0 :     return read_stream_get_block(stream, NULL);
    1371              : }
    1372              : 
    1373              : /*
    1374              :  * Temporarily stop consuming block numbers from the block number callback.
    1375              :  * If called inside the block number callback, its return value should be
    1376              :  * returned by the callback.
    1377              :  */
    1378              : BlockNumber
    1379            0 : read_stream_pause(ReadStream *stream)
    1380              : {
    1381            0 :     stream->resume_readahead_distance = stream->readahead_distance;
    1382            0 :     stream->resume_combine_distance = stream->combine_distance;
    1383            0 :     stream->readahead_distance = 0;
    1384            0 :     stream->combine_distance = 0;
    1385            0 :     return InvalidBlockNumber;
    1386              : }
    1387              : 
    1388              : /*
    1389              :  * Resume looking ahead after the block number callback reported
    1390              :  * end-of-stream. This is useful for streams of self-referential blocks, after
    1391              :  * a buffer needed to be consumed and examined to find more block numbers.
    1392              :  */
    1393              : void
    1394            0 : read_stream_resume(ReadStream *stream)
    1395              : {
    1396            0 :     stream->readahead_distance = stream->resume_readahead_distance;
    1397            0 :     stream->combine_distance = stream->resume_combine_distance;
    1398            0 : }
    1399              : 
    1400              : /*
    1401              :  * Reset a read stream by releasing any queued up buffers, allowing the stream
    1402              :  * to be used again for different blocks.  This can be used to clear an
    1403              :  * end-of-stream condition and start again, or to throw away blocks that were
    1404              :  * speculatively read and read some different blocks instead.
    1405              :  */
    1406              : void
    1407      1559193 : read_stream_reset(ReadStream *stream)
    1408              : {
    1409              :     int16       index;
    1410              :     Buffer      buffer;
    1411              : 
    1412              :     /* Stop looking ahead. */
    1413      1559193 :     stream->readahead_distance = 0;
    1414      1559193 :     stream->combine_distance = 0;
    1415              : 
    1416              :     /* Forget buffered block number and fast path state. */
    1417      1559193 :     stream->buffered_blocknum = InvalidBlockNumber;
    1418      1559193 :     stream->fast_path = false;
    1419              : 
    1420              :     /* Unpin anything that wasn't consumed. */
    1421      1700248 :     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
    1422       141055 :         ReleaseBuffer(buffer);
    1423              : 
    1424              :     /* Unpin any unused forwarded buffers. */
    1425      1559193 :     index = stream->next_buffer_index;
    1426      1559193 :     while (index < stream->initialized_buffers &&
    1427       216549 :            (buffer = stream->buffers[index]) != InvalidBuffer)
    1428              :     {
    1429              :         Assert(stream->forwarded_buffers > 0);
    1430            0 :         stream->forwarded_buffers--;
    1431            0 :         ReleaseBuffer(buffer);
    1432              : 
    1433            0 :         stream->buffers[index] = InvalidBuffer;
    1434            0 :         if (index < stream->io_combine_limit - 1)
    1435            0 :             stream->buffers[stream->queue_size + index] = InvalidBuffer;
    1436              : 
    1437            0 :         if (++index == stream->queue_size)
    1438            0 :             index = 0;
    1439              :     }
    1440              : 
    1441              :     Assert(stream->forwarded_buffers == 0);
    1442              :     Assert(stream->pinned_buffers == 0);
    1443              :     Assert(stream->ios_in_progress == 0);
    1444              : 
    1445              :     /* Start off assuming data is cached. */
    1446      1559193 :     stream->readahead_distance = 1;
    1447      1559193 :     stream->combine_distance = 1;
    1448      1559193 :     stream->resume_readahead_distance = stream->readahead_distance;
    1449      1559193 :     stream->resume_combine_distance = stream->combine_distance;
    1450      1559193 :     stream->distance_decay_holdoff = 0;
    1451      1559193 : }
    1452              : 
    1453              : /*
    1454              :  * Release and free a read stream.
    1455              :  */
    1456              : void
    1457       694353 : read_stream_end(ReadStream *stream)
    1458              : {
    1459       694353 :     read_stream_reset(stream);
    1460       694353 :     pfree(stream);
    1461       694353 : }
        

Generated by: LCOV version 2.0-1