LCOV - code coverage report
File: src/backend/storage/aio/read_stream.c
Test: PostgreSQL 18devel
Date: 2025-01-18 03:14:54
Coverage: Lines: 195 of 212 (92.0 %), Functions: 11 of 13 (84.6 %)

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * read_stream.c
       4             :  *    Mechanism for accessing buffered relation data with look-ahead
       5             :  *
       6             :  * Code that needs to access relation data typically pins blocks one at a
       7             :  * time, often in a predictable order that might be sequential or data-driven.
       8             :  * Calling the simple ReadBuffer() function for each block is inefficient,
       9             :  * because blocks that are not yet in the buffer pool require I/O operations
      10             :  * that are small and might stall waiting for storage.  This mechanism looks
      11             :  * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
      12             :  * neighboring blocks together and ahead of time, with an adaptive look-ahead
      13             :  * distance.
      14             :  *
      15             :  * A user-provided callback generates a stream of block numbers that is used
      16             :  * to form reads of up to io_combine_limit, by attempting to merge them with a
      17             :  * pending read.  When that isn't possible, the existing pending read is sent
      18             :  * to StartReadBuffers() so that a new one can begin to form.
      19             :  *
      20             :  * The algorithm for controlling the look-ahead distance tries to classify the
      21             :  * stream into three ideal behaviors:
      22             :  *
      23             :  * A) No I/O is necessary, because the requested blocks are fully cached
      24             :  * already.  There is no benefit to looking ahead more than one block, so
      25             :  * distance is 1.  This is the default initial assumption.
      26             :  *
      27             :  * B) I/O is necessary, but read-ahead advice is undesirable because the
      28             :  * access is sequential and we can rely on the kernel's read-ahead heuristics,
      29             :  * or impossible because direct I/O is enabled, or the system doesn't support
      30             :  * read-ahead advice.  There is no benefit in looking ahead more than
      31             :  * io_combine_limit, because in this case the only goal is larger read system
      32             :  * calls.  Looking further ahead would pin many buffers and perform
      33             :  * speculative work for no benefit.
      34             :  *
      35             :  * C) I/O is necessary, it appears to be random, and this system supports
      36             :  * read-ahead advice.  We'll look further ahead in order to reach the
      37             :  * configured level of I/O concurrency.
      38             :  *
      39             :  * The distance increases rapidly and decays slowly, so that it moves towards
      40             :  * those levels as different I/O patterns are discovered.  For example, a
      41             :  * sequential scan of fully cached data doesn't bother looking ahead, but a
      42             :  * sequential scan that hits a region of uncached blocks will start issuing
      43             :  * increasingly wide read calls until it plateaus at io_combine_limit.
      44             :  *
      45             :  * The main data structure is a circular queue of buffers of size
      46             :  * max_pinned_buffers plus some extra space for technical reasons, ready to be
      47             :  * returned by read_stream_next_buffer().  Each buffer also has an optional
      48             :  * variable-sized object that is passed from the callback to the consumer of
      49             :  * buffers.
      50             :  *
      51             :  * Parallel to the queue of buffers, there is a circular queue of in-progress
      52             :  * I/Os that have been started with StartReadBuffers(), and for which
      53             :  * WaitReadBuffers() must be called before returning the buffer.
      54             :  *
      55             :  * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
      56             :  * successive calls, then these data structures might appear as follows:
      57             :  *
      58             :  *                          buffers buf/data       ios
      59             :  *
      60             :  *                          +----+  +-----+       +--------+
      61             :  *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
      62             :  *                          +----+  +-----+  |    +--------+
      63             :  *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
      64             :  *                          +----+  +-----+  | |  +--------+
      65             :  *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
      66             :  *                          +----+  +-----+    |  +--------+
      67             :  *                          | 43 |  |  ?  |    |  |        |
      68             :  *                          +----+  +-----+    |  +--------+
      69             :  *                          | 44 |  |  ?  |    |  |        |
      70             :  *                          +----+  +-----+    |  +--------+
      71             :  *                          | 60 |  |  ?  |<---+
      72             :  *                          +----+  +-----+
      73             :  *     next_buffer_index -> |    |  |     |
      74             :  *                          +----+  +-----+
      75             :  *
      76             :  * In the example, 5 buffers are pinned, and the next buffer to be streamed to
      77             :  * the client is block 10.  Block 10 was a hit and has no associated I/O, but
      78             :  * the range 42..44 requires an I/O wait before its buffers are returned, as
      79             :  * does block 60.
      80             :  *
      81             :  *
      82             :  * Portions Copyright (c) 2024-2025, PostgreSQL Global Development Group
      83             :  * Portions Copyright (c) 1994, Regents of the University of California
      84             :  *
      85             :  * IDENTIFICATION
      86             :  *    src/backend/storage/aio/read_stream.c
      87             :  *
      88             :  *-------------------------------------------------------------------------
      89             :  */
      90             : #include "postgres.h"
      91             : 
      92             : #include "miscadmin.h"
      93             : #include "storage/fd.h"
      94             : #include "storage/smgr.h"
      95             : #include "storage/read_stream.h"
      96             : #include "utils/memdebug.h"
      97             : #include "utils/rel.h"
      98             : #include "utils/spccache.h"
      99             : 
     100             : typedef struct InProgressIO
     101             : {
     102             :     int16       buffer_index;
     103             :     ReadBuffersOperation op;
     104             : } InProgressIO;
     105             : 
     106             : /*
     107             :  * State for managing a stream of reads.
     108             :  */
     109             : struct ReadStream
     110             : {
     111             :     int16       max_ios;
     112             :     int16       ios_in_progress;
     113             :     int16       queue_size;
     114             :     int16       max_pinned_buffers;
     115             :     int16       pinned_buffers;
     116             :     int16       distance;
     117             :     bool        advice_enabled;
     118             : 
     119             :     /*
     120             :      * One-block buffer to support 'ungetting' a block number, to resolve flow
     121             :      * control problems when I/Os are split.
     122             :      */
     123             :     BlockNumber buffered_blocknum;
     124             : 
     125             :     /*
     126             :      * The callback that will tell us which block numbers to read, and an
     127             :  * opaque pointer that will be passed to it for its own purposes.
     128             :      */
     129             :     ReadStreamBlockNumberCB callback;
     130             :     void       *callback_private_data;
     131             : 
     132             :     /* Next expected block, for detecting sequential access. */
     133             :     BlockNumber seq_blocknum;
     134             : 
     135             :     /* The read operation we are currently preparing. */
     136             :     BlockNumber pending_read_blocknum;
     137             :     int16       pending_read_nblocks;
     138             : 
     139             :     /* Space for buffers and optional per-buffer private data. */
     140             :     size_t      per_buffer_data_size;
     141             :     void       *per_buffer_data;
     142             : 
     143             :     /* Read operations that have been started but not waited for yet. */
     144             :     InProgressIO *ios;
     145             :     int16       oldest_io_index;
     146             :     int16       next_io_index;
     147             : 
     148             :     bool        fast_path;
     149             : 
     150             :     /* Circular queue of buffers. */
     151             :     int16       oldest_buffer_index;    /* Next pinned buffer to return */
     152             :     int16       next_buffer_index;  /* Index of next buffer to pin */
     153             :     Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
     154             : };
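
/*
 * Illustrative sketch, not from the PostgreSQL tree: a caller-defined
 * callback that streams an arbitrary list of block numbers held in
 * callback_private_data and records the list position in the per-buffer
 * data.  It assumes the stream was created with per_buffer_data_size set to
 * sizeof(int); the type and function names below are hypothetical.
 */
typedef struct BlockListPrivate
{
    BlockNumber *blocks;        /* caller-supplied block numbers */
    int         nblocks;        /* number of entries in blocks[] */
    int         next;           /* next entry to hand to the stream */
} BlockListPrivate;

static BlockNumber
block_list_read_stream_cb(ReadStream *stream,
                          void *callback_private_data,
                          void *per_buffer_data)
{
    BlockListPrivate *p = callback_private_data;

    if (p->next >= p->nblocks)
        return InvalidBlockNumber;  /* end of stream */

    /* Let the consumer see which list entry produced this buffer. */
    *(int *) per_buffer_data = p->next;

    return p->blocks[p->next++];
}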
     155             : 
     156             : /*
     157             :  * Return a pointer to the per-buffer data by index.
     158             :  */
     159             : static inline void *
     160     3868106 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
     161             : {
     162     7736212 :     return (char *) stream->per_buffer_data +
     163     3868106 :         stream->per_buffer_data_size * buffer_index;
     164             : }
     165             : 
     166             : /*
     167             :  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
     168             :  * blocks [current_blocknum, last_exclusive).
     169             :  */
     170             : BlockNumber
     171      538476 : block_range_read_stream_cb(ReadStream *stream,
     172             :                            void *callback_private_data,
     173             :                            void *per_buffer_data)
     174             : {
     175      538476 :     BlockRangeReadStreamPrivate *p = callback_private_data;
     176             : 
     177      538476 :     if (p->current_blocknum < p->last_exclusive)
     178      426522 :         return p->current_blocknum++;
     179             : 
     180      111954 :     return InvalidBlockNumber;
     181             : }
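
/*
 * Illustrative sketch, not from the PostgreSQL tree: driving a
 * whole-relation scan with block_range_read_stream_cb.  The function name is
 * hypothetical; RelationGetNumberOfBlocks() and ReleaseBuffer() are assumed
 * to be available from storage/bufmgr.h.
 */
static void
example_scan_all_blocks(Relation rel)
{
    BlockRangeReadStreamPrivate p;
    ReadStream *stream;
    Buffer      buffer;

    p.current_blocknum = 0;
    p.last_exclusive = RelationGetNumberOfBlocks(rel);

    stream = read_stream_begin_relation(READ_STREAM_FULL,
                                        NULL,   /* default strategy */
                                        rel,
                                        MAIN_FORKNUM,
                                        block_range_read_stream_cb,
                                        &p,
                                        0);     /* no per-buffer data */

    while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
    {
        /* ... inspect the pinned buffer here ... */
        ReleaseBuffer(buffer);
    }

    read_stream_end(stream);
}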
     182             : 
     183             : /*
     184             :  * Ask the callback which block it would like us to read next, with a one-block
     185             :  * buffer in front to allow read_stream_unget_block() to work.
     186             :  */
     187             : static inline BlockNumber
     188     7393644 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
     189             : {
     190             :     BlockNumber blocknum;
     191             : 
     192     7393644 :     blocknum = stream->buffered_blocknum;
     193     7393644 :     if (blocknum != InvalidBlockNumber)
     194           0 :         stream->buffered_blocknum = InvalidBlockNumber;
     195             :     else
     196     7393644 :         blocknum = stream->callback(stream,
     197             :                                     stream->callback_private_data,
     198             :                                     per_buffer_data);
     199             : 
     200     7393644 :     return blocknum;
     201             : }
     202             : 
     203             : /*
     204             :  * In order to deal with short reads in StartReadBuffers(), we sometimes need
     205             :  * to defer handling of a block until later.
     206             :  */
     207             : static inline void
     208           0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
     209             : {
     210             :     /* We shouldn't ever unget more than one block. */
     211             :     Assert(stream->buffered_blocknum == InvalidBlockNumber);
     212             :     Assert(blocknum != InvalidBlockNumber);
     213           0 :     stream->buffered_blocknum = blocknum;
     214           0 : }
     215             : 
     216             : static void
     217     2167118 : read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
     218             : {
     219             :     bool        need_wait;
     220             :     int         nblocks;
     221             :     int         flags;
     222             :     int16       io_index;
     223             :     int16       overflow;
     224             :     int16       buffer_index;
     225             : 
     226             :     /* This should only be called with a pending read. */
     227             :     Assert(stream->pending_read_nblocks > 0);
     228             :     Assert(stream->pending_read_nblocks <= io_combine_limit);
     229             : 
     230             :     /* We had better not exceed the pin limit by starting this read. */
     231             :     Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
     232             :            stream->max_pinned_buffers);
     233             : 
     234             :     /* We had better not be overwriting an existing pinned buffer. */
     235     2167118 :     if (stream->pinned_buffers > 0)
     236             :         Assert(stream->next_buffer_index != stream->oldest_buffer_index);
     237             :     else
     238             :         Assert(stream->next_buffer_index == stream->oldest_buffer_index);
     239             : 
     240             :     /*
     241             :      * If advice hasn't been suppressed, this system supports it, and this
     242             :      * isn't a strictly sequential pattern, then we'll issue advice.
     243             :      */
     244     2167118 :     if (!suppress_advice &&
     245     1170488 :         stream->advice_enabled &&
     246      142464 :         stream->pending_read_blocknum != stream->seq_blocknum)
     247       65240 :         flags = READ_BUFFERS_ISSUE_ADVICE;
     248             :     else
     249     2101878 :         flags = 0;
     250             : 
     251             :     /* We say how many blocks we want to read, but it may be smaller on return. */
     252     2167118 :     buffer_index = stream->next_buffer_index;
     253     2167118 :     io_index = stream->next_io_index;
     254     2167118 :     nblocks = stream->pending_read_nblocks;
     255     2167118 :     need_wait = StartReadBuffers(&stream->ios[io_index].op,
     256     2167118 :                                  &stream->buffers[buffer_index],
     257             :                                  stream->pending_read_blocknum,
     258             :                                  &nblocks,
     259             :                                  flags);
     260     2167118 :     stream->pinned_buffers += nblocks;
     261             : 
     262             :     /* Remember whether we need to wait before returning this buffer. */
     263     2167118 :     if (!need_wait)
     264             :     {
     265             :         /* Look-ahead distance decays, no I/O necessary (behavior A). */
     266     1271208 :         if (stream->distance > 1)
     267       16538 :             stream->distance--;
     268             :     }
     269             :     else
     270             :     {
     271             :         /*
     272             :          * Remember to call WaitReadBuffers() before returning head buffer.
     273             :          * Look-ahead distance will be adjusted after waiting.
     274             :          */
     275      895910 :         stream->ios[io_index].buffer_index = buffer_index;
     276      895910 :         if (++stream->next_io_index == stream->max_ios)
     277      875956 :             stream->next_io_index = 0;
     278             :         Assert(stream->ios_in_progress < stream->max_ios);
     279      895910 :         stream->ios_in_progress++;
     280      895910 :         stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
     281             :     }
     282             : 
     283             :     /*
     284             :      * We gave a contiguous range of buffer space to StartReadBuffers(), but
     285             :      * we want it to wrap around at queue_size.  Slide overflowing buffers to
     286             :      * the front of the array.
     287             :      */
     288     2167118 :     overflow = (buffer_index + nblocks) - stream->queue_size;
     289     2167118 :     if (overflow > 0)
     290        7982 :         memmove(&stream->buffers[0],
     291        7982 :                 &stream->buffers[stream->queue_size],
     292             :                 sizeof(stream->buffers[0]) * overflow);
     293             : 
     294             :     /* Compute location of start of next read, without using % operator. */
     295     2167118 :     buffer_index += nblocks;
     296     2167118 :     if (buffer_index >= stream->queue_size)
     297      470248 :         buffer_index -= stream->queue_size;
     298             :     Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     299     2167118 :     stream->next_buffer_index = buffer_index;
     300             : 
     301             :     /* Adjust the pending read to cover the remaining portion, if any. */
     302     2167118 :     stream->pending_read_blocknum += nblocks;
     303     2167118 :     stream->pending_read_nblocks -= nblocks;
     304     2167118 : }
     305             : 
     306             : static void
     307     4036228 : read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
     308             : {
     309     6448098 :     while (stream->ios_in_progress < stream->max_ios &&
     310     6423556 :            stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
     311             :     {
     312             :         BlockNumber blocknum;
     313             :         int16       buffer_index;
     314             :         void       *per_buffer_data;
     315             : 
     316     3868106 :         if (stream->pending_read_nblocks == io_combine_limit)
     317             :         {
     318           0 :             read_stream_start_pending_read(stream, suppress_advice);
     319           0 :             suppress_advice = false;
     320           0 :             continue;
     321             :         }
     322             : 
     323             :         /*
     324             :          * See which block the callback wants next in the stream.  We need to
     325             :          * compute the index of the Nth block of the pending read including
     326             :          * wrap-around, but we don't want to use the expensive % operator.
     327             :          */
     328     3868106 :         buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
     329     3868106 :         if (buffer_index >= stream->queue_size)
     330       93650 :             buffer_index -= stream->queue_size;
     331             :         Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
     332     3868106 :         per_buffer_data = get_per_buffer_data(stream, buffer_index);
     333     3868106 :         blocknum = read_stream_get_block(stream, per_buffer_data);
     334     3868106 :         if (blocknum == InvalidBlockNumber)
     335             :         {
     336             :             /* End of stream. */
     337     1456236 :             stream->distance = 0;
     338     1456236 :             break;
     339             :         }
     340             : 
     341             :         /* Can we merge it with the pending read? */
     342     2411870 :         if (stream->pending_read_nblocks > 0 &&
     343      311540 :             stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
     344             :         {
     345      311538 :             stream->pending_read_nblocks++;
     346      311538 :             continue;
     347             :         }
     348             : 
     349             :         /* We have to start the pending read before we can build another. */
     350     2100334 :         while (stream->pending_read_nblocks > 0)
     351             :         {
     352           2 :             read_stream_start_pending_read(stream, suppress_advice);
     353           2 :             suppress_advice = false;
     354           2 :             if (stream->ios_in_progress == stream->max_ios)
     355             :             {
     356             :                 /* And we've hit the limit.  Rewind, and stop here. */
     357           0 :                 read_stream_unget_block(stream, blocknum);
     358           0 :                 return;
     359             :             }
     360             :         }
     361             : 
     362             :         /* This is the start of a new pending read. */
     363     2100332 :         stream->pending_read_blocknum = blocknum;
     364     2100332 :         stream->pending_read_nblocks = 1;
     365             :     }
     366             : 
     367             :     /*
     368             :      * We don't start the pending read just because we've hit the distance
     369             :      * limit, preferring to give it another chance to grow to full
     370             :      * io_combine_limit size once more buffers have been consumed.  However,
     371             :      * if we've already reached io_combine_limit, or we've reached the
     372             :      * distance limit and there isn't anything pinned yet, or the callback has
     373             :      * signaled end-of-stream, we start the read immediately.
     374             :      */
     375     4036228 :     if (stream->pending_read_nblocks > 0 &&
     376     2277234 :         (stream->pending_read_nblocks == io_combine_limit ||
     377     2268014 :          (stream->pending_read_nblocks == stream->distance &&
     378     2014424 :           stream->pinned_buffers == 0) ||
     379      253590 :          stream->distance == 0) &&
     380     2167258 :         stream->ios_in_progress < stream->max_ios)
     381     2167116 :         read_stream_start_pending_read(stream, suppress_advice);
     382             : }
     383             : 
     384             : /*
     385             :  * Create a new read stream object that can be used to perform the equivalent
     386             :  * of a series of ReadBuffer() calls for one fork of one relation.
     387             :  * Internally, it generates larger vectored reads where possible by looking
     388             :  * ahead.  The callback should return block numbers or InvalidBlockNumber to
     389             :  * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
     390             :  * write extra data for each block into the space provided to it.  It will
     391             :  * also receive callback_private_data for its own purposes.
     392             :  */
     393             : static ReadStream *
     394      775028 : read_stream_begin_impl(int flags,
     395             :                        BufferAccessStrategy strategy,
     396             :                        Relation rel,
     397             :                        SMgrRelation smgr,
     398             :                        char persistence,
     399             :                        ForkNumber forknum,
     400             :                        ReadStreamBlockNumberCB callback,
     401             :                        void *callback_private_data,
     402             :                        size_t per_buffer_data_size)
     403             : {
     404             :     ReadStream *stream;
     405             :     size_t      size;
     406             :     int16       queue_size;
     407             :     int         max_ios;
     408             :     int         strategy_pin_limit;
     409             :     uint32      max_pinned_buffers;
     410             :     Oid         tablespace_id;
     411             : 
     412             :     /*
     413             :      * Decide how many I/Os we will allow to run at the same time.  That
     414             :      * currently means advice to the kernel to tell it that we will soon read.
     415             :      * This number also affects how far we look ahead for opportunities to
     416             :      * start more I/Os.
     417             :      */
     418      775028 :     tablespace_id = smgr->smgr_rlocator.locator.spcOid;
     419      775028 :     if (!OidIsValid(MyDatabaseId) ||
     420      899362 :         (rel && IsCatalogRelation(rel)) ||
     421      247624 :         IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
     422             :     {
     423             :         /*
     424             :          * Avoid circularity while trying to look up tablespace settings or
     425             :          * before spccache.c is ready.
     426             :          */
     427      627898 :         max_ios = effective_io_concurrency;
     428             :     }
     429      147130 :     else if (flags & READ_STREAM_MAINTENANCE)
     430        5760 :         max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
     431             :     else
     432      141370 :         max_ios = get_tablespace_io_concurrency(tablespace_id);
     433             : 
     434             :     /* Cap to INT16_MAX to avoid overflowing below */
     435      775028 :     max_ios = Min(max_ios, PG_INT16_MAX);
     436             : 
     437             :     /*
     438             :      * Choose the maximum number of buffers we're prepared to pin.  We try to
     439             :      * pin fewer if we can, though.  We clamp it to at least io_combine_limit
     440             :      * so that we can have a chance to build up a full io_combine_limit sized
     441             :      * read, even when max_ios is zero.  Be careful not to allow int16 to
     442             :      * overflow (even though that's not possible with the current GUC range
     443             :      * limits), allowing also for the spare entry and the overflow space.
     444             :      */
     445      775028 :     max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
     446      775028 :     max_pinned_buffers = Min(max_pinned_buffers,
     447             :                              PG_INT16_MAX - io_combine_limit - 1);
     448             : 
     449             :     /* Give the strategy a chance to limit the number of buffers we pin. */
     450      775028 :     strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
     451      775028 :     max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
     452             : 
     453             :     /* Don't allow this backend to pin more than its share of buffers. */
     454      775028 :     if (SmgrIsTemp(smgr))
     455       11038 :         LimitAdditionalLocalPins(&max_pinned_buffers);
     456             :     else
     457      763990 :         LimitAdditionalPins(&max_pinned_buffers);
     458             :     Assert(max_pinned_buffers > 0);
     459             : 
     460             :     /*
     461             :      * We need one extra entry for buffers and per-buffer data, because users
     462             :      * of per-buffer data have access to the object until the next call to
     463             :      * read_stream_next_buffer(), so we need a gap between the head and tail
     464             :      * of the queue so that we don't clobber it.
     465             :      */
     466      775028 :     queue_size = max_pinned_buffers + 1;
     467             : 
     468             :     /*
     469             :      * Allocate the object, the buffers, the ios and per_buffer_data space in
     470             :      * one big chunk.  Though we have queue_size buffers, we want to be able
     471             :      * to assume that all the buffers for a single read are contiguous (i.e.
     472             :      * don't wrap around halfway through), so we allow temporary overflows of
     473             :      * up to the maximum possible read size by allocating an extra
     474             :      * io_combine_limit - 1 elements.
     475             :      */
     476      775028 :     size = offsetof(ReadStream, buffers);
     477      775028 :     size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
     478      775028 :     size += sizeof(InProgressIO) * Max(1, max_ios);
     479      775028 :     size += per_buffer_data_size * queue_size;
     480      775028 :     size += MAXIMUM_ALIGNOF * 2;
     481      775028 :     stream = (ReadStream *) palloc(size);
     482      775028 :     memset(stream, 0, offsetof(ReadStream, buffers));
     483      775028 :     stream->ios = (InProgressIO *)
     484      775028 :         MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
     485      775028 :     if (per_buffer_data_size > 0)
     486           0 :         stream->per_buffer_data = (void *)
     487           0 :             MAXALIGN(&stream->ios[Max(1, max_ios)]);
     488             : 
     489             : #ifdef USE_PREFETCH
     490             : 
     491             :     /*
     492             :      * This system supports prefetching advice.  We can use it as long as
     493             :      * direct I/O isn't enabled, the caller hasn't promised sequential access
     494             :      * (overriding our detection heuristics), and max_ios hasn't been set to
     495             :      * zero.
     496             :      */
     497      775028 :     if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
     498      774822 :         (flags & READ_STREAM_SEQUENTIAL) == 0 &&
     499             :         max_ios > 0)
     500      127868 :         stream->advice_enabled = true;
     501             : #endif
     502             : 
     503             :     /*
     504             :      * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
     505             :      * above.  If we had real asynchronous I/O we might need a slightly
     506             :      * different definition.
     507             :      */
     508      775028 :     if (max_ios == 0)
     509           0 :         max_ios = 1;
     510             : 
     511      775028 :     stream->max_ios = max_ios;
     512      775028 :     stream->per_buffer_data_size = per_buffer_data_size;
     513      775028 :     stream->max_pinned_buffers = max_pinned_buffers;
     514      775028 :     stream->queue_size = queue_size;
     515      775028 :     stream->callback = callback;
     516      775028 :     stream->callback_private_data = callback_private_data;
     517      775028 :     stream->buffered_blocknum = InvalidBlockNumber;
     518             : 
     519             :     /*
     520             :      * Skip the initial ramp-up phase if the caller says we're going to be
     521             :      * reading the whole relation.  This way we start out assuming we'll be
     522             :      * doing full io_combine_limit sized reads (behavior B).
     523             :      */
     524      775028 :     if (flags & READ_STREAM_FULL)
     525      113316 :         stream->distance = Min(max_pinned_buffers, io_combine_limit);
     526             :     else
     527      661712 :         stream->distance = 1;
     528             : 
     529             :     /*
     530             :      * Since we always access the same relation, we can initialize parts of
     531             :      * the ReadBuffersOperation objects and leave them that way, to avoid
     532             :      * wasting CPU cycles writing to them for each read.
     533             :      */
     534     1650700 :     for (int i = 0; i < max_ios; ++i)
     535             :     {
     536      875672 :         stream->ios[i].op.rel = rel;
     537      875672 :         stream->ios[i].op.smgr = smgr;
     538      875672 :         stream->ios[i].op.persistence = persistence;
     539      875672 :         stream->ios[i].op.forknum = forknum;
     540      875672 :         stream->ios[i].op.strategy = strategy;
     541             :     }
     542             : 
     543      775028 :     return stream;
     544             : }
     545             : 
     546             : /*
     547             :  * Create a new read stream for reading a relation.
     548             :  * See read_stream_begin_impl() for the detailed explanation.
     549             :  */
     550             : ReadStream *
     551      667242 : read_stream_begin_relation(int flags,
     552             :                            BufferAccessStrategy strategy,
     553             :                            Relation rel,
     554             :                            ForkNumber forknum,
     555             :                            ReadStreamBlockNumberCB callback,
     556             :                            void *callback_private_data,
     557             :                            size_t per_buffer_data_size)
     558             : {
     559      667242 :     return read_stream_begin_impl(flags,
     560             :                                   strategy,
     561             :                                   rel,
     562             :                                   RelationGetSmgr(rel),
     563      667242 :                                   rel->rd_rel->relpersistence,
     564             :                                   forknum,
     565             :                                   callback,
     566             :                                   callback_private_data,
     567             :                                   per_buffer_data_size);
     568             : }
     569             : 
     570             : /*
     571             :  * Create a new read stream for reading a SMgr relation.
     572             :  * See read_stream_begin_impl() for the detailed explanation.
     573             :  */
     574             : ReadStream *
     575      107786 : read_stream_begin_smgr_relation(int flags,
     576             :                                 BufferAccessStrategy strategy,
     577             :                                 SMgrRelation smgr,
     578             :                                 char smgr_persistence,
     579             :                                 ForkNumber forknum,
     580             :                                 ReadStreamBlockNumberCB callback,
     581             :                                 void *callback_private_data,
     582             :                                 size_t per_buffer_data_size)
     583             : {
     584      107786 :     return read_stream_begin_impl(flags,
     585             :                                   strategy,
     586             :                                   NULL,
     587             :                                   smgr,
     588             :                                   smgr_persistence,
     589             :                                   forknum,
     590             :                                   callback,
     591             :                                   callback_private_data,
     592             :                                   per_buffer_data_size);
     593             : }
     594             : 
     595             : /*
     596             :  * Pull one pinned buffer out of a stream.  Each call returns successive
     597             :  * blocks in the order specified by the callback.  If per_buffer_data_size was
     598             :  * set to a non-zero size, *per_buffer_data receives a pointer to the extra
     599             :  * per-buffer data that the callback had a chance to populate, which remains
     600             :  * valid until the next call to read_stream_next_buffer().  When the stream
     601             :  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
     602             :  * the stream early at any time by calling read_stream_end().
     603             :  */
     604             : Buffer
     605     8942434 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
     606             : {
     607             :     Buffer      buffer;
     608             :     int16       oldest_buffer_index;
     609             : 
     610             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     611             : 
     612             :     /*
     613             :      * A fast path for all-cached scans (behavior A).  This is the same as the
     614             :      * usual algorithm, but it is specialized for no I/O and no per-buffer
     615             :      * data, so we can skip the queue management code, stay in the same buffer
     616             :      * slot and use singular StartReadBuffer().
     617             :      */
     618     8942434 :     if (likely(stream->fast_path))
     619             :     {
     620             :         BlockNumber next_blocknum;
     621             : 
     622             :         /* Fast path assumptions. */
     623             :         Assert(stream->ios_in_progress == 0);
     624             :         Assert(stream->pinned_buffers == 1);
     625             :         Assert(stream->distance == 1);
     626             :         Assert(stream->pending_read_nblocks == 0);
     627             :         Assert(stream->per_buffer_data_size == 0);
     628             : 
     629             :         /* We're going to return the buffer we pinned last time. */
     630     3525538 :         oldest_buffer_index = stream->oldest_buffer_index;
     631             :         Assert((oldest_buffer_index + 1) % stream->queue_size ==
     632             :                stream->next_buffer_index);
     633     3525538 :         buffer = stream->buffers[oldest_buffer_index];
     634             :         Assert(buffer != InvalidBuffer);
     635             : 
     636             :         /* Choose the next block to pin. */
     637     3525538 :         next_blocknum = read_stream_get_block(stream, NULL);
     638             : 
     639     3525538 :         if (likely(next_blocknum != InvalidBlockNumber))
     640             :         {
     641             :             /*
     642             :              * Pin a buffer for the next call.  Same buffer entry, and
     643             :              * arbitrary I/O entry (they're all free).  We don't have to
     644             :              * adjust pinned_buffers because we're transferring one to caller
     645             :              * but pinning one more.
     646             :              */
     647     3374232 :             if (likely(!StartReadBuffer(&stream->ios[0].op,
     648             :                                         &stream->buffers[oldest_buffer_index],
     649             :                                         next_blocknum,
     650             :                                         stream->advice_enabled ?
     651             :                                         READ_BUFFERS_ISSUE_ADVICE : 0)))
     652             :             {
     653             :                 /* Fast return. */
     654     3352990 :                 return buffer;
     655             :             }
     656             : 
     657             :             /* Next call must wait for I/O for the newly pinned buffer. */
     658       21242 :             stream->oldest_io_index = 0;
     659       21242 :             stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
     660       21242 :             stream->ios_in_progress = 1;
     661       21242 :             stream->ios[0].buffer_index = oldest_buffer_index;
     662       21242 :             stream->seq_blocknum = next_blocknum + 1;
     663             :         }
     664             :         else
     665             :         {
     666             :             /* No more blocks, end of stream. */
     667      151306 :             stream->distance = 0;
     668      151306 :             stream->oldest_buffer_index = stream->next_buffer_index;
     669      151306 :             stream->pinned_buffers = 0;
     670             :         }
     671             : 
     672      172548 :         stream->fast_path = false;
     673      172548 :         return buffer;
     674             :     }
     675             : #endif
     676             : 
     677     5416896 :     if (unlikely(stream->pinned_buffers == 0))
     678             :     {
     679             :         Assert(stream->oldest_buffer_index == stream->next_buffer_index);
     680             : 
     681             :         /* End of stream reached?  */
     682     4152984 :         if (stream->distance == 0)
     683     2377298 :             return InvalidBuffer;
     684             : 
     685             :         /*
     686             :          * The usual order of operations is that we look ahead at the bottom
     687             :          * of this function after potentially finishing an I/O and making
     688             :          * space for more, but if we're just starting up we'll need to crank
     689             :          * the handle to get started.
     690             :          */
     691     1775686 :         read_stream_look_ahead(stream, true);
     692             : 
     693             :         /* End of stream reached? */
     694     1775686 :         if (stream->pinned_buffers == 0)
     695             :         {
     696             :             Assert(stream->distance == 0);
     697      779056 :             return InvalidBuffer;
     698             :         }
     699             :     }
     700             : 
     701             :     /* Grab the oldest pinned buffer and associated per-buffer data. */
     702             :     Assert(stream->pinned_buffers > 0);
     703     2260542 :     oldest_buffer_index = stream->oldest_buffer_index;
     704             :     Assert(oldest_buffer_index >= 0 &&
     705             :            oldest_buffer_index < stream->queue_size);
     706     2260542 :     buffer = stream->buffers[oldest_buffer_index];
     707     2260542 :     if (per_buffer_data)
     708           0 :         *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
     709             : 
     710             :     Assert(BufferIsValid(buffer));
     711             : 
     712             :     /* Do we have to wait for an associated I/O first? */
     713     2260542 :     if (stream->ios_in_progress > 0 &&
     714      941740 :         stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
     715             :     {
     716      917150 :         int16       io_index = stream->oldest_io_index;
     717             :         int16       distance;
     718             : 
     719             :         /* Sanity check that we still agree on the buffers. */
     720             :         Assert(stream->ios[io_index].op.buffers ==
     721             :                &stream->buffers[oldest_buffer_index]);
     722             : 
     723      917150 :         WaitReadBuffers(&stream->ios[io_index].op);
     724             : 
     725             :         Assert(stream->ios_in_progress > 0);
     726      917150 :         stream->ios_in_progress--;
     727      917150 :         if (++stream->oldest_io_index == stream->max_ios)
     728      897184 :             stream->oldest_io_index = 0;
     729             : 
     730      917150 :         if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
     731             :         {
     732             :             /* Distance ramps up fast (behavior C). */
     733         552 :             distance = stream->distance * 2;
     734         552 :             distance = Min(distance, stream->max_pinned_buffers);
     735         552 :             stream->distance = distance;
     736             :         }
     737             :         else
     738             :         {
     739             :             /* No advice; move towards io_combine_limit (behavior B). */
     740      916598 :             if (stream->distance > io_combine_limit)
     741             :             {
     742           0 :                 stream->distance--;
     743             :             }
     744             :             else
     745             :             {
     746      916598 :                 distance = stream->distance * 2;
     747      916598 :                 distance = Min(distance, io_combine_limit);
     748      916598 :                 distance = Min(distance, stream->max_pinned_buffers);
     749      916598 :                 stream->distance = distance;
     750             :             }
     751             :         }
     752             :     }
     753             : 
     754             : #ifdef CLOBBER_FREED_MEMORY
     755             :     /* Clobber old buffer and per-buffer data for debugging purposes. */
     756             :     stream->buffers[oldest_buffer_index] = InvalidBuffer;
     757             : 
     758             :     /*
     759             :      * The caller will get access to the per-buffer data, until the next call.
     760             :      * We wipe the one before, which is never occupied because queue_size
     761             :      * allowed one extra element.  This will hopefully trip up client code
     762             :      * that is holding a dangling pointer to it.
     763             :      */
     764             :     if (stream->per_buffer_data)
     765             :         wipe_mem(get_per_buffer_data(stream,
     766             :                                      oldest_buffer_index == 0 ?
     767             :                                      stream->queue_size - 1 :
     768             :                                      oldest_buffer_index - 1),
     769             :                  stream->per_buffer_data_size);
     770             : #endif
     771             : 
     772             :     /* Pin transferred to caller. */
     773             :     Assert(stream->pinned_buffers > 0);
     774     2260542 :     stream->pinned_buffers--;
     775             : 
     776             :     /* Advance oldest buffer, with wrap-around. */
     777     2260542 :     stream->oldest_buffer_index++;
     778     2260542 :     if (stream->oldest_buffer_index == stream->queue_size)
     779      456794 :         stream->oldest_buffer_index = 0;
     780             : 
     781             :     /* Prepare for the next call. */
     782     2260542 :     read_stream_look_ahead(stream, false);
     783             : 
     784             : #ifndef READ_STREAM_DISABLE_FAST_PATH
     785             :     /* See if we can take the fast path for all-cached scans next time. */
     786     2260542 :     if (stream->ios_in_progress == 0 &&
     787     1460522 :         stream->pinned_buffers == 1 &&
     788      440102 :         stream->distance == 1 &&
     789      331610 :         stream->pending_read_nblocks == 0 &&
     790      330338 :         stream->per_buffer_data_size == 0)
     791             :     {
     792      330338 :         stream->fast_path = true;
     793             :     }
     794             : #endif
     795             : 
     796     2260542 :     return buffer;
     797             : }
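
/*
 * Illustrative sketch, not from the PostgreSQL tree: consuming buffers
 * together with their per-buffer data.  It assumes a stream built with
 * per_buffer_data_size = sizeof(int) and a callback like the hypothetical
 * block_list_read_stream_cb sketched earlier; note that *per_buffer_data is
 * only valid until the next read_stream_next_buffer() call.
 */
static void
example_consume_with_data(ReadStream *stream)
{
    Buffer      buffer;
    void       *per_buffer_data;

    while ((buffer = read_stream_next_buffer(stream, &per_buffer_data)) != InvalidBuffer)
    {
        int         list_index = *(int *) per_buffer_data;

        /* ... use list_index before the next call invalidates the pointer ... */
        (void) list_index;
        ReleaseBuffer(buffer);
    }
}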
     798             : 
     799             : /*
     800             :  * Transitional support for code that would like to perform or skip reads
     801             :  * itself, without using the stream.  Returns, and consumes, the next block
     802             :  * number that would be read by the stream's look-ahead algorithm, or
     803             :  * InvalidBlockNumber if the end of the stream is reached.  Also reports the
     804             :  * strategy that would be used to read it.
     805             :  */
     806             : BlockNumber
     807           0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
     808             : {
     809           0 :     *strategy = stream->ios[0].op.strategy;
     810           0 :     return read_stream_get_block(stream, NULL);
     811             : }
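
/*
 * Illustrative sketch, not from the PostgreSQL tree: a transitional caller
 * that consumes raw block numbers and performs the reads itself.  The
 * function name is hypothetical; ReadBufferExtended() and RBM_NORMAL are
 * assumed from storage/bufmgr.h.
 */
static void
example_read_blocks_directly(ReadStream *stream, Relation rel)
{
    BlockNumber blocknum;
    BufferAccessStrategy strategy;

    while ((blocknum = read_stream_next_block(stream, &strategy)) != InvalidBlockNumber)
    {
        Buffer      buffer;

        buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blocknum,
                                    RBM_NORMAL, strategy);
        /* ... the caller could equally decide to skip some blocks ... */
        ReleaseBuffer(buffer);
    }
}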
     812             : 
     813             : /*
     814             :  * Reset a read stream by releasing any queued up buffers, allowing the stream
     815             :  * to be used again for different blocks.  This can be used to clear an
     816             :  * end-of-stream condition and start again, or to throw away blocks that were
     817             :  * speculatively read and read some different blocks instead.
     818             :  */
     819             : void
     820     1776030 : read_stream_reset(ReadStream *stream)
     821             : {
     822             :     Buffer      buffer;
     823             : 
     824             :     /* Stop looking ahead. */
     825     1776030 :     stream->distance = 0;
     826             : 
     827             :     /* Forget buffered block number and fast path state. */
     828     1776030 :     stream->buffered_blocknum = InvalidBlockNumber;
     829     1776030 :     stream->fast_path = false;
     830             : 
     831             :     /* Unpin anything that wasn't consumed. */
     832     1964658 :     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
     833      188628 :         ReleaseBuffer(buffer);
     834             : 
     835             :     Assert(stream->pinned_buffers == 0);
     836             :     Assert(stream->ios_in_progress == 0);
     837             : 
     838             :     /* Start off assuming data is cached. */
     839     1776030 :     stream->distance = 1;
     840     1776030 : }
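
/*
 * Illustrative sketch, not from the PostgreSQL tree: reusing one stream for
 * two block ranges by clearing the end-of-stream condition with
 * read_stream_reset().  The function name and block range are hypothetical;
 * the callback's private state is simply repointed before resetting.
 */
static void
example_scan_two_ranges(ReadStream *stream, BlockRangeReadStreamPrivate *p)
{
    Buffer      buffer;

    /* Drain the first range until the callback reports end-of-stream. */
    while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
        ReleaseBuffer(buffer);

    /* Aim the callback at a second range and clear the end-of-stream state. */
    p->current_blocknum = 100;
    p->last_exclusive = 200;
    read_stream_reset(stream);

    while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
        ReleaseBuffer(buffer);
}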
     841             : 
     842             : /*
     843             :  * Release and free a read stream.
     844             :  */
     845             : void
     846      772508 : read_stream_end(ReadStream *stream)
     847             : {
     848      772508 :     read_stream_reset(stream);
     849      772508 :     pfree(stream);
     850      772508 : }

Generated by: LCOV version 1.14