LCOV - code coverage report
File: src/backend/storage/aio/read_stream.c
Test: PostgreSQL 17devel
Date: 2024-05-17 08:10:52
              Hit   Total   Coverage
Lines:        193     213     90.6 %
Functions:      9      10     90.0 %

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * read_stream.c
       4             :  *    Mechanism for accessing buffered relation data with look-ahead
       5             :  *
       6             :  * Code that needs to access relation data typically pins blocks one at a
       7             :  * time, often in a predictable order that might be sequential or data-driven.
       8             :  * Calling the simple ReadBuffer() function for each block is inefficient,
       9             :  * because blocks that are not yet in the buffer pool require I/O operations
      10             :  * that are small and might stall waiting for storage.  This mechanism looks
      11             :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
      12             :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
      13             :  * distance.
      14             :  *
      15             :  * A user-provided callback generates a stream of block numbers that is used
      16             :  * to form reads of up to io_combine_limit, by attempting to merge them with a
      17             :  * pending read.  When that isn't possible, the existing pending read is sent
      18             :  * to StartReadBuffers() so that a new one can begin to form.
      19             :  *
      20             :  * The algorithm for controlling the look-ahead distance tries to classify the
      21             :  * stream into three ideal behaviors:
      22             :  *
      23             :  * A) No I/O is necessary, because the requested blocks are fully cached
      24             :  * already.  There is no benefit to looking ahead more than one block, so
      25             :  * distance is 1.  This is the default initial assumption.
      26             :  *
      27             :  * B) I/O is necessary, but fadvise is undesirable because the access is
      28             :  * sequential, or impossible because direct I/O is enabled or the system
      29             :  * doesn't support fadvise.  There is no benefit in looking ahead more than
      30             :  * io_combine_limit, because in this case the only goal is larger read system
      31             :  * calls.  Looking further ahead would pin many buffers and perform
      32             :  * speculative work for no benefit.
      33             :  *
      34             :  * C) I/O is necessary, it appears random, and this system supports fadvise.
      35             :  * We'll look further ahead in order to reach the configured level of I/O
      36             :  * concurrency.
      37             :  *
      38             :  * The distance increases rapidly and decays slowly, so that it moves towards
      39             :  * those levels as different I/O patterns are discovered.  For example, a
      40             :  * sequential scan of fully cached data doesn't bother looking ahead, but a
      41             :  * sequential scan that hits a region of uncached blocks will start issuing
      42             :  * increasingly wide read calls until it plateaus at io_combine_limit.
      43             :  *
      44             :  * The main data structure is a circular queue of buffers of size
      45             :  * max_pinned_buffers plus one spare entry and some overflow space, ready to
      46             :  * be returned by read_stream_next_buffer().  Each buffer also has an
      47             :  * optional variable-sized object that is passed from the callback to the
      48             :  * consumer of buffers.
      49             :  *
      50             :  * Parallel to the queue of buffers, there is a circular queue of in-progress
      51             :  * I/Os that have been started with StartReadBuffers(), and for which
      52             :  * WaitReadBuffers() must be called before returning the buffer.
      53             :  *
      54             :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60
      55             :  * in successive calls, then these data structures might appear as follows:
      56             :  *
      57             :  *                          buffers buf/data       ios
      58             :  *
      59             :  *                          +----+  +-----+       +--------+
      60             :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
      61             :  *                          +----+  +-----+  |    +--------+
      62             :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
      63             :  *                          +----+  +-----+  | |  +--------+
      64             :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
      65             :  *                          +----+  +-----+    |  +--------+
      66             :  *                          | 43 |  |  ?  |    |  |        |
      67             :  *                          +----+  +-----+    |  +--------+
      68             :  *                          | 44 |  |  ?  |    |  |        |
      69             :  *                          +----+  +-----+    |  +--------+
      70             :  *                          | 60 |  |  ?  |<---+
      71             :  *                          +----+  +-----+
      72             :  *     next_buffer_index -> |    |  |     |
      73             :  *                          +----+  +-----+
      74             :  *
      75             :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
      76             :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
      77             :  * the range 42..44 requires an I/O wait before its buffers are returned, as
      78             :  * does block 60.
      79             :  *
      80             :  *
      81             :  * Portions Copyright (c) 2024, PostgreSQL Global Development Group
      82             :  * Portions Copyright (c) 1994, Regents of the University of California
      83             :  *
      84             :  * IDENTIFICATION
      85             :  *    src/backend/storage/aio/read_stream.c
      86             :  *
      87             :  *-------------------------------------------------------------------------
      88             :  */
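The "increases rapidly and decays slowly" rule above can be sketched as a small pure function. This is a minimal sketch, not PostgreSQL's code: ReadKind, adjust_distance() and its parameters are hypothetical names. It assumes that growth means doubling (capped at io_combine_limit when no advice was issued, and at max_pinned_buffers when it was) and that a cache hit decrements the distance by one, as read_stream_start_pending_read() below does. The real increase happens after WaitReadBuffers(), outside this excerpt, and may differ in detail.

```c
#include <assert.h>

/* Hypothetical outcome of one read; not a PostgreSQL type. */
typedef enum
{
    READ_HIT,                   /* no I/O needed (behavior A) */
    READ_IO_NO_ADVICE,          /* I/O, no advice issued (behavior B) */
    READ_IO_ADVISED             /* I/O with advice issued (behavior C) */
} ReadKind;

static int
min_int(int a, int b)
{
    return a < b ? a : b;
}

/*
 * Next look-ahead distance (assumed policy): a hit decays it by one, never
 * below 1; I/O doubles it, capped at combine_limit when no advice was
 * issued or at max_pinned when advice was issued.  A distance already above
 * combine_limit decays by one on unadvised I/O.
 */
static int
adjust_distance(int distance, ReadKind kind, int combine_limit, int max_pinned)
{
    switch (kind)
    {
        case READ_HIT:
            return distance > 1 ? distance - 1 : 1;
        case READ_IO_NO_ADVICE:
            if (distance > combine_limit)
                return distance - 1;
            return min_int(min_int(distance * 2, combine_limit), max_pinned);
        case READ_IO_ADVISED:
            return min_int(distance * 2, max_pinned);
    }
    return distance;
}
```

Starting from 1 with combine_limit = 16, four unadvised I/Os take the distance through 2, 4, 8 and 16, after which it plateaus: the sequential-scan behavior the comment describes.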
      89             : #include "postgres.h"
      90             : 
      91             : #include "catalog/pg_tablespace.h"
      92             : #include "miscadmin.h"
      93             : #include "storage/fd.h"
      94             : #include "storage/smgr.h"
      95             : #include "storage/read_stream.h"
      96             : #include "utils/memdebug.h"
      97             : #include "utils/rel.h"
      98             : #include "utils/spccache.h"
      99             : 
     100             : typedef struct InProgressIO
     101             : {
     102             :     int16       buffer_index;
     103             :     ReadBuffersOperation op;
     104             : } InProgressIO;
     105             : 
     106             : /*
     107             :  * State for managing a stream of reads.
     108             :  */
     109             : struct ReadStream
     110             : {
     111             :     int16       max_ios;
     112             :     int16       ios_in_progress;
     113             :     int16       queue_size;
     114             :     int16       max_pinned_buffers;
     115             :     int16       pinned_buffers;
     116             :     int16       distance;
     117             :     bool        advice_enabled;
     118             : 
     119             :     /*
     120             :      * Small buffer of block numbers, useful for 'ungetting' to resolve flow
     121             :      * control problems when I/Os are split.  Also useful for batch-loading
     122             :      * block numbers in the fast path.
     123             :      */
     124             :     BlockNumber blocknums[16];
     125             :     int16       blocknums_count;
     126             :     int16       blocknums_next;
     127             : 
     128             :     /*
     129             :      * The callback that will tell us which block numbers to read, and an
     130             :      * opaque pointer that will be passed to it for its own purposes.
     131             :      */
     132             :     ReadStreamBlockNumberCB callback;
     133             :     void       *callback_private_data;
     134             : 
     135             :     /* Next expected block, for detecting sequential access. */
     136             :     BlockNumber seq_blocknum;
     137             : 
     138             :     /* The read operation we are currently preparing. */
     139             :     BlockNumber pending_read_blocknum;
     140             :     int16       pending_read_nblocks;
     141             : 
     142             :     /* Space for buffers and optional per-buffer private data. */
     143             :     size_t      per_buffer_data_size;
     144             :     void       *per_buffer_data;
     145             : 
     146             :     /* Read operations that have been started but not waited for yet. */
     147             :     InProgressIO *ios;
     148             :     int16       oldest_io_index;
     149             :     int16       next_io_index;
     150             : 
     151             :     bool        fast_path;
     152             : 
     153             :     /* Circular queue of buffers. */
     154             :     int16       oldest_buffer_index;    /* Next pinned buffer to return */
     155             :     int16       next_buffer_index;  /* Index of next buffer to pin */
     156             :     Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
     157             : };
     158             : 
     159             : /*
     160             :  * Return a pointer to the per-buffer data by index.
     161             :  */
     162             : static inline void *
     163     2915428 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
     164             : {
     165     5830856 :     return (char *) stream->per_buffer_data +
     166     2915428 :         stream->per_buffer_data_size * buffer_index;
     167             : }
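get_per_buffer_data() treats per_buffer_data as a flat array of fixed-size slots indexed by queue position, so the callback (which writes a slot) and the consumer (which later reads it) agree on a slot simply by sharing buffer_index. A minimal sketch of that arithmetic; slot_ptr(), DemoPayload and demo_slots are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Same arithmetic as get_per_buffer_data(): slot i starts i * size bytes in. */
static void *
slot_ptr(void *base, size_t slot_size, int index)
{
    return (char *) base + slot_size * (size_t) index;
}

/* Hypothetical per-buffer payload that a callback might pass along. */
typedef struct
{
    unsigned    blocknum;
    int         flags;
} DemoPayload;

/* Backing store for a hypothetical 8-entry queue. */
static DemoPayload demo_slots[8];

/* Callback side writes through slot_ptr(); consumer reads the same slot. */
static unsigned
demo_roundtrip(int index, unsigned blocknum)
{
    DemoPayload *p = slot_ptr(demo_slots, sizeof(DemoPayload), index);

    p->blocknum = blocknum;
    p->flags = 0;
    return demo_slots[index].blocknum;
}
```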
     168             : 
     169             : /*
     170             :  * Ask the callback which block it would like us to read next, with a small
     171             :  * buffer in front to allow read_stream_unget_block() to work and to allow the
     172             :  * fast path to skip this function and work directly from the array.
     173             :  */
     174             : static inline BlockNumber
     175     2915428 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
     176             : {
     177     2915428 :     if (stream->blocknums_next < stream->blocknums_count)
     178       52834 :         return stream->blocknums[stream->blocknums_next++];
     179             : 
     180             :     /*
     181             :      * We only bother to fetch one at a time here (but see the fast path which
     182             :      * uses more).
     183             :      */
     184     2862594 :     return stream->callback(stream,
     185             :                             stream->callback_private_data,
     186             :                             per_buffer_data);
     187             : }
     188             : 
     189             : /*
     190             :  * In order to deal with short reads in StartReadBuffers(), we sometimes need
     191             :  * to defer handling of a block until later.
     192             :  */
     193             : static inline void
     194           0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
     195             : {
     196           0 :     if (stream->blocknums_next == stream->blocknums_count)
     197             :     {
     198             :         /* Never initialized or entirely consumed.  Re-initialize. */
     199           0 :         stream->blocknums[0] = blocknum;
     200           0 :         stream->blocknums_count = 1;
     201           0 :         stream->blocknums_next = 0;
     202             :     }
     203             :     else
     204             :     {
     205             :         /* Must be the last value returned from the blocknums array. */
     206             :         Assert(stream->blocknums_next > 0);
     207           0 :         stream->blocknums_next--;
     208             :         Assert(stream->blocknums[stream->blocknums_next] == blocknum);
     209             :     }
     210           0 : }
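The blocknums[] array acts as a tiny push-back buffer in front of the callback: read_stream_get_block() replays buffered entries first, and read_stream_unget_block() either rewinds one entry or restarts the buffer with the returned block. A sketch of the same get/unget protocol, with a hypothetical counting source (DemoSource) standing in for the real callback and 0xFFFFFFFF standing in for InvalidBlockNumber.

```c
#include <assert.h>

#define DEMO_INVALID 0xFFFFFFFFu

/* Hypothetical source of block numbers standing in for the callback. */
typedef struct
{
    unsigned    next;
    unsigned    end;
} DemoSource;

static unsigned
demo_source_next(DemoSource *src)
{
    return src->next < src->end ? src->next++ : DEMO_INVALID;
}

/* Small push-back buffer in front of the source, like blocknums[]. */
typedef struct
{
    DemoSource *src;
    unsigned    buf[16];
    int         count;
    int         next;
} DemoBlockQueue;

static unsigned
demo_get(DemoBlockQueue *q)
{
    if (q->next < q->count)
        return q->buf[q->next++];   /* replay a buffered or ungot block */
    return demo_source_next(q->src);
}

static void
demo_unget(DemoBlockQueue *q, unsigned blocknum)
{
    if (q->next == q->count)
    {
        /* Buffer empty or fully consumed: restart it with one entry. */
        q->buf[0] = blocknum;
        q->count = 1;
        q->next = 0;
    }
    else
    {
        /* Must be the value most recently returned from buf[]. */
        assert(q->next > 0);
        q->next--;
        assert(q->buf[q->next] == blocknum);
    }
}

/* Demo: take one block, push it back, read it again, then continue. */
static int
demo_unget_roundtrip(void)
{
    DemoSource  src = {5, 10};
    DemoBlockQueue q = {&src, {0}, 0, 0};
    unsigned    first = demo_get(&q);   /* 5, from the source */

    demo_unget(&q, first);
    unsigned    again = demo_get(&q);   /* 5 again, from buf[] */
    unsigned    after = demo_get(&q);   /* 6, back to the source */

    return first == 5 && again == 5 && after == 6;
}
```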
     211             : 
     212             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     213             : static void
     214      330462 : read_stream_fill_blocknums(ReadStream *stream)
     215             : {
     216             :     BlockNumber blocknum;
     217      330462 :     int         i = 0;
     218             : 
     219             :     do
     220             :     {
     221     3428654 :         blocknum = stream->callback(stream,
     222             :                                     stream->callback_private_data,
     223             :                                     NULL);
     224     3428654 :         stream->blocknums[i++] = blocknum;
     225     3278840 :     } while (i < lengthof(stream->blocknums) &&
     226     3428654 :              blocknum != InvalidBlockNumber);
     227      330462 :     stream->blocknums_count = i;
     228      330462 :     stream->blocknums_next = 0;
     229      330462 : }
     230             : #endif
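read_stream_fill_blocknums() batch-loads up to 16 block numbers and stops early right after storing the end-of-stream marker, so the marker itself is buffered as well. A self-contained sketch of that loop; demo_counter() is a hypothetical callback and DEMO_INVALID stands in for InvalidBlockNumber.

```c
#include <assert.h>

#define DEMO_INVALID 0xFFFFFFFFu
#define DEMO_BATCH 16

typedef unsigned (*DemoCallback) (void *private_data);

/* Hypothetical callback: yields state[0]..state[1]-1, then DEMO_INVALID. */
static unsigned
demo_counter(void *private_data)
{
    unsigned   *state = private_data;

    return state[0] < state[1] ? state[0]++ : DEMO_INVALID;
}

/*
 * Fill buf[] like read_stream_fill_blocknums(): stop when the batch is full
 * or right after storing the end-of-stream marker.  Returns the count.
 */
static int
demo_fill(unsigned buf[DEMO_BATCH], DemoCallback cb, void *private_data)
{
    unsigned    blocknum;
    int         i = 0;

    do
    {
        blocknum = cb(private_data);
        buf[i++] = blocknum;
    } while (i < DEMO_BATCH && blocknum != DEMO_INVALID);
    return i;
}

/* Demo: entries one batch holds for a stream of blocks start..end-1. */
static int
demo_batch_size(unsigned start, unsigned end)
{
    unsigned    state[2] = {start, end};
    unsigned    buf[DEMO_BATCH];

    return demo_fill(buf, demo_counter, state);
}
```

With a 3-block stream the batch holds 4 entries (three blocks plus the marker); with a long stream it stops at 16 and the marker is left for a later batch.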
     231             : 
     232             : static void
     233     1547722 : read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
     234             : {
     235             :     bool        need_wait;
     236             :     int         nblocks;
     237             :     int         flags;
     238             :     int16       io_index;
     239             :     int16       overflow;
     240             :     int16       buffer_index;
     241             : 
     242             :     /* This should only be called with a pending read. */
     243             :     Assert(stream->pending_read_nblocks > 0);
     244             :     Assert(stream->pending_read_nblocks <= io_combine_limit);
     245             : 
     246             :     /* We had better not exceed the pin limit by starting this read. */
     247             :     Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
     248             :            stream->max_pinned_buffers);
     249             : 
     250             :     /* We had better not be overwriting an existing pinned buffer. */
     251     1547722 :     if (stream->pinned_buffers > 0)
     252             :         Assert(stream->next_buffer_index != stream->oldest_buffer_index);
     253             :     else
     254             :         Assert(stream->next_buffer_index == stream->oldest_buffer_index);
     255             : 
     256             :     /*
     257             :      * If advice hasn't been suppressed, this system supports it, and this
     258             :      * isn't a strictly sequential pattern, then we'll issue advice.
     259             :      */
     260     1547722 :     if (!suppress_advice &&
     261      750256 :         stream->advice_enabled &&
     262       26368 :         stream->pending_read_blocknum != stream->seq_blocknum)
     263        3516 :         flags = READ_BUFFERS_ISSUE_ADVICE;
     264             :     else
     265     1544206 :         flags = 0;
     266             : 
     267             :     /* We say how many blocks we want to read, but nblocks may be smaller on return. */
     268     1547722 :     buffer_index = stream->next_buffer_index;
     269     1547722 :     io_index = stream->next_io_index;
     270     1547722 :     nblocks = stream->pending_read_nblocks;
     271     1547722 :     need_wait = StartReadBuffers(&stream->ios[io_index].op,
     272     1547722 :                                  &stream->buffers[buffer_index],
     273             :                                  stream->pending_read_blocknum,
     274             :                                  &nblocks,
     275             :                                  flags);
     276     1547722 :     stream->pinned_buffers += nblocks;
     277             : 
     278             :     /* Remember whether we need to wait before returning this buffer. */
     279     1547722 :     if (!need_wait)
     280             :     {
     281             :         /* Look-ahead distance decays, no I/O necessary (behavior A). */
     282     1036180 :         if (stream->distance > 1)
     283        2250 :             stream->distance--;
     284             :     }
     285             :     else
     286             :     {
     287             :         /*
     288             :          * Remember to call WaitReadBuffers() before returning head buffer.
     289             :          * Look-ahead distance will be adjusted after waiting.
     290             :          */
     291      511542 :         stream->ios[io_index].buffer_index = buffer_index;
     292      511542 :         if (++stream->next_io_index == stream->max_ios)
     293      491832 :             stream->next_io_index = 0;
     294             :         Assert(stream->ios_in_progress < stream->max_ios);
     295      511542 :         stream->ios_in_progress++;
     296      511542 :         stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
     297             :     }
     298             : 
     299             :     /*
     300             :      * We gave a contiguous range of buffer space to StartReadBuffers(), but
     301             :      * we want it to wrap around at queue_size.  Slide overflowing buffers to
     302             :      * the front of the array.
     303             :      */
     304     1547722 :     overflow = (buffer_index + nblocks) - stream->queue_size;
     305     1547722 :     if (overflow > 0)
     306        3488 :         memmove(&stream->buffers[0],
     307        3488 :                 &stream->buffers[stream->queue_size],
     308             :                 sizeof(stream->buffers[0]) * overflow);
     309             : 
     310             :     /* Compute location of start of next read, without using % operator. */
     311     1547722 :     buffer_index += nblocks;
     312     1547722 :     if (buffer_index >= stream->queue_size)
     313      298932 :         buffer_index -= stream->queue_size;
     314             :     Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     315     1547722 :     stream->next_buffer_index = buffer_index;
     316             : 
     317             :     /* Adjust the pending read to cover the remaining portion, if any. */
     318     1547722 :     stream->pending_read_blocknum += nblocks;
     319     1547722 :     stream->pending_read_nblocks -= nblocks;
     320     1547722 : }
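Because StartReadBuffers() is handed a contiguous range, a read that starts near the end of the queue spills into the spare slots past queue_size and is then slid back to the front. A sketch with a hypothetical queue_size of 8 and io_combine_limit of 4 (so 8 + 4 - 1 = 11 slots, matching the allocation formula in read_stream_begin_relation()); plain ints stand in for Buffer values.

```c
#include <assert.h>
#include <string.h>

#define DEMO_QUEUE_SIZE 8
#define DEMO_COMBINE_LIMIT 4

/* Queue slots plus io_combine_limit - 1 overflow slots. */
static int  demo_buffers[DEMO_QUEUE_SIZE + DEMO_COMBINE_LIMIT - 1];

/*
 * Write nblocks values contiguously from buffer_index (as StartReadBuffers()
 * fills a contiguous range), then slide anything that landed past
 * queue_size back to the front.  Returns the next free index.
 */
static int
demo_append(int buffer_index, const int *values, int nblocks)
{
    int         overflow;

    assert(nblocks >= 1 && nblocks <= DEMO_COMBINE_LIMIT);
    assert(buffer_index >= 0 && buffer_index < DEMO_QUEUE_SIZE);
    memcpy(&demo_buffers[buffer_index], values, sizeof(int) * nblocks);

    overflow = (buffer_index + nblocks) - DEMO_QUEUE_SIZE;
    if (overflow > 0)
        memmove(&demo_buffers[0], &demo_buffers[DEMO_QUEUE_SIZE],
                sizeof(demo_buffers[0]) * overflow);

    /* Wrap without the % operator, as the original does. */
    buffer_index += nblocks;
    if (buffer_index >= DEMO_QUEUE_SIZE)
        buffer_index -= DEMO_QUEUE_SIZE;
    return buffer_index;
}

/* Demo: a 3-block read starting at slot 6 ends up in slots 6, 7 and 0. */
static int
demo_wrap_ok(void)
{
    const int   blocks[3] = {42, 43, 44};
    int         next = demo_append(6, blocks, 3);

    return next == 1 && demo_buffers[6] == 42 && demo_buffers[7] == 43 &&
        demo_buffers[0] == 44;
}
```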
     321             : 
     322             : static void
     323     3056806 : read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
     324             : {
     325     4679702 :     while (stream->ios_in_progress < stream->max_ios &&
     326     4669308 :            stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
     327             :     {
     328             :         BlockNumber blocknum;
     329             :         int16       buffer_index;
     330             :         void       *per_buffer_data;
     331             : 
     332     2915428 :         if (stream->pending_read_nblocks == io_combine_limit)
     333             :         {
     334           0 :             read_stream_start_pending_read(stream, suppress_advice);
     335           0 :             suppress_advice = false;
     336           0 :             continue;
     337             :         }
     338             : 
     339             :         /*
     340             :          * See which block the callback wants next in the stream.  We need to
     341             :          * compute the index of the Nth block of the pending read including
     342             :          * wrap-around, but we don't want to use the expensive % operator.
     343             :          */
     344     2915428 :         buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
     345     2915428 :         if (buffer_index >= stream->queue_size)
     346       37622 :             buffer_index -= stream->queue_size;
     347             :         Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     348     2915428 :         per_buffer_data = get_per_buffer_data(stream, buffer_index);
     349     2915428 :         blocknum = read_stream_get_block(stream, per_buffer_data);
     350     2915428 :         if (blocknum == InvalidBlockNumber)
     351             :         {
     352             :             /* End of stream. */
     353     1292532 :             stream->distance = 0;
     354     1292532 :             break;
     355             :         }
     356             : 
     357             :         /* Can we merge it with the pending read? */
     358     1622896 :         if (stream->pending_read_nblocks > 0 &&
     359       79540 :             stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
     360             :         {
     361       79540 :             stream->pending_read_nblocks++;
     362       79540 :             continue;
     363             :         }
     364             : 
     365             :         /* We have to start the pending read before we can build another. */
     366     1543356 :         while (stream->pending_read_nblocks > 0)
     367             :         {
     368           0 :             read_stream_start_pending_read(stream, suppress_advice);
     369           0 :             suppress_advice = false;
     370           0 :             if (stream->ios_in_progress == stream->max_ios)
     371             :             {
     372             :                 /* And we've hit the limit.  Rewind, and stop here. */
     373           0 :                 read_stream_unget_block(stream, blocknum);
     374           0 :                 return;
     375             :             }
     376             :         }
     377             : 
     378             :         /* This is the start of a new pending read. */
     379     1543356 :         stream->pending_read_blocknum = blocknum;
     380     1543356 :         stream->pending_read_nblocks = 1;
     381             :     }
     382             : 
     383             :     /*
     384             :      * We don't start the pending read just because we've hit the distance
     385             :      * limit, preferring to give it another chance to grow to full
     386             :      * io_combine_limit size once more buffers have been consumed.  However,
     387             :      * if we've already reached io_combine_limit, or we've reached the
     388             :      * distance limit and there isn't anything pinned yet, or the callback has
     389             :      * signaled end-of-stream, we start the read immediately.
     390             :      */
     391     3056806 :     if (stream->pending_read_nblocks > 0 &&
     392     1594670 :         (stream->pending_read_nblocks == io_combine_limit ||
     393     1591640 :          (stream->pending_read_nblocks == stream->distance &&
     394     1539234 :           stream->pinned_buffers == 0) ||
     395       52406 :          stream->distance == 0) &&
     396     1547776 :         stream->ios_in_progress < stream->max_ios)
     397     1547722 :         read_stream_start_pending_read(stream, suppress_advice);
     398             : }
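Stripped of the distance, pin and max_ios limits, read_stream_look_ahead() groups the callback's block numbers into runs of consecutive blocks no longer than io_combine_limit. A simplified sketch under those assumptions; DemoRead and demo_form_reads() are hypothetical, and io_combine_limit is passed in as a parameter. For the header comment's stream 10, 42, 43, 44, 60 it forms three reads, 10..10, 42..44 and 60..60; in the diagram only the last two needed I/O, because block 10 was already cached.

```c
#include <assert.h>

/* One read to issue: a run of consecutive block numbers. */
typedef struct
{
    unsigned    start;
    int         nblocks;
} DemoRead;

/*
 * Grow a pending read while the next block is consecutive and the run is
 * below combine_limit; otherwise emit it and start a new one.  Ignores the
 * distance, pin and max_ios limits of the real code.
 */
static int
demo_form_reads(const unsigned *blocks, int n, int combine_limit,
                DemoRead *out)
{
    int         nreads = 0;
    DemoRead    pending = {0, 0};

    for (int i = 0; i < n; i++)
    {
        if (pending.nblocks > 0 && pending.nblocks < combine_limit &&
            pending.start + pending.nblocks == blocks[i])
        {
            pending.nblocks++;  /* merge into the pending read */
            continue;
        }
        if (pending.nblocks > 0)
            out[nreads++] = pending;    /* "start" the pending read */
        pending.start = blocks[i];
        pending.nblocks = 1;
    }
    if (pending.nblocks > 0)
        out[nreads++] = pending;    /* end of stream flushes it */
    return nreads;
}

/* Demo: the header comment's stream with a limit of 16 blocks. */
static int
demo_example_ok(void)
{
    const unsigned blocks[] = {10, 42, 43, 44, 60};
    DemoRead    reads[5];
    int         n = demo_form_reads(blocks, 5, 16, reads);

    return n == 3 &&
        reads[0].start == 10 && reads[0].nblocks == 1 &&
        reads[1].start == 42 && reads[1].nblocks == 3 &&
        reads[2].start == 60 && reads[2].nblocks == 1;
}

/* Demo: with a limit of 2, the run 42..44 splits into 42..43 and 44. */
static int
demo_split_count(void)
{
    const unsigned blocks[] = {10, 42, 43, 44, 60};
    DemoRead    reads[5];

    return demo_form_reads(blocks, 5, 2, reads);
}
```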
     399             : 
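The sizing arithmetic in read_stream_begin_relation() below can be checked in isolation. This sketch omits LimitAdditionalPins()/LimitAdditionalLocalPins() and takes a hypothetical pin_limit parameter in place of GetAccessStrategyPinLimit(); demo_max_pinned() and demo_buffer_slots() are illustrative names, and the usage values assume io_combine_limit = 16 blocks.

```c
#include <assert.h>

#define DEMO_INT16_MAX 32767

static int
max_i(int a, int b)
{
    return a > b ? a : b;
}

static int
min_i(int a, int b)
{
    return a < b ? a : b;
}

/* max_pinned_buffers before the per-backend pin limits are applied. */
static int
demo_max_pinned(int max_ios, int combine_limit, int pin_limit)
{
    int         n = max_i(max_ios * 4, combine_limit);

    /* Leave room for the spare entry and the overflow space in int16. */
    n = min_i(n, DEMO_INT16_MAX - combine_limit - 1);
    return min_i(pin_limit, n);
}

/* Buffer slots allocated: queue_size plus room for one overflowing read. */
static int
demo_buffer_slots(int max_pinned, int combine_limit)
{
    int         queue_size = max_pinned + 1;

    return queue_size + combine_limit - 1;
}
```

With max_ios = 16 the stream may pin up to 64 buffers; with max_ios = 1 the io_combine_limit floor keeps it at 16, giving 17 queue entries plus 15 overflow slots.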
     400             : /*
     401             :  * Create a new read stream object that can be used to perform the equivalent
     402             :  * of a series of ReadBuffer() calls for one fork of one relation.
     403             :  * Internally, it generates larger vectored reads where possible by looking
     404             :  * ahead.  The callback should return block numbers or InvalidBlockNumber to
     405             :  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
     406             :  * write extra data for each block into the space provided to it.  It will
     407             :  * also receive callback_private_data for its own purposes.
     408             :  */
     409             : ReadStream *
     410      584266 : read_stream_begin_relation(int flags,
     411             :                            BufferAccessStrategy strategy,
     412             :                            Relation rel,
     413             :                            ForkNumber forknum,
     414             :                            ReadStreamBlockNumberCB callback,
     415             :                            void *callback_private_data,
     416             :                            size_t per_buffer_data_size)
     417             : {
     418             :     ReadStream *stream;
     419             :     size_t      size;
     420             :     int16       queue_size;
     421             :     int         max_ios;
     422             :     int         strategy_pin_limit;
     423             :     uint32      max_pinned_buffers;
     424             :     Oid         tablespace_id;
     425             :     SMgrRelation smgr;
     426             : 
     427      584266 :     smgr = RelationGetSmgr(rel);
     428             : 
     429             :     /*
     430             :      * Decide how many I/Os we will allow to run at the same time.  That
     431             :      * currently means advice to the kernel to tell it that we will soon read.
     432             :      * This number also affects how far we look ahead for opportunities to
     433             :      * start more I/Os.
     434             :      */
     435      584266 :     tablespace_id = smgr->smgr_rlocator.locator.spcOid;
     436     1157076 :     if (!OidIsValid(MyDatabaseId) ||
     437      708820 :         IsCatalogRelation(rel) ||
     438      136010 :         IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
     439             :     {
     440             :         /*
     441             :          * Avoid circularity while trying to look up tablespace settings or
     442             :          * before spccache.c is ready.
     443             :          */
     444      448256 :         max_ios = effective_io_concurrency;
     445             :     }
     446      136010 :     else if (flags & READ_STREAM_MAINTENANCE)
     447        5268 :         max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
     448             :     else
     449      130742 :         max_ios = get_tablespace_io_concurrency(tablespace_id);
     450             : 
     451             :     /* Cap to INT16_MAX to avoid overflowing below */
     452      584266 :     max_ios = Min(max_ios, PG_INT16_MAX);
     453             : 
     454             :     /*
     455             :      * Choose the maximum number of buffers we're prepared to pin.  We try to
     456             :      * pin fewer if we can, though.  We clamp it to at least io_combine_limit
     457             :      * so that we can have a chance to build up a full io_combine_limit sized
     458             :      * read, even when max_ios is zero.  Be careful not to allow int16 to
     459             :      * overflow (even though that's not possible with the current GUC range
     460             :      * limits), allowing also for the spare entry and the overflow space.
     461             :      */
     462      584266 :     max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
     463      584266 :     max_pinned_buffers = Min(max_pinned_buffers,
     464             :                              PG_INT16_MAX - io_combine_limit - 1);
     465             : 
     466             :     /* Give the strategy a chance to limit the number of buffers we pin. */
     467      584266 :     strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
     468      584266 :     max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
     469             : 
     470             :     /* Don't allow this backend to pin more than its share of buffers. */
     471      584266 :     if (SmgrIsTemp(smgr))
     472       10724 :         LimitAdditionalLocalPins(&max_pinned_buffers);
     473             :     else
     474      573542 :         LimitAdditionalPins(&max_pinned_buffers);
     475             :     Assert(max_pinned_buffers > 0);
     476             : 
     477             :     /*
     478             :      * We need one extra entry for buffers and per-buffer data, because users
     479             :      * of per-buffer data have access to the object until the next call to
     480             :      * read_stream_next_buffer(), so we need a gap between the head and tail
     481             :      * of the queue so that we don't clobber it.
     482             :      */
     483      584266 :     queue_size = max_pinned_buffers + 1;
     484             : 
     485             :     /*
     486             :      * Allocate the object, the buffers, the ios and per_buffer_data space in
     487             :      * one big chunk.  Though we have queue_size buffers, we want to be able
     488             :      * to assume that all the buffers for a single read are contiguous (i.e.
     489             :      * don't wrap around halfway through), so we allow temporary overflows of
     490             :      * up to the maximum possible read size by allocating an extra
     491             :      * io_combine_limit - 1 elements.
     492             :      */
     493      584266 :     size = offsetof(ReadStream, buffers);
     494      584266 :     size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
     495      584266 :     size += sizeof(InProgressIO) * Max(1, max_ios);
     496      584266 :     size += per_buffer_data_size * queue_size;
     497      584266 :     size += MAXIMUM_ALIGNOF * 2;
     498      584266 :     stream = (ReadStream *) palloc(size);
     499      584266 :     memset(stream, 0, offsetof(ReadStream, buffers));
     500      584266 :     stream->ios = (InProgressIO *)
     501      584266 :         MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
     502      584266 :     if (per_buffer_data_size > 0)
     503           0 :         stream->per_buffer_data = (void *)
     504           0 :             MAXALIGN(&stream->ios[Max(1, max_ios)]);
     505             : 
     506             : #ifdef USE_PREFETCH
     507             : 
     508             :     /*
     509             :      * This system supports prefetching advice.  We can use it as long as
     510             :      * direct I/O isn't enabled, the caller hasn't promised sequential access
     511             :      * (overriding our detection heuristics), and max_ios hasn't been set to
     512             :      * zero.
     513             :      */
     514      584266 :     if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
     515      584060 :         (flags & READ_STREAM_SEQUENTIAL) == 0 &&
     516             :         max_ios > 0)
     517       18768 :         stream->advice_enabled = true;
     518             : #endif
     519             : 
     520             :     /*
     521             :      * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
     522             :      * above.  If we had real asynchronous I/O we might need a slightly
     523             :      * different definition.
     524             :      */
     525      584266 :     if (max_ios == 0)
     526           0 :         max_ios = 1;
     527             : 
     528      584266 :     stream->max_ios = max_ios;
     529      584266 :     stream->per_buffer_data_size = per_buffer_data_size;
     530      584266 :     stream->max_pinned_buffers = max_pinned_buffers;
     531      584266 :     stream->queue_size = queue_size;
     532      584266 :     stream->callback = callback;
     533      584266 :     stream->callback_private_data = callback_private_data;
     534             : 
     535             :     /*
     536             :      * Skip the initial ramp-up phase if the caller says we're going to be
     537             :      * reading the whole relation.  This way we start out assuming we'll be
     538             :      * doing full io_combine_limit sized reads (behavior B).
     539             :      */
     540      584266 :     if (flags & READ_STREAM_FULL)
     541        5502 :         stream->distance = Min(max_pinned_buffers, io_combine_limit);
     542             :     else
     543      578764 :         stream->distance = 1;
     544             : 
     545             :     /*
     546             :      * Since we always access the same relation, we can initialize parts of
     547             :      * the ReadBuffersOperation objects and leave them that way, to avoid
     548             :      * wasting CPU cycles writing to them for each read.
     549             :      */
     550     1320314 :     for (int i = 0; i < max_ios; ++i)
     551             :     {
     552      736048 :         stream->ios[i].op.rel = rel;
     553      736048 :         stream->ios[i].op.smgr = RelationGetSmgr(rel);
     554      736048 :         stream->ios[i].op.smgr_persistence = 0;
     555      736048 :         stream->ios[i].op.forknum = forknum;
     556      736048 :         stream->ios[i].op.strategy = strategy;
     557             :     }
     558             : 
     559      584266 :     return stream;
     560             : }
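The single-chunk allocation in the function above can be illustrated outside the backend. This is a minimal standalone sketch, not PostgreSQL code. `Buffer` is modelled as `int`, and `SketchIO` and `SketchStream` are hypothetical stand-ins for `InProgressIO` and `ReadStream`. `SKETCH_MAXALIGNOF` assumes an 8-byte maximum alignment, which is typical on 64-bit platforms, although `MAXIMUM_ALIGNOF` is platform-dependent. The sketch shows where the `io_combine_limit - 1` overflow slots go and why `2 * MAXIMUM_ALIGNOF` bytes of slack are enough for the two alignment round-ups.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Assumption: 8-byte maximum alignment; the real MAXIMUM_ALIGNOF is platform-specific. */
#define SKETCH_MAXALIGNOF 8
#define SKETCH_MAXALIGN(p) \
    (((uintptr_t) (p) + (SKETCH_MAXALIGNOF - 1)) & ~(uintptr_t) (SKETCH_MAXALIGNOF - 1))

typedef int Buffer;                 /* stand-in for PostgreSQL's Buffer */

typedef struct                      /* hypothetical stand-in for InProgressIO */
{
    int16_t buffer_index;
    void   *op;
} SketchIO;

typedef struct                      /* hypothetical stand-in for ReadStream */
{
    int       queue_size;
    SketchIO *ios;
    void     *per_buffer_data;
    Buffer    buffers[];            /* flexible array member in the same allocation */
} SketchStream;

static SketchStream *
sketch_stream_alloc(int queue_size, int io_combine_limit, int max_ios,
                    size_t per_buffer_data_size, size_t *size_out)
{
    int     nios = max_ios > 1 ? max_ios : 1;      /* Max(1, max_ios) */
    size_t  size;
    SketchStream *stream;

    size = offsetof(SketchStream, buffers);
    /* queue_size slots plus room for one read to overflow past the end */
    size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
    size += sizeof(SketchIO) * nios;
    size += per_buffer_data_size * queue_size;
    size += SKETCH_MAXALIGNOF * 2;  /* slack for the two round-ups below */

    stream = malloc(size);
    if (stream == NULL)
        return NULL;
    /* Zero only the fixed header, as the listing does. */
    memset(stream, 0, offsetof(SketchStream, buffers));
    stream->queue_size = queue_size;
    /* The I/O array starts at the first aligned address after the buffers. */
    stream->ios = (SketchIO *)
        SKETCH_MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
    /* Per-buffer data, if any, follows the I/O array, again aligned. */
    if (per_buffer_data_size > 0)
        stream->per_buffer_data = (void *) SKETCH_MAXALIGN(&stream->ios[nios]);
    assert((uintptr_t) stream->ios % SKETCH_MAXALIGNOF == 0);
    *size_out = size;
    return stream;
}
```

Reserving the overflow slots up front means a multi-block read that begins near the end of the circular queue can still be handed one contiguous slice of the `Buffer` array, at the cost of `io_combine_limit - 1` extra entries.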
     561             : 
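The setup that follows the allocation (enabling prefetch advice, treating `max_ios = 0` as 1, and choosing the starting distance) reduces to a small pure function. The sketch below is illustrative only. `SKETCH_SEQUENTIAL` and `SKETCH_FULL` are hypothetical stand-ins for `READ_STREAM_SEQUENTIAL` and `READ_STREAM_FULL`. The `have_prefetch` and `direct_io` parameters stand in for the `USE_PREFETCH` build option and the `IO_DIRECT_DATA` bit of `io_direct_flags`.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical flag bits standing in for READ_STREAM_SEQUENTIAL / READ_STREAM_FULL. */
#define SKETCH_SEQUENTIAL 0x01
#define SKETCH_FULL       0x02

typedef struct
{
    bool advice_enabled;
    int  max_ios;
    int  distance;
} SketchInit;

static SketchInit
sketch_init(int flags, bool direct_io, bool have_prefetch, int max_ios,
            int max_pinned_buffers, int io_combine_limit)
{
    SketchInit r = {0};

    /*
     * Advice is used only if the build supports it, direct I/O is off, the
     * caller has not promised sequential access, and max_ios is non-zero.
     */
    r.advice_enabled = have_prefetch && !direct_io &&
        (flags & SKETCH_SEQUENTIAL) == 0 && max_ios > 0;

    /* max_ios = 0 behaves like max_ios = 1 with advice disabled. */
    r.max_ios = max_ios == 0 ? 1 : max_ios;

    /* Whole-relation scans skip the ramp-up and start at behavior B. */
    if (flags & SKETCH_FULL)
        r.distance = max_pinned_buffers < io_combine_limit ?
            max_pinned_buffers : io_combine_limit;
    else
        r.distance = 1;

    assert(r.distance >= 1);
    return r;
}
```

Starting `READ_STREAM_FULL` streams at `Min(max_pinned_buffers, io_combine_limit)` skips the doubling ramp-up from 1 that other streams go through.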
     562             : /*
     563             :  * Pull one pinned buffer out of a stream.  Each call returns successive
     564             :  * blocks in the order specified by the callback.  If per_buffer_data_size was
     565             :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
     566             :  * per-buffer data that the callback had a chance to populate, which remains
     567             :  * valid until the next call to read_stream_next_buffer().  When the stream
     568             :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
     569             :  * the stream early at any time by calling read_stream_end().
     570             :  */
     571             : Buffer
     572     7069410 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
     573             : {
     574             :     Buffer      buffer;
     575             :     int16       oldest_buffer_index;
     576             : 
     577             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     578             : 
     579             :     /*
     580             :      * A fast path for all-cached scans (behavior A).  This is the same as the
     581             :      * usual algorithm, but it is specialized for no I/O and no per-buffer
     582             :      * data, so we can skip the queue management code, stay in the same buffer
     583             :      * slot and use singular StartReadBuffer().
     584             :      */
     585     7069410 :     if (likely(stream->fast_path))
     586             :     {
     587             :         BlockNumber next_blocknum;
     588             : 
     589             :         /* Fast path assumptions. */
     590             :         Assert(stream->ios_in_progress == 0);
     591             :         Assert(stream->pinned_buffers == 1);
     592             :         Assert(stream->distance == 1);
     593             :         Assert(stream->pending_read_nblocks == 0);
     594             :         Assert(stream->per_buffer_data_size == 0);
     595             : 
     596             :         /* We're going to return the buffer we pinned last time. */
     597     2693016 :         oldest_buffer_index = stream->oldest_buffer_index;
     598             :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
     599             :                stream->next_buffer_index);
     600     2693016 :         buffer = stream->buffers[oldest_buffer_index];
     601             :         Assert(buffer != InvalidBuffer);
     602             : 
     603             :         /* Choose the next block to pin. */
     604     2693016 :         if (unlikely(stream->blocknums_next == stream->blocknums_count))
     605      330462 :             read_stream_fill_blocknums(stream);
     606     2693016 :         next_blocknum = stream->blocknums[stream->blocknums_next++];
     607             : 
     608     2693016 :         if (likely(next_blocknum != InvalidBlockNumber))
     609             :         {
     610             :             /*
     611             :              * Pin a buffer for the next call.  Same buffer entry, and
     612             :              * arbitrary I/O entry (they're all free).  We don't have to
     613             :              * adjust pinned_buffers because we're transferring one to caller
     614             :              * but pinning one more.
     615             :              */
     616     2563872 :             if (likely(!StartReadBuffer(&stream->ios[0].op,
     617             :                                         &stream->buffers[oldest_buffer_index],
     618             :                                         next_blocknum,
     619             :                                         stream->advice_enabled ?
     620             :                                         READ_BUFFERS_ISSUE_ADVICE : 0)))
     621             :             {
     622             :                 /* Fast return. */
     623     2548878 :                 return buffer;
     624             :             }
     625             : 
     626             :             /* Next call must wait for I/O for the newly pinned buffer. */
     627       14994 :             stream->oldest_io_index = 0;
     628       14994 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
     629       14994 :             stream->ios_in_progress = 1;
     630       14994 :             stream->ios[0].buffer_index = oldest_buffer_index;
     631       14994 :             stream->seq_blocknum = next_blocknum + 1;
     632             :         }
     633             :         else
     634             :         {
     635             :             /* No more blocks, end of stream. */
     636      129144 :             stream->distance = 0;
     637      129144 :             stream->oldest_buffer_index = stream->next_buffer_index;
     638      129144 :             stream->pinned_buffers = 0;
     639             :         }
     640             : 
     641      144138 :         stream->fast_path = false;
     642      144138 :         return buffer;
     643             :     }
     644             : #endif
     645             : 
     646     4376394 :     if (unlikely(stream->pinned_buffers == 0))
     647             :     {
     648             :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
     649             : 
     650             :         /* End of stream reached?  */
     651     3680126 :         if (stream->distance == 0)
     652     2117054 :             return InvalidBuffer;
     653             : 
     654             :         /*
     655             :          * The usual order of operations is that we look ahead at the bottom
     656             :          * of this function after potentially finishing an I/O and making
     657             :          * space for more, but if we're just starting up we'll need to crank
     658             :          * the handle to get started.
     659             :          */
     660     1563072 :         read_stream_look_ahead(stream, true);
     661             : 
     662             :         /* End of stream reached? */
     663     1563072 :         if (stream->pinned_buffers == 0)
     664             :         {
     665             :             Assert(stream->distance == 0);
     666      765606 :             return InvalidBuffer;
     667             :         }
     668             :     }
     669             : 
     670             :     /* Grab the oldest pinned buffer and associated per-buffer data. */
     671             :     Assert(stream->pinned_buffers > 0);
     672     1493734 :     oldest_buffer_index = stream->oldest_buffer_index;
     673             :     Assert(oldest_buffer_index >= 0 &&
     674             :            oldest_buffer_index < stream->queue_size);
     675     1493734 :     buffer = stream->buffers[oldest_buffer_index];
     676     1493734 :     if (per_buffer_data)
     677           0 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
     678             : 
     679             :     Assert(BufferIsValid(buffer));
     680             : 
     681             :     /* Do we have to wait for an associated I/O first? */
     682     1493734 :     if (stream->ios_in_progress > 0 &&
     683      536966 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
     684             :     {
     685      526534 :         int16       io_index = stream->oldest_io_index;
     686             :         int16       distance;
     687             : 
     688             :         /* Sanity check that we still agree on the buffers. */
     689             :         Assert(stream->ios[io_index].op.buffers ==
     690             :                &stream->buffers[oldest_buffer_index]);
     691             : 
     692      526534 :         WaitReadBuffers(&stream->ios[io_index].op);
     693             : 
     694             :         Assert(stream->ios_in_progress > 0);
     695      526534 :         stream->ios_in_progress--;
     696      526534 :         if (++stream->oldest_io_index == stream->max_ios)
     697      506802 :             stream->oldest_io_index = 0;
     698             : 
     699      526534 :         if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
     700             :         {
     701             :             /* Distance ramps up fast (behavior C). */
     702         126 :             distance = stream->distance * 2;
     703         126 :             distance = Min(distance, stream->max_pinned_buffers);
     704         126 :             stream->distance = distance;
     705             :         }
     706             :         else
     707             :         {
     708             :             /* No advice; move towards io_combine_limit (behavior B). */
     709      526408 :             if (stream->distance > io_combine_limit)
     710             :             {
     711           0 :                 stream->distance--;
     712             :             }
     713             :             else
     714             :             {
     715      526408 :                 distance = stream->distance * 2;
     716      526408 :                 distance = Min(distance, io_combine_limit);
     717      526408 :                 distance = Min(distance, stream->max_pinned_buffers);
     718      526408 :                 stream->distance = distance;
     719             :             }
     720             :         }
     721             :     }
     722             : 
     723             : #ifdef CLOBBER_FREED_MEMORY
     724             :     /* Clobber old buffer and per-buffer data for debugging purposes. */
     725             :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
     726             : 
     727             :     /*
     728             :      * The caller will get access to the per-buffer data, until the next call.
     729             :      * We wipe the one before, which is never occupied because queue_size
     730             :      * allowed one extra element.  This will hopefully trip up client code
     731             :      * that is holding a dangling pointer to it.
     732             :      */
     733             :     if (stream->per_buffer_data)
     734             :         wipe_mem(get_per_buffer_data(stream,
     735             :                                      oldest_buffer_index == 0 ?
     736             :                                      stream->queue_size - 1 :
     737             :                                      oldest_buffer_index - 1),
     738             :                  stream->per_buffer_data_size);
     739             : #endif
     740             : 
     741             :     /* Pin transferred to caller. */
     742             :     Assert(stream->pinned_buffers > 0);
     743     1493734 :     stream->pinned_buffers--;
     744             : 
     745             :     /* Advance oldest buffer, with wrap-around. */
     746     1493734 :     stream->oldest_buffer_index++;
     747     1493734 :     if (stream->oldest_buffer_index == stream->queue_size)
     748      288388 :         stream->oldest_buffer_index = 0;
     749             : 
     750             :     /* Prepare for the next call. */
     751     1493734 :     read_stream_look_ahead(stream, false);
     752             : 
     753             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     754             :     /* See if we can take the fast path for all-cached scans next time. */
     755     1493734 :     if (stream->ios_in_progress == 0 &&
     756     1012318 :         stream->pinned_buffers == 1 &&
     757      290452 :         stream->distance == 1 &&
     758      276182 :         stream->pending_read_nblocks == 0 &&
     759      275852 :         stream->per_buffer_data_size == 0)
     760             :     {
     761      275852 :         stream->fast_path = true;
     762             :     }
     763             : #endif
     764             : 
     765     1493734 :     return buffer;
     766             : }
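The fast path in `read_stream_next_buffer()` is entered only when five counters line up. It is abandoned as soon as `StartReadBuffer()` reports that I/O is needed or the callback returns `InvalidBlockNumber`. The standalone model below captures just those two rules. `SketchState` and both functions are hypothetical, with field names borrowed from `ReadStream` for readability.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical snapshot of the ReadStream fields the fast-path check reads. */
typedef struct
{
    int    ios_in_progress;
    int    pinned_buffers;
    int    distance;
    int    pending_read_nblocks;
    size_t per_buffer_data_size;
} SketchState;

/* Same condition as the "take the fast path next time" test in the listing. */
static bool
sketch_can_use_fast_path(const SketchState *s)
{
    return s->ios_in_progress == 0 &&
        s->pinned_buffers == 1 &&
        s->distance == 1 &&
        s->pending_read_nblocks == 0 &&
        s->per_buffer_data_size == 0;
}

/*
 * One fast-path call: returns true if the stream stays on the fast path.
 * have_next_block models the callback result; needs_io models a true
 * return from StartReadBuffer().
 */
static bool
sketch_fast_path_step(SketchState *s, bool have_next_block, bool needs_io)
{
    assert(sketch_can_use_fast_path(s));
    if (have_next_block && !needs_io)
        return true;                /* cache hit: one pin out, one pin in */
    if (have_next_block)
        s->ios_in_progress = 1;     /* next call must wait for this read */
    else
    {
        s->distance = 0;            /* end of stream */
        s->pinned_buffers = 0;
    }
    return false;
}
```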
     767             : 
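After `WaitReadBuffers()` completes, the look-ahead distance is adjusted according to whether prefetch advice was issued for that read. Below is a minimal model of that update. It uses plain `int` in place of the `int16` fields, and the function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

static int
sketch_min(int a, int b)
{
    return a < b ? a : b;
}

/* Mirrors the distance update after WaitReadBuffers() in the listing. */
static int
sketch_next_distance(int distance, bool advice_issued,
                     int io_combine_limit, int max_pinned_buffers)
{
    assert(io_combine_limit > 0 && max_pinned_buffers > 0);

    if (advice_issued)                          /* behavior C: ramp up fast */
        return sketch_min(distance * 2, max_pinned_buffers);
    if (distance > io_combine_limit)            /* drift back toward B */
        return distance - 1;
    /* behavior B: double, capped at io_combine_limit and the pin limit */
    return sketch_min(sketch_min(distance * 2, io_combine_limit),
                      max_pinned_buffers);
}
```

Starting from 1 with `io_combine_limit = 16` and `max_pinned_buffers = 64`, repeated reads without advice settle at 16 (behavior B), while reads with advice double until they reach 64 (behavior C). A distance already above `io_combine_limit` shrinks by one per completed read rather than being cut back at once.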
     768             : /*
     769             :  * Reset a read stream by releasing any queued up buffers, allowing the stream
     770             :  * to be used again for different blocks.  This can be used to clear an
     771             :  * end-of-stream condition and start again, or to throw away blocks that were
     772             :  * speculatively read and read some different blocks instead.
     773             :  */
     774             : void
     775     1563614 : read_stream_reset(ReadStream *stream)
     776             : {
     777             :     Buffer      buffer;
     778             : 
     779             :     /* Stop looking ahead. */
     780     1563614 :     stream->distance = 0;
     781             : 
     782             :     /* Forget buffered block numbers and fast path state. */
     783     1563614 :     stream->blocknums_next = 0;
     784     1563614 :     stream->blocknums_count = 0;
     785     1563614 :     stream->fast_path = false;
     786             : 
     787             :     /* Unpin anything that wasn't consumed. */
     788     1723418 :     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
     789      159804 :         ReleaseBuffer(buffer);
     790             : 
     791             :     Assert(stream->pinned_buffers == 0);
     792             :     Assert(stream->ios_in_progress == 0);
     793             : 
     794             :     /* Start off assuming data is cached. */
     795     1563614 :     stream->distance = 1;
     796     1563614 : }
     797             : 
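`read_stream_reset()` drains the stream by calling `read_stream_next_buffer()` until it returns `InvalidBuffer`, after setting `distance` to 0 to stop looking ahead. Each call hands back the oldest pinned buffer and advances `oldest_buffer_index` with wrap-around. The model below is a hypothetical simplification: buffers are plain `int`s with 0 standing in for `InvalidBuffer`, a counter stands in for `ReleaseBuffer()`, and look-ahead and I/O are not modelled at all.

```c
#include <assert.h>

#define SKETCH_QUEUE_SIZE 5         /* hypothetical: max_pinned_buffers (4) + 1 */
#define SKETCH_INVALID 0            /* stand-in for InvalidBuffer */

typedef struct
{
    int buffers[SKETCH_QUEUE_SIZE];
    int oldest;                     /* oldest_buffer_index */
    int next;                       /* next_buffer_index */
    int pinned;                     /* pinned_buffers */
    int distance;
    int released;                   /* counts ReleaseBuffer() stand-in calls */
} SketchQueue;

/* Queue a newly pinned buffer; at most queue_size - 1 pins are held. */
static void
sketch_push(SketchQueue *q, int buffer)
{
    assert(q->pinned < SKETCH_QUEUE_SIZE - 1);
    q->buffers[q->next] = buffer;
    q->next = (q->next + 1) % SKETCH_QUEUE_SIZE;
    q->pinned++;
}

/* Hand the oldest pinned buffer to the caller, advancing with wrap-around. */
static int
sketch_pop(SketchQueue *q)
{
    int buffer;

    if (q->pinned == 0)
        return SKETCH_INVALID;      /* nothing queued: end of stream */
    buffer = q->buffers[q->oldest];
    q->pinned--;
    if (++q->oldest == SKETCH_QUEUE_SIZE)
        q->oldest = 0;
    return buffer;
}

static void
sketch_reset(SketchQueue *q)
{
    int buffer;

    q->distance = 0;                /* stop looking ahead */
    while ((buffer = sketch_pop(q)) != SKETCH_INVALID)
        q->released++;              /* stand-in for ReleaseBuffer(buffer) */
    assert(q->pinned == 0 && q->oldest == q->next);
    q->distance = 1;                /* start off assuming data is cached */
}
```

After the reset the queue is empty, with the oldest and next indexes equal, and the distance is back at 1, so the stream can be reused for different blocks.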
     798             : /*
     799             :  * Release and free a read stream.
     800             :  */
     801             : void
     802      581952 : read_stream_end(ReadStream *stream)
     803             : {
     804      581952 :     read_stream_reset(stream);
     805      581952 :     pfree(stream);
     806      581952 : }

Generated by: LCOV version 1.14