LCOV - code coverage report
Current view: top level - src/backend/storage/aio - read_stream.c (source / functions)
Test: PostgreSQL 18devel
Date: 2025-02-22 07:14:56
Coverage: Lines: 204 of 212 (96.2 %), Functions: 12 of 13 (92.3 %)

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * read_stream.c
       4             :  *    Mechanism for accessing buffered relation data with look-ahead
       5             :  *
       6             :  * Code that needs to access relation data typically pins blocks one at a
       7             :  * time, often in a predictable order that might be sequential or data-driven.
       8             :  * Calling the simple ReadBuffer() function for each block is inefficient,
       9             :  * because blocks that are not yet in the buffer pool require I/O operations
      10             :  * that are small and might stall waiting for storage.  This mechanism looks
      11             :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
      12             :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
      13             :  * distance.
      14             :  *
      15             :  * A user-provided callback generates a stream of block numbers that is used
      16             :  * to form reads of up to io_combine_limit, by attempting to merge them with a
      17             :  * pending read.  When that isn't possible, the existing pending read is sent
      18             :  * to StartReadBuffers() so that a new one can begin to form.
      19             :  *
      20             :  * The algorithm for controlling the look-ahead distance tries to classify the
      21             :  * stream into three ideal behaviors:
      22             :  *
      23             :  * A) No I/O is necessary, because the requested blocks are fully cached
      24             :  * already.  There is no benefit to looking ahead more than one block, so
      25             :  * distance is 1.  This is the default initial assumption.
      26             :  *
      27             :  * B) I/O is necessary, but read-ahead advice is undesirable because the
      28             :  * access is sequential and we can rely on the kernel's read-ahead heuristics,
      29             :  * or impossible because direct I/O is enabled, or the system doesn't support
      30             :  * read-ahead advice.  There is no benefit in looking ahead more than
      31             :  * io_combine_limit, because in this case the only goal is larger read system
      32             :  * calls.  Looking further ahead would pin many buffers and perform
      33             :  * speculative work for no benefit.
      34             :  *
      35             :  * C) I/O is necessary, it appears to be random, and this system supports
      36             :  * read-ahead advice.  We'll look further ahead in order to reach the
      37             :  * configured level of I/O concurrency.
      38             :  *
      39             :  * The distance increases rapidly and decays slowly, so that it moves towards
      40             :  * those levels as different I/O patterns are discovered.  For example, a
      41             :  * sequential scan of fully cached data doesn't bother looking ahead, but a
      42             :  * sequential scan that hits a region of uncached blocks will start issuing
      43             :  * increasingly wide read calls until it plateaus at io_combine_limit.
      44             :  *
      45             :  * The main data structure is a circular queue of buffers of size
      46             :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
      47             :  * returned by read_stream_next_buffer().  Each buffer also has an optional
       48             :  * variable-sized object that is passed from the callback to the consumer of
      49             :  * buffers.
      50             :  *
      51             :  * Parallel to the queue of buffers, there is a circular queue of in-progress
      52             :  * I/Os that have been started with StartReadBuffers(), and for which
      53             :  * WaitReadBuffers() must be called before returning the buffer.
      54             :  *
      55             :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
      56             :  * successive calls, then these data structures might appear as follows:
      57             :  *
      58             :  *                          buffers buf/data       ios
      59             :  *
      60             :  *                          +----+  +-----+       +--------+
      61             :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
      62             :  *                          +----+  +-----+  |    +--------+
      63             :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
      64             :  *                          +----+  +-----+  | |  +--------+
      65             :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
      66             :  *                          +----+  +-----+    |  +--------+
      67             :  *                          | 43 |  |  ?  |    |  |        |
      68             :  *                          +----+  +-----+    |  +--------+
      69             :  *                          | 44 |  |  ?  |    |  |        |
      70             :  *                          +----+  +-----+    |  +--------+
      71             :  *                          | 60 |  |  ?  |<---+
      72             :  *                          +----+  +-----+
      73             :  *     next_buffer_index -> |    |  |     |
      74             :  *                          +----+  +-----+
      75             :  *
      76             :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
      77             :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
      78             :  * the range 42..44 requires an I/O wait before its buffers are returned, as
      79             :  * does block 60.
      80             :  *
      81             :  *
      82             :  * Portions Copyright (c) 2024-2025, PostgreSQL Global Development Group
      83             :  * Portions Copyright (c) 1994, Regents of the University of California
      84             :  *
      85             :  * IDENTIFICATION
      86             :  *    src/backend/storage/aio/read_stream.c
      87             :  *
      88             :  *-------------------------------------------------------------------------
      89             :  */
      90             : #include "postgres.h"
      91             : 
      92             : #include "miscadmin.h"
      93             : #include "storage/fd.h"
      94             : #include "storage/smgr.h"
      95             : #include "storage/read_stream.h"
      96             : #include "utils/memdebug.h"
      97             : #include "utils/rel.h"
      98             : #include "utils/spccache.h"
      99             : 
     100             : typedef struct InProgressIO
     101             : {
     102             :     int16       buffer_index;
     103             :     ReadBuffersOperation op;
     104             : } InProgressIO;
     105             : 
     106             : /*
     107             :  * State for managing a stream of reads.
     108             :  */
     109             : struct ReadStream
     110             : {
     111             :     int16       max_ios;
     112             :     int16       ios_in_progress;
     113             :     int16       queue_size;
     114             :     int16       max_pinned_buffers;
     115             :     int16       pinned_buffers;
     116             :     int16       distance;
     117             :     bool        advice_enabled;
     118             : 
     119             :     /*
     120             :      * One-block buffer to support 'ungetting' a block number, to resolve flow
     121             :      * control problems when I/Os are split.
     122             :      */
     123             :     BlockNumber buffered_blocknum;
     124             : 
     125             :     /*
     126             :      * The callback that will tell us which block numbers to read, and an
      127             :      * opaque pointer that will be passed to it for its own purposes.
     128             :      */
     129             :     ReadStreamBlockNumberCB callback;
     130             :     void       *callback_private_data;
     131             : 
     132             :     /* Next expected block, for detecting sequential access. */
     133             :     BlockNumber seq_blocknum;
     134             : 
     135             :     /* The read operation we are currently preparing. */
     136             :     BlockNumber pending_read_blocknum;
     137             :     int16       pending_read_nblocks;
     138             : 
     139             :     /* Space for buffers and optional per-buffer private data. */
     140             :     size_t      per_buffer_data_size;
     141             :     void       *per_buffer_data;
     142             : 
     143             :     /* Read operations that have been started but not waited for yet. */
     144             :     InProgressIO *ios;
     145             :     int16       oldest_io_index;
     146             :     int16       next_io_index;
     147             : 
     148             :     bool        fast_path;
     149             : 
     150             :     /* Circular queue of buffers. */
     151             :     int16       oldest_buffer_index;    /* Next pinned buffer to return */
     152             :     int16       next_buffer_index;  /* Index of next buffer to pin */
     153             :     Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
     154             : };
     155             : 
     156             : /*
     157             :  * Return a pointer to the per-buffer data by index.
     158             :  */
     159             : static inline void *
     160     5727674 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
     161             : {
     162    11455348 :     return (char *) stream->per_buffer_data +
     163     5727674 :         stream->per_buffer_data_size * buffer_index;
     164             : }
     165             : 
     166             : /*
     167             :  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
     168             :  * blocks [current_blocknum, last_exclusive).
     169             :  */
     170             : BlockNumber
     171      541722 : block_range_read_stream_cb(ReadStream *stream,
     172             :                            void *callback_private_data,
     173             :                            void *per_buffer_data)
     174             : {
     175      541722 :     BlockRangeReadStreamPrivate *p = callback_private_data;
     176             : 
     177      541722 :     if (p->current_blocknum < p->last_exclusive)
     178      429276 :         return p->current_blocknum++;
     179             : 
     180      112446 :     return InvalidBlockNumber;
     181             : }
     182             : 
     183             : /*
      184             :  * Ask the callback which block it would like us to read next, with a one-block
     185             :  * buffer in front to allow read_stream_unget_block() to work.
     186             :  */
     187             : static inline BlockNumber
     188     8581236 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
     189             : {
     190             :     BlockNumber blocknum;
     191             : 
     192     8581236 :     blocknum = stream->buffered_blocknum;
     193     8581236 :     if (blocknum != InvalidBlockNumber)
     194           4 :         stream->buffered_blocknum = InvalidBlockNumber;
     195             :     else
     196             :     {
     197             :         /*
     198             :          * Tell Valgrind that the per-buffer data is undefined.  That replaces
     199             :          * the "noaccess" state that was set when the consumer moved past this
     200             :          * entry last time around the queue, and should also catch callbacks
     201             :          * that fail to initialize data that the buffer consumer later
     202             :          * accesses.  On the first go around, it is undefined already.
     203             :          */
     204             :         VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
     205             :                                     stream->per_buffer_data_size);
     206     8581232 :         blocknum = stream->callback(stream,
     207             :                                     stream->callback_private_data,
     208             :                                     per_buffer_data);
     209             :     }
     210             : 
     211     8581236 :     return blocknum;
     212             : }
     213             : 
     214             : /*
     215             :  * In order to deal with short reads in StartReadBuffers(), we sometimes need
     216             :  * to defer handling of a block until later.
     217             :  */
     218             : static inline void
     219           4 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
     220             : {
     221             :     /* We shouldn't ever unget more than one block. */
     222             :     Assert(stream->buffered_blocknum == InvalidBlockNumber);
     223             :     Assert(blocknum != InvalidBlockNumber);
     224           4 :     stream->buffered_blocknum = blocknum;
     225           4 : }
     226             : 
     227             : static void
     228     3045704 : read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
     229             : {
     230             :     bool        need_wait;
     231             :     int         nblocks;
     232             :     int         flags;
     233             :     int16       io_index;
     234             :     int16       overflow;
     235             :     int16       buffer_index;
     236             : 
     237             :     /* This should only be called with a pending read. */
     238             :     Assert(stream->pending_read_nblocks > 0);
     239             :     Assert(stream->pending_read_nblocks <= io_combine_limit);
     240             : 
     241             :     /* We had better not exceed the pin limit by starting this read. */
     242             :     Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
     243             :            stream->max_pinned_buffers);
     244             : 
     245             :     /* We had better not be overwriting an existing pinned buffer. */
     246     3045704 :     if (stream->pinned_buffers > 0)
     247             :         Assert(stream->next_buffer_index != stream->oldest_buffer_index);
     248             :     else
     249             :         Assert(stream->next_buffer_index == stream->oldest_buffer_index);
     250             : 
     251             :     /*
     252             :      * If advice hasn't been suppressed, this system supports it, and this
     253             :      * isn't a strictly sequential pattern, then we'll issue advice.
     254             :      */
     255     3045704 :     if (!suppress_advice &&
     256     1944358 :         stream->advice_enabled &&
     257      872834 :         stream->pending_read_blocknum != stream->seq_blocknum)
     258      777206 :         flags = READ_BUFFERS_ISSUE_ADVICE;
     259             :     else
     260     2268498 :         flags = 0;
     261             : 
     262             :     /* We say how many blocks we want to read, but may be smaller on return. */
     263     3045704 :     buffer_index = stream->next_buffer_index;
     264     3045704 :     io_index = stream->next_io_index;
     265     3045704 :     nblocks = stream->pending_read_nblocks;
     266     3045704 :     need_wait = StartReadBuffers(&stream->ios[io_index].op,
     267     3045704 :                                  &stream->buffers[buffer_index],
     268             :                                  stream->pending_read_blocknum,
     269             :                                  &nblocks,
     270             :                                  flags);
     271     3045704 :     stream->pinned_buffers += nblocks;
     272             : 
     273             :     /* Remember whether we need to wait before returning this buffer. */
     274     3045704 :     if (!need_wait)
     275             :     {
     276             :         /* Look-ahead distance decays, no I/O necessary (behavior A). */
     277     2095370 :         if (stream->distance > 1)
     278       17774 :             stream->distance--;
     279             :     }
     280             :     else
     281             :     {
     282             :         /*
     283             :          * Remember to call WaitReadBuffers() before returning head buffer.
     284             :          * Look-ahead distance will be adjusted after waiting.
     285             :          */
     286      950334 :         stream->ios[io_index].buffer_index = buffer_index;
     287      950334 :         if (++stream->next_io_index == stream->max_ios)
     288      917776 :             stream->next_io_index = 0;
     289             :         Assert(stream->ios_in_progress < stream->max_ios);
     290      950334 :         stream->ios_in_progress++;
     291      950334 :         stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
     292             :     }
     293             : 
     294             :     /*
     295             :      * We gave a contiguous range of buffer space to StartReadBuffers(), but
     296             :      * we want it to wrap around at queue_size.  Slide overflowing buffers to
     297             :      * the front of the array.
     298             :      */
     299     3045704 :     overflow = (buffer_index + nblocks) - stream->queue_size;
     300     3045704 :     if (overflow > 0)
     301        8850 :         memmove(&stream->buffers[0],
     302        8850 :                 &stream->buffers[stream->queue_size],
     303             :                 sizeof(stream->buffers[0]) * overflow);
     304             : 
     305             :     /* Compute location of start of next read, without using % operator. */
     306     3045704 :     buffer_index += nblocks;
     307     3045704 :     if (buffer_index >= stream->queue_size)
     308      515284 :         buffer_index -= stream->queue_size;
     309             :     Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     310     3045704 :     stream->next_buffer_index = buffer_index;
     311             : 
     312             :     /* Adjust the pending read to cover the remaining portion, if any. */
     313     3045704 :     stream->pending_read_blocknum += nblocks;
     314     3045704 :     stream->pending_read_nblocks -= nblocks;
     315     3045704 : }
     316             : 
     317             : static void
     318     5122280 : read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
     319             : {
     320     8433178 :     while (stream->ios_in_progress < stream->max_ios &&
     321     8406176 :            stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
     322             :     {
     323             :         BlockNumber blocknum;
     324             :         int16       buffer_index;
     325             :         void       *per_buffer_data;
     326             : 
     327     4946770 :         if (stream->pending_read_nblocks == io_combine_limit)
     328             :         {
     329           0 :             read_stream_start_pending_read(stream, suppress_advice);
     330           0 :             suppress_advice = false;
     331           0 :             continue;
     332             :         }
     333             : 
     334             :         /*
     335             :          * See which block the callback wants next in the stream.  We need to
     336             :          * compute the index of the Nth block of the pending read including
     337             :          * wrap-around, but we don't want to use the expensive % operator.
     338             :          */
     339     4946770 :         buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
     340     4946770 :         if (buffer_index >= stream->queue_size)
     341      102852 :             buffer_index -= stream->queue_size;
     342             :         Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     343     4946770 :         per_buffer_data = get_per_buffer_data(stream, buffer_index);
     344     4946770 :         blocknum = read_stream_get_block(stream, per_buffer_data);
     345     4946770 :         if (blocknum == InvalidBlockNumber)
     346             :         {
     347             :             /* End of stream. */
     348     1635868 :             stream->distance = 0;
     349     1635868 :             break;
     350             :         }
     351             : 
     352             :         /* Can we merge it with the pending read? */
     353     3310902 :         if (stream->pending_read_nblocks > 0 &&
     354      332112 :             stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
     355             :         {
     356      332106 :             stream->pending_read_nblocks++;
     357      332106 :             continue;
     358             :         }
     359             : 
     360             :         /* We have to start the pending read before we can build another. */
     361     2978798 :         while (stream->pending_read_nblocks > 0)
     362             :         {
     363           6 :             read_stream_start_pending_read(stream, suppress_advice);
     364           6 :             suppress_advice = false;
     365           6 :             if (stream->ios_in_progress == stream->max_ios)
     366             :             {
     367             :                 /* And we've hit the limit.  Rewind, and stop here. */
     368           4 :                 read_stream_unget_block(stream, blocknum);
     369           4 :                 return;
     370             :             }
     371             :         }
     372             : 
     373             :         /* This is the start of a new pending read. */
     374     2978792 :         stream->pending_read_blocknum = blocknum;
     375     2978792 :         stream->pending_read_nblocks = 1;
     376             :     }
     377             : 
     378             :     /*
     379             :      * We don't start the pending read just because we've hit the distance
     380             :      * limit, preferring to give it another chance to grow to full
     381             :      * io_combine_limit size once more buffers have been consumed.  However,
     382             :      * if we've already reached io_combine_limit, or we've reached the
     383             :      * distance limit and there isn't anything pinned yet, or the callback has
     384             :      * signaled end-of-stream, we start the read immediately.
     385             :      */
     386     5122276 :     if (stream->pending_read_nblocks > 0 &&
     387     3169592 :         (stream->pending_read_nblocks == io_combine_limit ||
     388     3159098 :          (stream->pending_read_nblocks == stream->distance &&
     389     2892030 :           stream->pinned_buffers == 0) ||
     390      267068 :          stream->distance == 0) &&
     391     3045896 :         stream->ios_in_progress < stream->max_ios)
     392     3045698 :         read_stream_start_pending_read(stream, suppress_advice);
     393             : }
     394             : 
     395             : /*
     396             :  * Create a new read stream object that can be used to perform the equivalent
     397             :  * of a series of ReadBuffer() calls for one fork of one relation.
     398             :  * Internally, it generates larger vectored reads where possible by looking
     399             :  * ahead.  The callback should return block numbers or InvalidBlockNumber to
     400             :  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
     401             :  * write extra data for each block into the space provided to it.  It will
     402             :  * also receive callback_private_data for its own purposes.
     403             :  */
     404             : static ReadStream *
     405      915940 : read_stream_begin_impl(int flags,
     406             :                        BufferAccessStrategy strategy,
     407             :                        Relation rel,
     408             :                        SMgrRelation smgr,
     409             :                        char persistence,
     410             :                        ForkNumber forknum,
     411             :                        ReadStreamBlockNumberCB callback,
     412             :                        void *callback_private_data,
     413             :                        size_t per_buffer_data_size)
     414             : {
     415             :     ReadStream *stream;
     416             :     size_t      size;
     417             :     int16       queue_size;
     418             :     int         max_ios;
     419             :     int         strategy_pin_limit;
     420             :     uint32      max_pinned_buffers;
     421             :     Oid         tablespace_id;
     422             : 
     423             :     /*
     424             :      * Decide how many I/Os we will allow to run at the same time.  That
     425             :      * currently means advice to the kernel to tell it that we will soon read.
     426             :      * This number also affects how far we look ahead for opportunities to
     427             :      * start more I/Os.
     428             :      */
     429      915940 :     tablespace_id = smgr->smgr_rlocator.locator.spcOid;
     430      915940 :     if (!OidIsValid(MyDatabaseId) ||
     431     1055706 :         (rel && IsCatalogRelation(rel)) ||
     432      263872 :         IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
     433             :     {
     434             :         /*
     435             :          * Avoid circularity while trying to look up tablespace settings or
     436             :          * before spccache.c is ready.
     437             :          */
     438      753022 :         max_ios = effective_io_concurrency;
     439             :     }
     440      162918 :     else if (flags & READ_STREAM_MAINTENANCE)
     441       19500 :         max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
     442             :     else
     443      143418 :         max_ios = get_tablespace_io_concurrency(tablespace_id);
     444             : 
     445             :     /* Cap to INT16_MAX to avoid overflowing below */
     446      915940 :     max_ios = Min(max_ios, PG_INT16_MAX);
     447             : 
     448             :     /*
     449             :      * Choose the maximum number of buffers we're prepared to pin.  We try to
     450             :      * pin fewer if we can, though.  We clamp it to at least io_combine_limit
     451             :      * so that we can have a chance to build up a full io_combine_limit sized
     452             :      * read, even when max_ios is zero.  Be careful not to allow int16 to
     453             :      * overflow (even though that's not possible with the current GUC range
     454             :      * limits), allowing also for the spare entry and the overflow space.
     455             :      */
     456      915940 :     max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
     457      915940 :     max_pinned_buffers = Min(max_pinned_buffers,
     458             :                              PG_INT16_MAX - io_combine_limit - 1);
     459             : 
     460             :     /* Give the strategy a chance to limit the number of buffers we pin. */
     461      915940 :     strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
     462      915940 :     max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
     463             : 
     464             :     /* Don't allow this backend to pin more than its share of buffers. */
     465      915940 :     if (SmgrIsTemp(smgr))
     466       11128 :         LimitAdditionalLocalPins(&max_pinned_buffers);
     467             :     else
     468      904812 :         LimitAdditionalPins(&max_pinned_buffers);
     469             :     Assert(max_pinned_buffers > 0);
     470             : 
     471             :     /*
     472             :      * We need one extra entry for buffers and per-buffer data, because users
     473             :      * of per-buffer data have access to the object until the next call to
     474             :      * read_stream_next_buffer(), so we need a gap between the head and tail
     475             :      * of the queue so that we don't clobber it.
     476             :      */
     477      915940 :     queue_size = max_pinned_buffers + 1;
     478             : 
     479             :     /*
     480             :      * Allocate the object, the buffers, the ios and per_buffer_data space in
     481             :      * one big chunk.  Though we have queue_size buffers, we want to be able
     482             :      * to assume that all the buffers for a single read are contiguous (i.e.
     483             :      * don't wrap around halfway through), so we allow temporary overflows of
     484             :      * up to the maximum possible read size by allocating an extra
     485             :      * io_combine_limit - 1 elements.
     486             :      */
     487      915940 :     size = offsetof(ReadStream, buffers);
     488      915940 :     size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
     489      915940 :     size += sizeof(InProgressIO) * Max(1, max_ios);
     490      915940 :     size += per_buffer_data_size * queue_size;
     491      915940 :     size += MAXIMUM_ALIGNOF * 2;
     492      915940 :     stream = (ReadStream *) palloc(size);
     493      915940 :     memset(stream, 0, offsetof(ReadStream, buffers));
     494      915940 :     stream->ios = (InProgressIO *)
     495      915940 :         MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
     496      915940 :     if (per_buffer_data_size > 0)
     497      118650 :         stream->per_buffer_data = (void *)
     498      118650 :             MAXALIGN(&stream->ios[Max(1, max_ios)]);
     499             : 
     500             : #ifdef USE_PREFETCH
     501             : 
     502             :     /*
     503             :      * This system supports prefetching advice.  We can use it as long as
     504             :      * direct I/O isn't enabled, the caller hasn't promised sequential access
     505             :      * (overriding our detection heuristics), and max_ios hasn't been set to
     506             :      * zero.
     507             :      */
     508      915940 :     if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
     509      915740 :         (flags & READ_STREAM_SEQUENTIAL) == 0 &&
     510             :         max_ios > 0)
     511      247356 :         stream->advice_enabled = true;
     512             : #endif
     513             : 
     514             :     /*
     515             :      * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
     516             :      * above.  If we had real asynchronous I/O we might need a slightly
     517             :      * different definition.
     518             :      */
     519      915940 :     if (max_ios == 0)
     520           0 :         max_ios = 1;
     521             : 
     522      915940 :     stream->max_ios = max_ios;
     523      915940 :     stream->per_buffer_data_size = per_buffer_data_size;
     524      915940 :     stream->max_pinned_buffers = max_pinned_buffers;
     525      915940 :     stream->queue_size = queue_size;
     526      915940 :     stream->callback = callback;
     527      915940 :     stream->callback_private_data = callback_private_data;
     528      915940 :     stream->buffered_blocknum = InvalidBlockNumber;
     529             : 
     530             :     /*
     531             :      * Skip the initial ramp-up phase if the caller says we're going to be
     532             :      * reading the whole relation.  This way we start out assuming we'll be
     533             :      * doing full io_combine_limit sized reads (behavior B).
     534             :      */
     535      915940 :     if (flags & READ_STREAM_FULL)
     536      113808 :         stream->distance = Min(max_pinned_buffers, io_combine_limit);
     537             :     else
     538      802132 :         stream->distance = 1;
     539             : 
     540             :     /*
     541             :      * Since we always access the same relation, we can initialize parts of
     542             :      * the ReadBuffersOperation objects and leave them that way, to avoid
     543             :      * wasting CPU cycles writing to them for each read.
     544             :      */
     545     2056184 :     for (int i = 0; i < max_ios; ++i)
     546             :     {
     547     1140244 :         stream->ios[i].op.rel = rel;
     548     1140244 :         stream->ios[i].op.smgr = smgr;
     549     1140244 :         stream->ios[i].op.persistence = persistence;
     550     1140244 :         stream->ios[i].op.forknum = forknum;
     551     1140244 :         stream->ios[i].op.strategy = strategy;
     552             :     }
     553             : 
     554      915940 :     return stream;
     555             : }
     556             : 
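The allocation at the top of this function packs the stream header, the buffer queue, the I/O slots, and the per-buffer data into one palloc. Below is a standalone sketch of the same carving; the `MAXALIGN` macro, the `InProgressIO` size, and the helper name are illustrative stand-ins, not PostgreSQL's real definitions. The point is that the buffer array is oversized by `io_combine_limit - 1` so a single combined read never wraps, and the trailing arrays are placed with alignment padding covered by the `MAXIMUM_ALIGNOF * 2` slack.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-ins for PostgreSQL's definitions. */
#define MAXIMUM_ALIGNOF 8
#define MAXALIGN(x) (((uintptr_t) (x) + (MAXIMUM_ALIGNOF - 1)) & \
                     ~(uintptr_t) (MAXIMUM_ALIGNOF - 1))

typedef int Buffer;
typedef struct { char pad[64]; } InProgressIO;  /* size is made up */

/*
 * Carve one chunk into three arrays the way read_stream_begin_impl()
 * does: the buffer queue is oversized by io_combine_limit - 1 so one
 * combined read never wraps, and the trailing arrays are placed with
 * MAXALIGN.  Returns 1 if everything fits in the computed size.
 */
static int
stream_alloc_demo(int queue_size, int io_combine_limit, int max_ios,
                  size_t per_buffer_data_size)
{
    size_t      size;
    char       *chunk;
    Buffer     *buffers;
    InProgressIO *ios;
    char       *per_buffer_data;
    int         ok;

    size = sizeof(Buffer) * (queue_size + io_combine_limit - 1);
    size += sizeof(InProgressIO) * max_ios;
    size += per_buffer_data_size * queue_size;
    size += MAXIMUM_ALIGNOF * 2;    /* slack for the two MAXALIGN steps */

    chunk = malloc(size);
    buffers = (Buffer *) chunk;
    ios = (InProgressIO *) MAXALIGN(&buffers[queue_size + io_combine_limit - 1]);
    per_buffer_data = (char *) MAXALIGN(&ios[max_ios]);

    /* All three sub-arrays must fit inside the single allocation. */
    ok = ((char *) per_buffer_data + per_buffer_data_size * queue_size
          <= chunk + size);
    free(chunk);
    return ok;
}
```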
     557             : /*
     558             :  * Create a new read stream for reading a relation.
     559             :  * See read_stream_begin_impl() for the detailed explanation.
     560             :  */
     561             : ReadStream *
     562      807662 : read_stream_begin_relation(int flags,
     563             :                            BufferAccessStrategy strategy,
     564             :                            Relation rel,
     565             :                            ForkNumber forknum,
     566             :                            ReadStreamBlockNumberCB callback,
     567             :                            void *callback_private_data,
     568             :                            size_t per_buffer_data_size)
     569             : {
     570      807662 :     return read_stream_begin_impl(flags,
     571             :                                   strategy,
     572             :                                   rel,
     573             :                                   RelationGetSmgr(rel),
     574      807662 :                                   rel->rd_rel->relpersistence,
     575             :                                   forknum,
     576             :                                   callback,
     577             :                                   callback_private_data,
     578             :                                   per_buffer_data_size);
     579             : }
     580             : 
     581             : /*
     582             :  * Create a new read stream for reading a SMgr relation.
     583             :  * See read_stream_begin_impl() for the detailed explanation.
     584             :  */
     585             : ReadStream *
     586      108278 : read_stream_begin_smgr_relation(int flags,
     587             :                                 BufferAccessStrategy strategy,
     588             :                                 SMgrRelation smgr,
     589             :                                 char smgr_persistence,
     590             :                                 ForkNumber forknum,
     591             :                                 ReadStreamBlockNumberCB callback,
     592             :                                 void *callback_private_data,
     593             :                                 size_t per_buffer_data_size)
     594             : {
     595      108278 :     return read_stream_begin_impl(flags,
     596             :                                   strategy,
     597             :                                   NULL,
     598             :                                   smgr,
     599             :                                   smgr_persistence,
     600             :                                   forknum,
     601             :                                   callback,
     602             :                                   callback_private_data,
     603             :                                   per_buffer_data_size);
     604             : }
     605             : 
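Both constructors hand the stream a `ReadStreamBlockNumberCB` that pulls block numbers on demand and returns `InvalidBlockNumber` when exhausted. That contract can be modeled with a plain function pointer; the names and the range-yielding state below are illustrative stand-ins, and a real caller would of course consume pinned buffers rather than just counting block numbers.

```c
#include <assert.h>
#include <stddef.h>

typedef unsigned int BlockNumber;
#define InvalidBlockNumber ((BlockNumber) 0xFFFFFFFF)

/* Shape of the callback contract (modeled on ReadStreamBlockNumberCB). */
typedef BlockNumber (*BlockNumberCB) (void *callback_private_data);

/* Example callback state: yield the block range [0, nblocks). */
typedef struct
{
    BlockNumber next;
    BlockNumber nblocks;
} RangeState;

static BlockNumber
range_cb(void *callback_private_data)
{
    RangeState *s = callback_private_data;

    if (s->next < s->nblocks)
        return s->next++;
    return InvalidBlockNumber;      /* end of stream */
}

/* Drain the callback the way a stream consumer loop drains buffers. */
static int
count_blocks(BlockNumberCB cb, void *callback_private_data)
{
    int         n = 0;

    while (cb(callback_private_data) != InvalidBlockNumber)
        n++;
    return n;
}
```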
     606             : /*
     607             :  * Pull one pinned buffer out of a stream.  Each call returns successive
     608             :  * blocks in the order specified by the callback.  If per_buffer_data_size was
     609             :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
     610             :  * per-buffer data that the callback had a chance to populate, which remains
     611             :  * valid until the next call to read_stream_next_buffer().  When the stream
     612             :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
     613             :  * the stream early at any time by calling read_stream_end().
     614             :  */
     615             : Buffer
     616    10309992 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
     617             : {
     618             :     Buffer      buffer;
     619             :     int16       oldest_buffer_index;
     620             : 
     621             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     622             : 
     623             :     /*
     624             :      * A fast path for all-cached scans (behavior A).  This is the same as the
     625             :      * usual algorithm, but it is specialized for no I/O and no per-buffer
     626             :      * data, so we can skip the queue management code, stay in the same buffer
     627             :      * slot and use singular StartReadBuffer().
     628             :      */
     629    10309992 :     if (likely(stream->fast_path))
     630             :     {
     631             :         BlockNumber next_blocknum;
     632             : 
     633             :         /* Fast path assumptions. */
     634             :         Assert(stream->ios_in_progress == 0);
     635             :         Assert(stream->pinned_buffers == 1);
     636             :         Assert(stream->distance == 1);
     637             :         Assert(stream->pending_read_nblocks == 0);
     638             :         Assert(stream->per_buffer_data_size == 0);
     639             : 
     640             :         /* We're going to return the buffer we pinned last time. */
     641     3634466 :         oldest_buffer_index = stream->oldest_buffer_index;
     642             :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
     643             :                stream->next_buffer_index);
     644     3634466 :         buffer = stream->buffers[oldest_buffer_index];
     645             :         Assert(buffer != InvalidBuffer);
     646             : 
     647             :         /* Choose the next block to pin. */
     648     3634466 :         next_blocknum = read_stream_get_block(stream, NULL);
     649             : 
     650     3634466 :         if (likely(next_blocknum != InvalidBlockNumber))
     651             :         {
     652             :             /*
     653             :              * Pin a buffer for the next call.  Same buffer entry, and
     654             :              * arbitrary I/O entry (they're all free).  We don't have to
     655             :              * adjust pinned_buffers because we're transferring one to caller
     656             :              * but pinning one more.
      657             :              * adjust pinned_buffers because we're transferring one to the
      658             :              * caller but pinning one more.
     659             :                                         &stream->buffers[oldest_buffer_index],
     660             :                                         next_blocknum,
     661             :                                         stream->advice_enabled ?
     662             :                                         READ_BUFFERS_ISSUE_ADVICE : 0)))
     663             :             {
     664             :                 /* Fast return. */
     665     3455728 :                 return buffer;
     666             :             }
     667             : 
     668             :             /* Next call must wait for I/O for the newly pinned buffer. */
     669       22126 :             stream->oldest_io_index = 0;
     670       22126 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
     671       22126 :             stream->ios_in_progress = 1;
     672       22126 :             stream->ios[0].buffer_index = oldest_buffer_index;
     673       22126 :             stream->seq_blocknum = next_blocknum + 1;
     674             :         }
     675             :         else
     676             :         {
     677             :             /* No more blocks, end of stream. */
     678      156612 :             stream->distance = 0;
     679      156612 :             stream->oldest_buffer_index = stream->next_buffer_index;
     680      156612 :             stream->pinned_buffers = 0;
     681             :         }
     682             : 
     683      178738 :         stream->fast_path = false;
     684      178738 :         return buffer;
     685             :     }
     686             : #endif
     687             : 
     688     6675526 :     if (unlikely(stream->pinned_buffers == 0))
     689             :     {
     690             :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
     691             : 
      692             :         /* End of stream reached? */
     693     4622608 :         if (stream->distance == 0)
     694     2654592 :             return InvalidBuffer;
     695             : 
     696             :         /*
     697             :          * The usual order of operations is that we look ahead at the bottom
     698             :          * of this function after potentially finishing an I/O and making
     699             :          * space for more, but if we're just starting up we'll need to crank
     700             :          * the handle to get started.
     701             :          */
     702     1968016 :         read_stream_look_ahead(stream, true);
     703             : 
     704             :         /* End of stream reached? */
     705     1968016 :         if (stream->pinned_buffers == 0)
     706             :         {
     707             :             Assert(stream->distance == 0);
     708      866670 :             return InvalidBuffer;
     709             :         }
     710             :     }
     711             : 
     712             :     /* Grab the oldest pinned buffer and associated per-buffer data. */
     713             :     Assert(stream->pinned_buffers > 0);
     714     3154264 :     oldest_buffer_index = stream->oldest_buffer_index;
     715             :     Assert(oldest_buffer_index >= 0 &&
     716             :            oldest_buffer_index < stream->queue_size);
     717     3154264 :     buffer = stream->buffers[oldest_buffer_index];
     718     3154264 :     if (per_buffer_data)
     719      780904 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
     720             : 
     721             :     Assert(BufferIsValid(buffer));
     722             : 
     723             :     /* Do we have to wait for an associated I/O first? */
     724     3154264 :     if (stream->ios_in_progress > 0 &&
     725      999630 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
     726             :     {
     727      972458 :         int16       io_index = stream->oldest_io_index;
     728             :         int16       distance;
     729             : 
     730             :         /* Sanity check that we still agree on the buffers. */
     731             :         Assert(stream->ios[io_index].op.buffers ==
     732             :                &stream->buffers[oldest_buffer_index]);
     733             : 
     734      972458 :         WaitReadBuffers(&stream->ios[io_index].op);
     735             : 
     736             :         Assert(stream->ios_in_progress > 0);
     737      972458 :         stream->ios_in_progress--;
     738      972458 :         if (++stream->oldest_io_index == stream->max_ios)
     739      939880 :             stream->oldest_io_index = 0;
     740             : 
     741      972458 :         if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
     742             :         {
     743             :             /* Distance ramps up fast (behavior C). */
     744        1648 :             distance = stream->distance * 2;
     745        1648 :             distance = Min(distance, stream->max_pinned_buffers);
     746        1648 :             stream->distance = distance;
     747             :         }
     748             :         else
     749             :         {
     750             :             /* No advice; move towards io_combine_limit (behavior B). */
     751      970810 :             if (stream->distance > io_combine_limit)
     752             :             {
     753           0 :                 stream->distance--;
     754             :             }
     755             :             else
     756             :             {
     757      970810 :                 distance = stream->distance * 2;
     758      970810 :                 distance = Min(distance, io_combine_limit);
     759      970810 :                 distance = Min(distance, stream->max_pinned_buffers);
     760      970810 :                 stream->distance = distance;
     761             :             }
     762             :         }
     763             :     }
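The two branches above are the whole distance controller. A standalone restatement (the function name and signature are ours; the doubling and clamping mirror the code above): with advice issued, distance doubles toward `max_pinned_buffers` (behavior C); without advice, it decays when above `io_combine_limit` and otherwise doubles toward it (behavior B).

```c
#include <assert.h>

#define Min(a, b) ((a) < (b) ? (a) : (b))

/*
 * Recompute the look-ahead distance after an I/O completes, following
 * read_stream_next_buffer()'s rules: with advice (behavior C) double
 * toward max_pinned_buffers; without advice (behavior B) decay from
 * above io_combine_limit, else double toward io_combine_limit.
 */
static int
next_distance(int distance, int used_advice,
              int io_combine_limit, int max_pinned_buffers)
{
    if (used_advice)
        return Min(distance * 2, max_pinned_buffers);
    if (distance > io_combine_limit)
        return distance - 1;
    distance = Min(distance * 2, io_combine_limit);
    return Min(distance, max_pinned_buffers);
}
```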
     764             : 
     765             : #ifdef CLOBBER_FREED_MEMORY
     766             :     /* Clobber old buffer for debugging purposes. */
     767             :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
     768             : #endif
     769             : 
     770             : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
     771             : 
     772             :     /*
     773             :      * The caller will get access to the per-buffer data, until the next call.
     774             :      * We wipe the one before, which is never occupied because queue_size
     775             :      * allowed one extra element.  This will hopefully trip up client code
     776             :      * that is holding a dangling pointer to it.
     777             :      */
     778             :     if (stream->per_buffer_data)
     779             :     {
     780             :         void       *per_buffer_data;
     781             : 
     782             :         per_buffer_data = get_per_buffer_data(stream,
     783             :                                               oldest_buffer_index == 0 ?
     784             :                                               stream->queue_size - 1 :
     785             :                                               oldest_buffer_index - 1);
     786             : 
     787             : #if defined(CLOBBER_FREED_MEMORY)
     788             :         /* This also tells Valgrind the memory is "noaccess". */
     789             :         wipe_mem(per_buffer_data, stream->per_buffer_data_size);
     790             : #elif defined(USE_VALGRIND)
     791             :         /* Tell it ourselves. */
     792             :         VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
     793             :                                    stream->per_buffer_data_size);
     794             : #endif
     795             :     }
     796             : #endif
     797             : 
     798             :     /* Pin transferred to caller. */
     799             :     Assert(stream->pinned_buffers > 0);
     800     3154264 :     stream->pinned_buffers--;
     801             : 
     802             :     /* Advance oldest buffer, with wrap-around. */
     803     3154264 :     stream->oldest_buffer_index++;
     804     3154264 :     if (stream->oldest_buffer_index == stream->queue_size)
     805      501828 :         stream->oldest_buffer_index = 0;
     806             : 
     807             :     /* Prepare for the next call. */
     808     3154264 :     read_stream_look_ahead(stream, false);
     809             : 
     810             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     811             :     /* See if we can take the fast path for all-cached scans next time. */
     812     3154264 :     if (stream->ios_in_progress == 0 &&
     813     2301934 :         stream->pinned_buffers == 1 &&
     814     1166652 :         stream->distance == 1 &&
     815     1056272 :         stream->pending_read_nblocks == 0 &&
     816     1054658 :         stream->per_buffer_data_size == 0)
     817             :     {
     818      343584 :         stream->fast_path = true;
     819             :     }
     820             : #endif
     821             : 
     822     3154264 :     return buffer;
     823             : }
     824             : 
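The queue indexes in this function advance with an explicit wrap test rather than a modulo, which is cheaper on the hot path; the fast path's assertion that `(oldest_buffer_index + 1) % queue_size == next_buffer_index` relies on this single-step advance. A trivial restatement (the helper name is ours):

```c
#include <assert.h>

/*
 * Advance a circular queue index the way read_stream_next_buffer()
 * does: increment, then wrap to zero on reaching queue_size.
 */
static int
advance_index(int index, int queue_size)
{
    index++;
    if (index == queue_size)
        index = 0;
    return index;
}
```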
     825             : /*
     826             :  * Transitional support for code that would like to perform or skip reads
     827             :  * itself, without using the stream.  Returns, and consumes, the next block
     828             :  * number that would be read by the stream's look-ahead algorithm, or
     829             :  * InvalidBlockNumber if the end of the stream is reached.  Also reports the
     830             :  * strategy that would be used to read it.
     831             :  */
     832             : BlockNumber
     833           0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
     834             : {
     835           0 :     *strategy = stream->ios[0].op.strategy;
     836           0 :     return read_stream_get_block(stream, NULL);
     837             : }
     838             : 
     839             : /*
     840             :  * Reset a read stream by releasing any queued up buffers, allowing the stream
     841             :  * to be used again for different blocks.  This can be used to clear an
     842             :  * end-of-stream condition and start again, or to throw away blocks that were
     843             :  * speculatively read and read some different blocks instead.
     844             :  */
     845             : void
     846     1968320 : read_stream_reset(ReadStream *stream)
     847             : {
     848             :     Buffer      buffer;
     849             : 
     850             :     /* Stop looking ahead. */
     851     1968320 :     stream->distance = 0;
     852             : 
     853             :     /* Forget buffered block number and fast path state. */
     854     1968320 :     stream->buffered_blocknum = InvalidBlockNumber;
     855     1968320 :     stream->fast_path = false;
     856             : 
     857             :     /* Unpin anything that wasn't consumed. */
     858     2165064 :     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
     859      196744 :         ReleaseBuffer(buffer);
     860             : 
     861             :     Assert(stream->pinned_buffers == 0);
     862             :     Assert(stream->ios_in_progress == 0);
     863             : 
     864             :     /* Start off assuming data is cached. */
     865     1968320 :     stream->distance = 1;
     866     1968320 : }
     867             : 
     868             : /*
     869             :  * Release and free a read stream.
     870             :  */
     871             : void
     872      913376 : read_stream_end(ReadStream *stream)
     873             : {
     874      913376 :     read_stream_reset(stream);
     875      913376 :     pfree(stream);
     876      913376 : }

Generated by: LCOV version 1.14