LCOV - code coverage report
Current view: top level - src/backend/access/transam - xlogprefetcher.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 276 283 97.5 %
Date: 2024-04-25 21:11:15 Functions: 24 24 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * xlogprefetcher.c
       4             :  *      Prefetching support for recovery.
       5             :  *
       6             :  * Portions Copyright (c) 2022-2024, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *      src/backend/access/transam/xlogprefetcher.c
      12             :  *
      13             :  * This module provides a drop-in replacement for an XLogReader that tries to
      14             :  * minimize I/O stalls by looking ahead in the WAL.  If blocks that will be
      15             :  * accessed in the near future are not already in the buffer pool, it initiates
      16             :  * I/Os that might complete before the caller eventually needs the data.  When
      17             :  * referenced blocks are found in the buffer pool already, the buffer is
      18             :  * recorded in the decoded record so that XLogReadBufferForRedo() can try to
      19             :  * avoid a second buffer mapping table lookup.
      20             :  *
      21             :  * Currently, only the main fork is considered for prefetching, and
      22             :  * prefetching is only effective on systems where PrefetchBuffer() does
      23             :  * something useful (mainly Linux).
      24             :  *
      25             :  *-------------------------------------------------------------------------
      26             :  */
      27             : 
      28             : #include "postgres.h"
      29             : 
      30             : #include "access/xlogprefetcher.h"
      31             : #include "access/xlogreader.h"
      32             : #include "catalog/pg_control.h"
      33             : #include "catalog/storage_xlog.h"
      34             : #include "commands/dbcommands_xlog.h"
      35             : #include "funcapi.h"
      36             : #include "miscadmin.h"
      37             : #include "port/atomics.h"
      38             : #include "storage/bufmgr.h"
      39             : #include "storage/shmem.h"
      40             : #include "storage/smgr.h"
      41             : #include "utils/fmgrprotos.h"
      42             : #include "utils/guc_hooks.h"
      43             : #include "utils/hsearch.h"
      44             : #include "utils/timestamp.h"
      45             : 
      46             : /*
      47             :  * Every time we process this much WAL, we'll update the values in
      48             :  * pg_stat_recovery_prefetch.
      49             :  */
      50             : #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
      51             : 
      52             : /*
      53             :  * To detect repeated access to the same block and skip useless extra system
      54             :  * calls, we remember a small window of recently prefetched blocks.
      55             :  */
      56             : #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
      57             : 
      58             : /*
      59             :  * When maintenance_io_concurrency is not saturated, we're prepared to look
      60             :  * ahead up to N times that number of block references.
      61             :  */
      62             : #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
      63             : 
      64             : /* Define to log internal debugging messages. */
      65             : /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
      66             : 
      67             : /* GUCs; XLogPrefetchReconfigure() is called when a relevant one changes. */
      68             : int         recovery_prefetch = RECOVERY_PREFETCH_TRY;
      69             : 
      70             : #ifdef USE_PREFETCH
      71             : #define RecoveryPrefetchEnabled() \
      72             :         (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
      73             :          maintenance_io_concurrency > 0)
      74             : #else                           /* !USE_PREFETCH */
      75             : #define RecoveryPrefetchEnabled() false
      76             : #endif                          /* USE_PREFETCH */
      77             : 
      78             : static int  XLogPrefetchReconfigureCount = 0;   /* bumped by XLogPrefetchReconfigure() */
      79             : 
      80             : /*
      81             :  * Enum returned by the queue callback to report whether an IO was started.
      82             :  */
      83             : typedef enum
      84             : {
      85             :     LRQ_NEXT_NO_IO,             /* entry queued, counted as completed */
      86             :     LRQ_NEXT_IO,                /* entry queued, counted as inflight */
      87             :     LRQ_NEXT_AGAIN,             /* nothing queued; lrq_prefetch() returns */
      88             : } LsnReadQueueNextStatus;
      89             : 
      90             : /*
      91             :  * Type of callback that can decide which block to prefetch next.  For now
      92             :  * there is only one (XLogPrefetcherNextBlock() has this signature).
      93             :  */
      94             : typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
      95             :                                                        XLogRecPtr *lsn);
      96             : 
      97             : /*
      98             :  * A simple circular queue of LSNs, used to control the number of
      99             :  * (potentially) inflight IOs.  This stands in for a later more general IO
     100             :  * control mechanism, which is why it has the apparently unnecessary
     101             :  * indirection through a function pointer.
     102             :  */
     103             : typedef struct LsnReadQueue
     104             : {
     105             :     LsnReadQueueNextFun next;   /* callback supplying the next entry */
     106             :     uintptr_t   lrq_private;    /* opaque argument passed to next() */
     107             :     uint32      max_inflight;   /* limit on entries with io == true */
     108             :     uint32      inflight;       /* queued entries with io == true */
     109             :     uint32      completed;      /* queued entries with io == false */
     110             :     uint32      head;           /* slot the next entry is written to */
     111             :     uint32      tail;           /* oldest entry not yet retired */
     112             :     uint32      size;           /* slots in queue[]; one is always unused */
     113             :     struct
     114             :     {
     115             :         bool        io;         /* did next() report LRQ_NEXT_IO? */
     116             :         XLogRecPtr  lsn;        /* retired once lrq_complete_lsn() passes it */
     117             :     }           queue[FLEXIBLE_ARRAY_MEMBER];
     118             : } LsnReadQueue;
     119             : 
     120             : /*
     121             :  * A prefetcher.  This is a mechanism that wraps an XLogReader, prefetching
     122             :  * blocks that will soon be referenced, to try to avoid IO stalls.
     123             :  */
     124             : struct XLogPrefetcher
     125             : {
     126             :     /* WAL reader and current reading state. */
     127             :     XLogReaderState *reader;
     128             :     DecodedXLogRecord *record;  /* record being examined, or NULL */
     129             :     int         next_block_id;  /* next block reference in 'record' */
     130             : 
     131             :     /* When to publish stats. */
     132             :     XLogRecPtr  next_stats_shm_lsn;
     133             : 
     134             :     /* Book-keeping to avoid accessing blocks that don't exist yet. */
     135             :     HTAB       *filter_table;
     136             :     dlist_head  filter_queue;
     137             : 
     138             :     /* Book-keeping to avoid repeat prefetches. */
     139             :     RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
     140             :     BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
     141             :     int         recent_idx;
     142             : 
     143             :     /* Book-keeping to disable prefetching temporarily. */
     144             :     XLogRecPtr  no_readahead_until;
     145             : 
     146             :     /* IO depth manager. */
     147             :     LsnReadQueue *streaming_read;   /* allocated on first use */
     148             : 
     149             :     XLogRecPtr  begin_ptr;
     150             : 
     151             :     int         reconfigure_count;  /* starts at XLogPrefetchReconfigureCount - 1 */
     152             : };
     153             : 
     154             : /*
     155             :  * A temporary filter used to track block ranges that haven't been created
     156             :  * yet, whole relations that haven't been created yet, and whole relations
     157             :  * that (we assume) have already been dropped, or will be created by bulk WAL
     158             :  * operators.
     159             :  */
     160             : typedef struct XLogPrefetcherFilter
     161             : {
     162             :     RelFileLocator rlocator;    /* hash key of filter_table */
     163             :     XLogRecPtr  filter_until_replayed;
     164             :     BlockNumber filter_from_block;
     165             :     dlist_node  link;           /* presumably in filter_queue -- confirm */
     166             : } XLogPrefetcherFilter;
     167             : 
     168             : /*
     169             :  * Counters exposed in shared memory for pg_stat_recovery_prefetch.
     170             :  */
     171             : typedef struct XLogPrefetchStats
     172             : {
     173             :     pg_atomic_uint64 reset_time;    /* Time of last reset. */
     174             :     pg_atomic_uint64 prefetch;  /* Prefetches initiated. */
     175             :     pg_atomic_uint64 hit;       /* Blocks already in cache. */
     176             :     pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
     177             :     pg_atomic_uint64 skip_new;  /* New/missing blocks filtered. */
     178             :     pg_atomic_uint64 skip_fpw;  /* FPWs skipped. */
     179             :     pg_atomic_uint64 skip_rep;  /* Repeat accesses skipped. */
     180             : 
     181             :     /* Dynamic values, overwritten by XLogPrefetcherComputeStats(). */
     182             :     int         wal_distance;   /* Number of WAL bytes ahead (narrowed from int64). */
     183             :     int         block_distance; /* Number of block references ahead. */
     184             :     int         io_depth;       /* Number of I/Os in progress. */
     185             : } XLogPrefetchStats;
     186             : 
     187             : static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
     188             :                                            RelFileLocator rlocator,
     189             :                                            BlockNumber blockno,
     190             :                                            XLogRecPtr lsn);
     191             : static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
     192             :                                             RelFileLocator rlocator,
     193             :                                             BlockNumber blockno);
     194             : static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
     195             :                                                  XLogRecPtr replaying_lsn);
     196             : static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
     197             :                                                       XLogRecPtr *lsn);
     198             : 
     199             : static XLogPrefetchStats *SharedStats;  /* assigned in XLogPrefetchShmemInit() */
     200             : 
     201             : static inline LsnReadQueue *    /* Allocate and return an empty queue. */
     202        3286 : lrq_alloc(uint32 max_distance,
     203             :           uint32 max_inflight,
     204             :           uintptr_t lrq_private,
     205             :           LsnReadQueueNextFun next)
     206             : {
     207             :     LsnReadQueue *lrq;
     208             :     uint32      size;
     209             : 
     210             :     Assert(max_distance >= max_inflight);   /* look-ahead must cover the IO limit */
     211             : 
     212        3286 :     size = max_distance + 1;    /* full ring buffer has a gap */
     213        3286 :     lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size); /* header plus 'size' slots */
     214        3286 :     lrq->lrq_private = lrq_private;
     215        3286 :     lrq->max_inflight = max_inflight;
     216        3286 :     lrq->size = size;
     217        3286 :     lrq->next = next;
     218        3286 :     lrq->head = 0;              /* head == tail means the queue is empty */
     219        3286 :     lrq->tail = 0;
     220        3286 :     lrq->inflight = 0;
     221        3286 :     lrq->completed = 0;
     222             : 
     223        3286 :     return lrq;
     224             : }
     225             : 
     226             : static inline void              /* Release a queue obtained from lrq_alloc(). */
     227        3186 : lrq_free(LsnReadQueue *lrq)
     228             : {
     229        3186 :     pfree(lrq);
     230        3186 : }
     231             : 
     232             : static inline uint32            /* Queued entries for which an IO was started. */
     233      118656 : lrq_inflight(LsnReadQueue *lrq)
     234             : {
     235      118656 :     return lrq->inflight;
     236             : }
     237             : 
     238             : static inline uint32            /* Queued entries for which no IO was started. */
     239      118656 : lrq_completed(LsnReadQueue *lrq)
     240             : {
     241      118656 :     return lrq->completed;
     242             : }
     243             : 
     244             : static inline void
     245     5275670 : lrq_prefetch(LsnReadQueue *lrq)
     246             : {
     247             :     /* Fill the queue until max_inflight IOs or size - 1 entries are pending. */
     248    10855180 :     while (lrq->inflight < lrq->max_inflight &&
     249    10830622 :            lrq->inflight + lrq->completed < lrq->size - 1)
     250             :     {
     251             :         Assert(((lrq->head + 1) % lrq->size) != lrq->tail); /* ring is not full */
     252     6294198 :         switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
     253             :         {
     254      714594 :             case LRQ_NEXT_AGAIN:
     255      714594 :                 return;         /* head is not advanced; slot stays free */
     256       80334 :             case LRQ_NEXT_IO:
     257       80334 :                 lrq->queue[lrq->head].io = true;
     258       80334 :                 lrq->inflight++;
     259       80334 :                 break;
     260     5499176 :             case LRQ_NEXT_NO_IO:
     261     5499176 :                 lrq->queue[lrq->head].io = false;
     262     5499176 :                 lrq->completed++;
     263     5499176 :                 break;
     264             :         }
     265     5579510 :         lrq->head++;
     266     5579510 :         if (lrq->head == lrq->size)
     267      135988 :             lrq->head = 0;      /* wrap around */
     268             :     }
     269             : }
     270             : 
     271             : static inline void              /* Retire entries older than 'lsn', then refill. */
     272     5275640 : lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
     273             : {
     274             :     /*
     275             :      * We know that LSNs before 'lsn' have been replayed, so we can now assume
     276             :      * that any IOs that were started before then have finished.
     277             :      */
     278    10855044 :     while (lrq->tail != lrq->head &&
     279    10763324 :            lrq->queue[lrq->tail].lsn < lsn)
     280             :     {
     281     5579404 :         if (lrq->queue[lrq->tail].io)
     282       80334 :             lrq->inflight--;
     283             :         else
     284     5499070 :             lrq->completed--;
     285     5579404 :         lrq->tail++;
     286     5579404 :         if (lrq->tail == lrq->size)
     287      135986 :             lrq->tail = 0;      /* wrap around */
     288             :     }
     289     5275640 :     if (RecoveryPrefetchEnabled())
     290     5275640 :         lrq_prefetch(lrq);      /* refill the slots just freed */
     291     5275546 : }
     292             : 
     293             : size_t                          /* Bytes of shared memory used for the stats. */
     294        3298 : XLogPrefetchShmemSize(void)
     295             : {
     296        3298 :     return sizeof(XLogPrefetchStats);
     297             : }
     298             : 
     299             : /*
     300             :  * Reset the cumulative counters to zero and set reset_time to the current time.
     301             :  */
     302             : void
     303           6 : XLogPrefetchResetStats(void)
     304             : {
     305           6 :     pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
     306           6 :     pg_atomic_write_u64(&SharedStats->prefetch, 0);
     307           6 :     pg_atomic_write_u64(&SharedStats->hit, 0);
     308           6 :     pg_atomic_write_u64(&SharedStats->skip_init, 0);
     309           6 :     pg_atomic_write_u64(&SharedStats->skip_new, 0);
     310           6 :     pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
     311           6 :     pg_atomic_write_u64(&SharedStats->skip_rep, 0);    /* dynamic values are left alone */
     312           6 : }
     313             : 
     314             : void                            /* Set SharedStats; initialize it if newly created. */
     315        1768 : XLogPrefetchShmemInit(void)
     316             : {
     317             :     bool        found;
     318             : 
     319        1768 :     SharedStats = (XLogPrefetchStats *)
     320        1768 :         ShmemInitStruct("XLogPrefetchStats",
     321             :                         sizeof(XLogPrefetchStats),
     322             :                         &found);
     323             : 
     324        1768 :     if (!found)                 /* struct did not exist yet */
     325             :     {
     326        1768 :         pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
     327        1768 :         pg_atomic_init_u64(&SharedStats->prefetch, 0);
     328        1768 :         pg_atomic_init_u64(&SharedStats->hit, 0);
     329        1768 :         pg_atomic_init_u64(&SharedStats->skip_init, 0);
     330        1768 :         pg_atomic_init_u64(&SharedStats->skip_new, 0);
     331        1768 :         pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
     332        1768 :         pg_atomic_init_u64(&SharedStats->skip_rep, 0);
     333             :     }
     334        1768 : }
     335             : 
     336             : /*
     337             :  * Called when any GUC is changed that affects prefetching.
     338             :  */
     339             : void
     340          20 : XLogPrefetchReconfigure(void)
     341             : {
     342          20 :     XLogPrefetchReconfigureCount++; /* only bumps the file-static counter */
     343          20 : }
     344             : 
     345             : /*
     346             :  * Increment a counter in shared memory.  This is equivalent to *counter++ on a
     347             :  * plain uint64 without any memory barrier or locking, except on platforms
     348             :  * where readers can't read uint64 without possibly observing a torn value.
     349             :  */
     350             : static inline void
     351     5563888 : XLogPrefetchIncrement(pg_atomic_uint64 *counter)
     352             : {
     353             :     Assert(AmStartupProcess() || !IsUnderPostmaster);   /* startup process, or not under postmaster */
     354     5563888 :     pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);  /* separate read and write, not an atomic add */
     355     5563888 : }
     356             : 
     357             : /*
     358             :  * Create a prefetcher that is ready to begin prefetching blocks referenced by
     359             :  * WAL records read through 'reader'.
     360             :  */
     361             : XLogPrefetcher *
     362        1520 : XLogPrefetcherAllocate(XLogReaderState *reader)
     363             : {
     364             :     XLogPrefetcher *prefetcher;
     365             :     static HASHCTL hash_table_ctl = {
     366             :         .keysize = sizeof(RelFileLocator),
     367             :         .entrysize = sizeof(XLogPrefetcherFilter)
     368             :     };
     369             : 
     370        1520 :     prefetcher = palloc0(sizeof(XLogPrefetcher));  /* zero-filled allocation */
     371             : 
     372        1520 :     prefetcher->reader = reader;
     373        1520 :     prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
     374             :                                            &hash_table_ctl,
     375             :                                            HASH_ELEM | HASH_BLOBS);
     376        1520 :     dlist_init(&prefetcher->filter_queue);
     377             : 
     378        1520 :     SharedStats->wal_distance = 0; /* clear the dynamic stats */
     379        1520 :     SharedStats->block_distance = 0;
     380        1520 :     SharedStats->io_depth = 0;
     381             : 
     382             :     /* First usage will cause streaming_read to be allocated. */
     383        1520 :     prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
     384             : 
     385        1520 :     return prefetcher;
     386             : }
     387             : 
     388             : /*
     389             :  * Destroy a prefetcher: free its queue, its filter table and the struct itself.
     390             :  */
     391             : void
     392        1420 : XLogPrefetcherFree(XLogPrefetcher *prefetcher)
     393             : {
     394        1420 :     lrq_free(prefetcher->streaming_read);  /* NOTE(review): assumes first use already allocated it */
     395        1420 :     hash_destroy(prefetcher->filter_table);
     396        1420 :     pfree(prefetcher);
     397        1420 : }
     398             : 
     399             : /*
     400             :  * Return the XLogReaderState that was passed to XLogPrefetcherAllocate().
     401             :  */
     402             : XLogReaderState *
     403     5275470 : XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
     404             : {
     405     5275470 :     return prefetcher->reader;
     406             : }
     407             : 
     408             : /*
     409             :  * Publish the dynamic values shown in the pg_stat_recovery_prefetch view.
     410             :  */
     411             : void
     412      118656 : XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
     413             : {
     414             :     uint32      io_depth;
     415             :     uint32      completed;
     416             :     int64       wal_distance;
     417             : 
     418             : 
     419             :     /* How far ahead of replay are we now? */
     420      118656 :     if (prefetcher->reader->decode_queue_tail)
     421             :     {
     422       88912 :         wal_distance =
     423       88912 :             prefetcher->reader->decode_queue_tail->lsn -
     424       88912 :             prefetcher->reader->decode_queue_head->lsn;
     425             :     }
     426             :     else
     427             :     {
     428       29744 :         wal_distance = 0;       /* decode queue has no tail record */
     429             :     }
     430             : 
     431             :     /* How many IOs are currently in flight and completed? */
     432      118656 :     io_depth = lrq_inflight(prefetcher->streaming_read);
     433      118656 :     completed = lrq_completed(prefetcher->streaming_read);
     434             : 
     435             :     /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
     436      118656 :     SharedStats->io_depth = io_depth;
     437      118656 :     SharedStats->block_distance = io_depth + completed;
     438      118656 :     SharedStats->wal_distance = wal_distance;  /* NOTE(review): int64 narrowed to int */
     439             : 
     440      118656 :     prefetcher->next_stats_shm_lsn =
     441      118656 :         prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;    /* next publication point */
     442      118656 : }
     443             : 
     444             : /*
     445             :  * A callback that examines the next block reference in the WAL, and possibly
     446             :  * starts an IO so that a later read will be fast.
     447             :  *
     448             :  * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
     449             :  *
     450             :  * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
     451             :  * that isn't in the buffer pool, and the kernel has been asked to start
     452             :  * reading it to make a future read system call faster. An LSN is written to
     453             :  * *lsn, and the I/O will be considered to have completed once that LSN is
     454             :  * replayed.
     455             :  *
     456             :  * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
     457             :  * that it was already in the buffer pool, or we decided for various reasons
     458             :  * not to prefetch.
     459             :  */
     460             : static LsnReadQueueNextStatus
     461     6294198 : XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
     462             : {
     463     6294198 :     XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
     464     6294198 :     XLogReaderState *reader = prefetcher->reader;
     465     6294198 :     XLogRecPtr  replaying_lsn = reader->ReadRecPtr;
     466             : 
     467             :     /*
     468             :      * We keep track of the record and block we're up to between calls with
     469             :      * prefetcher->record and prefetcher->next_block_id.
     470             :      */
     471             :     for (;;)
     472     5271914 :     {
     473             :         DecodedXLogRecord *record;
     474             : 
     475             :         /* Try to read a new future record, if we don't already have one. */
     476    11566112 :         if (prefetcher->record == NULL)
     477             :         {
     478             :             bool        nonblocking;
     479             : 
     480             :             /*
     481             :              * If there are already records or an error queued up that could
     482             :              * be replayed, we don't want to block here.  Otherwise, it's OK
     483             :              * to block waiting for more data: presumably the caller has
     484             :              * nothing else to do.
     485             :              */
     486     5986602 :             nonblocking = XLogReaderHasQueuedRecordOrError(reader);
     487             : 
     488             :             /* Readahead is disabled until we replay past a certain point. */
     489     5986602 :             if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
     490      680764 :                 return LRQ_NEXT_AGAIN;
     491             : 
     492     5305838 :             record = XLogReadAhead(prefetcher->reader, nonblocking);
     493     5305744 :             if (record == NULL)
     494             :             {
     495             :                 /*
     496             :                  * We can't read any more, due to an error or lack of data in
     497             :                  * nonblocking mode.  Don't try to read ahead again until
     498             :                  * we've replayed everything already decoded.
     499             :                  */
     500       30564 :                 if (nonblocking && prefetcher->reader->decode_queue_tail)
     501       30280 :                     prefetcher->no_readahead_until =
     502       30280 :                         prefetcher->reader->decode_queue_tail->lsn;
     503             : 
     504       30564 :                 return LRQ_NEXT_AGAIN;
     505             :             }
     506             : 
     507             :             /*
     508             :              * If prefetching is disabled, we don't need to analyze the record
     509             :              * or issue any prefetches.  We just need to cause one record to
     510             :              * be decoded.
     511             :              */
     512     5275180 :             if (!RecoveryPrefetchEnabled())
     513             :             {
     514           0 :                 *lsn = InvalidXLogRecPtr;
     515           0 :                 return LRQ_NEXT_NO_IO;
     516             :             }
     517             : 
     518             :             /* We have a new record to process. */
     519     5275180 :             prefetcher->record = record;
     520     5275180 :             prefetcher->next_block_id = 0;
     521             :         }
     522             :         else
     523             :         {
     524             :             /* Continue to process from last call, or last loop. */
     525     5579510 :             record = prefetcher->record;
     526             :         }
     527             : 
     528             :         /*
     529             :          * Check for operations that require us to filter out block ranges, or
     530             :          * pause readahead completely.
     531             :          */
     532    10854690 :         if (replaying_lsn < record->lsn)
     533             :         {
     534    10854690 :             uint8       rmid = record->header.xl_rmid;
     535    10854690 :             uint8       record_type = record->header.xl_info & ~XLR_INFO_MASK;
     536             : 
     537    10854690 :             if (rmid == RM_XLOG_ID)
     538             :             {
     539      123788 :                 if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
     540             :                     record_type == XLOG_END_OF_RECOVERY)
     541             :                 {
     542             :                     /*
     543             :                      * These records might change the TLI.  Avoid potential
     544             :                      * bugs if we were to allow "read TLI" and "replay TLI" to
     545             :                      * differ without more analysis.
     546             :                      */
     547        2556 :                     prefetcher->no_readahead_until = record->lsn;
     548             : 
     549             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     550             :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     551             :                          "suppressing all readahead until %X/%X is replayed due to possible TLI change",
     552             :                          LSN_FORMAT_ARGS(record->lsn));
     553             : #endif
     554             : 
     555             :                     /* Fall through so we move past this record. */
     556             :                 }
     557             :             }
     558    10730902 :             else if (rmid == RM_DBASE_ID)
     559             :             {
     560             :                 /*
     561             :                  * When databases are created with the file-copy strategy,
     562             :                  * there are no WAL records to tell us about the creation of
     563             :                  * individual relations.
     564             :                  */
     565          68 :                 if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
     566             :                 {
     567           6 :                     xl_dbase_create_file_copy_rec *xlrec =
     568             :                         (xl_dbase_create_file_copy_rec *) record->main_data;
     569           6 :                     RelFileLocator rlocator =
     570           6 :                     {InvalidOid, xlrec->db_id, InvalidRelFileNumber};
     571             : 
     572             :                     /*
     573             :                      * Don't try to prefetch anything in this database until
     574             :                      * it has been created, or we might confuse the blocks of
     575             :                      * different generations, if a database OID or
     576             :                      * relfilenumber is reused.  It's also more efficient than
     577             :                      * discovering that relations don't exist on disk yet with
     578             :                      * ENOENT errors.
     579             :                      */
     580           6 :                     XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
     581             : 
     582             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     583             :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     584             :                          "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
     585             :                          rlocator.dbOid,
     586             :                          LSN_FORMAT_ARGS(record->lsn));
     587             : #endif
     588             :                 }
     589             :             }
     590    10730834 :             else if (rmid == RM_SMGR_ID)
     591             :             {
     592       29284 :                 if (record_type == XLOG_SMGR_CREATE)
     593             :                 {
     594       29198 :                     xl_smgr_create *xlrec = (xl_smgr_create *)
     595             :                         record->main_data;
     596             : 
     597       29198 :                     if (xlrec->forkNum == MAIN_FORKNUM)
     598             :                     {
     599             :                         /*
     600             :                          * Don't prefetch anything for this whole relation
     601             :                          * until it has been created.  Otherwise we might
     602             :                          * confuse the blocks of different generations, if a
     603             :                          * relfilenumber is reused.  This also avoids the need
     604             :                          * to discover the problem via extra syscalls that
     605             :                          * report ENOENT.
     606             :                          */
     607       26072 :                         XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
     608             :                                                 record->lsn);
     609             : 
     610             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     611             :                         elog(XLOGPREFETCHER_DEBUG_LEVEL,
     612             :                              "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
     613             :                              xlrec->rlocator.spcOid,
     614             :                              xlrec->rlocator.dbOid,
     615             :                              xlrec->rlocator.relNumber,
     616             :                              LSN_FORMAT_ARGS(record->lsn));
     617             : #endif
     618             :                     }
     619             :                 }
     620          86 :                 else if (record_type == XLOG_SMGR_TRUNCATE)
     621             :                 {
     622          86 :                     xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
     623             :                         record->main_data;
     624             : 
     625             :                     /*
     626             :                      * Don't consider prefetching anything in the truncated
     627             :                      * range until the truncation has been performed.
     628             :                      */
     629          86 :                     XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
     630             :                                             xlrec->blkno,
     631             :                                             record->lsn);
     632             : 
     633             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     634             :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     635             :                          "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
     636             :                          xlrec->rlocator.spcOid,
     637             :                          xlrec->rlocator.dbOid,
     638             :                          xlrec->rlocator.relNumber,
     639             :                          xlrec->blkno,
     640             :                          LSN_FORMAT_ARGS(record->lsn));
     641             : #endif
     642             :                 }
     643             :             }
     644             :         }
     645             : 
     646             :         /* Scan the block references, starting where we left off last time. */
     647    10859312 :         while (prefetcher->next_block_id <= record->max_block_id)
     648             :         {
     649     5584132 :             int         block_id = prefetcher->next_block_id++;
     650     5584132 :             DecodedBkpBlock *block = &record->blocks[block_id];
     651             :             SMgrRelation reln;
     652             :             PrefetchBufferResult result;
     653             : 
     654     5584132 :             if (!block->in_use)
     655        4180 :                 continue;
     656             : 
     657             :             Assert(!BufferIsValid(block->prefetch_buffer));
     658             : 
     659             :             /*
     660             :              * Record the LSN of this record.  When it's replayed,
     661             :              * LsnReadQueue will consider any IOs submitted for earlier LSNs
     662             :              * to be finished.
     663             :              */
     664     5579952 :             *lsn = record->lsn;
     665             : 
     666             :             /* We don't try to prefetch anything but the main fork for now. */
     667     5579952 :             if (block->forknum != MAIN_FORKNUM)
     668             :             {
     669     5579510 :                 return LRQ_NEXT_NO_IO;
     670             :             }
     671             : 
     672             :             /*
     673             :              * If there is a full page image attached, we won't be reading the
     674             :              * page, so don't bother trying to prefetch.
     675             :              */
     676     5564330 :             if (block->has_image)
     677             :             {
     678       70326 :                 XLogPrefetchIncrement(&SharedStats->skip_fpw);
     679       70326 :                 return LRQ_NEXT_NO_IO;
     680             :             }
     681             : 
     682             :             /* There is no point in reading a page that will be zeroed. */
     683     5494004 :             if (block->flags & BKPBLOCK_WILL_INIT)
     684             :             {
     685      102842 :                 XLogPrefetchIncrement(&SharedStats->skip_init);
     686      102842 :                 return LRQ_NEXT_NO_IO;
     687             :             }
     688             : 
     689             :             /* Should we skip prefetching this block due to a filter? */
     690     5391162 :             if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
     691             :             {
     692      659376 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     693      659376 :                 return LRQ_NEXT_NO_IO;
     694             :             }
     695             : 
     696             :             /* There is no point in repeatedly prefetching the same block. */
     697    14396572 :             for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
     698             :             {
     699    13361242 :                 if (block->blkno == prefetcher->recent_block[i] &&
     700     3928236 :                     RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
     701             :                 {
     702             :                     /*
     703             :                      * XXX If we also remembered where it was, we could set
     704             :                      * recent_buffer so that recovery could skip smgropen()
     705             :                      * and a buffer table lookup.
     706             :                      */
     707     3696456 :                     XLogPrefetchIncrement(&SharedStats->skip_rep);
     708     3696456 :                     return LRQ_NEXT_NO_IO;
     709             :                 }
     710             :             }
     711     1035330 :             prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
     712     1035330 :             prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
     713     1035330 :             prefetcher->recent_idx =
     714     1035330 :                 (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
     715             : 
     716             :             /*
     717             :              * We could try to have a fast path for repeated references to the
     718             :              * same relation (with some scheme to handle invalidations
     719             :              * safely), but for now we'll call smgropen() every time.
     720             :              */
     721     1035330 :             reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);
     722             : 
     723             :             /*
     724             :              * If the relation file doesn't exist on disk, for example because
     725             :              * we're replaying after a crash and the file will be created and
     726             :              * then unlinked by WAL that hasn't been replayed yet, suppress
     727             :              * further prefetching in the relation until this record is
     728             :              * replayed.
     729             :              */
     730     1035330 :             if (!smgrexists(reln, MAIN_FORKNUM))
     731             :             {
     732             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     733             :                 elog(XLOGPREFETCHER_DEBUG_LEVEL,
     734             :                      "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
     735             :                      reln->smgr_rlocator.locator.spcOid,
     736             :                      reln->smgr_rlocator.locator.dbOid,
     737             :                      reln->smgr_rlocator.locator.relNumber,
     738             :                      LSN_FORMAT_ARGS(record->lsn));
     739             : #endif
     740          12 :                 XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
     741             :                                         record->lsn);
     742          12 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     743          12 :                 return LRQ_NEXT_NO_IO;
     744             :             }
     745             : 
     746             :             /*
     747             :              * If the relation isn't big enough to contain the referenced
     748             :              * block yet, suppress prefetching of this block and higher until
     749             :              * this record is replayed.
     750             :              */
     751     1035318 :             if (block->blkno >= smgrnblocks(reln, block->forknum))
     752             :             {
     753             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     754             :                 elog(XLOGPREFETCHER_DEBUG_LEVEL,
     755             :                      "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
     756             :                      reln->smgr_rlocator.locator.spcOid,
     757             :                      reln->smgr_rlocator.locator.dbOid,
     758             :                      reln->smgr_rlocator.locator.relNumber,
     759             :                      block->blkno,
     760             :                      LSN_FORMAT_ARGS(record->lsn));
     761             : #endif
     762       27172 :                 XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
     763             :                                         record->lsn);
     764       27172 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     765       27172 :                 return LRQ_NEXT_NO_IO;
     766             :             }
     767             : 
     768             :             /* Try to initiate prefetching. */
     769     1008146 :             result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
     770     1008146 :             if (BufferIsValid(result.recent_buffer))
     771             :             {
     772             :                 /* Cache hit, nothing to do. */
     773      927370 :                 XLogPrefetchIncrement(&SharedStats->hit);
     774      927370 :                 block->prefetch_buffer = result.recent_buffer;
     775      927370 :                 return LRQ_NEXT_NO_IO;
     776             :             }
     777       80776 :             else if (result.initiated_io)
     778             :             {
     779             :                 /* Cache miss, I/O (presumably) started. */
     780       80334 :                 XLogPrefetchIncrement(&SharedStats->prefetch);
     781       80334 :                 block->prefetch_buffer = InvalidBuffer;
     782       80334 :                 return LRQ_NEXT_IO;
     783             :             }
     784         442 :             else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
     785             :             {
     786             :                 /*
     787             :                  * This shouldn't be possible, because we already determined
     788             :                  * that the relation exists on disk and is big enough.
     789             :                  * Something is wrong with the cache invalidation for
     790             :                  * smgrexists(), smgrnblocks(), or the file was unlinked or
     791             :                  * truncated beneath our feet?
     792             :                  */
     793           0 :                 elog(ERROR,
     794             :                      "could not prefetch relation %u/%u/%u block %u",
     795             :                      reln->smgr_rlocator.locator.spcOid,
     796             :                      reln->smgr_rlocator.locator.dbOid,
     797             :                      reln->smgr_rlocator.locator.relNumber,
     798             :                      block->blkno);
     799             :             }
     800             :         }
     801             : 
     802             :         /*
     803             :          * Several callsites need to be able to read exactly one record
     804             :          * without any internal readahead.  Examples: xlog.c reading
     805             :          * checkpoint records with emode set to PANIC, which might otherwise
     806             :          * cause XLogPageRead() to panic on some future page, and xlog.c
     807             :          * determining where to start writing WAL next, which depends on the
     808             :          * contents of the reader's internal buffer after reading one record.
     809             :          * Therefore, don't even think about prefetching until the first
     810             :          * record after XLogPrefetcherBeginRead() has been consumed.
     811             :          */
     812     5275180 :         if (prefetcher->reader->decode_queue_tail &&
     813     5275178 :             prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
     814        3266 :             return LRQ_NEXT_AGAIN;
     815             : 
     816             :         /* Advance to the next record. */
     817     5271914 :         prefetcher->record = NULL;
     818             :     }
     819             :     pg_unreachable();
     820             : }
     821             : 
     822             : /*
     823             :  * Expose statistics about recovery prefetching.
     824             :  */
     825             : Datum
     826          12 : pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
     827             : {
     828             : #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
     829          12 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     830             :     Datum       values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
     831             :     bool        nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
     832             : 
     833          12 :     InitMaterializedSRF(fcinfo, 0);
     834             : 
     835         132 :     for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
     836         120 :         nulls[i] = false;
     837             : 
     838          12 :     values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
     839          12 :     values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
     840          12 :     values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
     841          12 :     values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
     842          12 :     values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
     843          12 :     values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
     844          12 :     values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
     845          12 :     values[7] = Int32GetDatum(SharedStats->wal_distance);
     846          12 :     values[8] = Int32GetDatum(SharedStats->block_distance);
     847          12 :     values[9] = Int32GetDatum(SharedStats->io_depth);
     848          12 :     tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
     849             : 
     850          12 :     return (Datum) 0;
     851             : }
     852             : 
     853             : /*
     854             :  * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
     855             :  * has been replayed.
     856             :  */
     857             : static inline void
     858       53348 : XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
     859             :                         BlockNumber blockno, XLogRecPtr lsn)
     860             : {
     861             :     XLogPrefetcherFilter *filter;
     862             :     bool        found;
     863             : 
     864       53348 :     filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
     865       53348 :     if (!found)
     866             :     {
     867             :         /*
     868             :          * Don't allow any prefetching of this block or higher until replayed.
     869             :          */
     870       53318 :         filter->filter_until_replayed = lsn;
     871       53318 :         filter->filter_from_block = blockno;
     872       53318 :         dlist_push_head(&prefetcher->filter_queue, &filter->link);
     873             :     }
     874             :     else
     875             :     {
     876             :         /*
     877             :          * We were already filtering this rlocator.  Extend the filter's
     878             :          * lifetime to cover this WAL record, but leave the lower of the block
     879             :          * numbers there because we don't want to have to track individual
     880             :          * blocks.
     881             :          */
     882          30 :         filter->filter_until_replayed = lsn;
     883          30 :         dlist_delete(&filter->link);
     884          30 :         dlist_push_head(&prefetcher->filter_queue, &filter->link);
     885          30 :         filter->filter_from_block = Min(filter->filter_from_block, blockno);
     886             :     }
     887       53348 : }
     888             : 
     889             : /*
     890             :  * Have we replayed any records that caused us to begin filtering a block
     891             :  * range?  That means that relations should have been created, extended or
     892             :  * dropped as required, so we can stop filtering out accesses to a given
     893             :  * relfilenumber.
     894             :  */
     895             : static inline void
     896     5275640 : XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
     897             : {
     898     5328950 :     while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
     899             :     {
     900     1303948 :         XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
     901             :                                                           link,
     902             :                                                           &prefetcher->filter_queue);
     903             : 
     904     1303948 :         if (filter->filter_until_replayed >= replaying_lsn)
     905     1250638 :             break;
     906             : 
     907       53310 :         dlist_delete(&filter->link);
     908       53310 :         hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
     909             :     }
     910     5275640 : }
     911             : 
     912             : /*
     913             :  * Check if a given block should be skipped due to a filter.
     914             :  */
     915             : static inline bool
     916     5391162 : XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
     917             :                          BlockNumber blockno)
     918             : {
     919             :     /*
     920             :      * Test for empty queue first, because we expect it to be empty most of
     921             :      * the time and we can avoid the hash table lookup in that case.
     922             :      */
     923     5391162 :     if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
     924             :     {
     925             :         XLogPrefetcherFilter *filter;
     926             : 
     927             :         /* See if the block range is filtered. */
     928     1253022 :         filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
     929     1253022 :         if (filter && filter->filter_from_block <= blockno)
     930             :         {
     931             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     932             :             elog(XLOGPREFETCHER_DEBUG_LEVEL,
     933             :                  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
     934             :                  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
     935             :                  LSN_FORMAT_ARGS(filter->filter_until_replayed),
     936             :                  filter->filter_from_block);
     937             : #endif
     938      659376 :             return true;
     939             :         }
     940             : 
     941             :         /* See if the whole database is filtered. */
     942      593646 :         rlocator.relNumber = InvalidRelFileNumber;
     943      593646 :         rlocator.spcOid = InvalidOid;
     944      593646 :         filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
     945      593646 :         if (filter)
     946             :         {
     947             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     948             :             elog(XLOGPREFETCHER_DEBUG_LEVEL,
     949             :                  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
     950             :                  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
     951             :                  LSN_FORMAT_ARGS(filter->filter_until_replayed));
     952             : #endif
     953           0 :             return true;
     954             :         }
     955             :     }
     956             : 
     957     4731786 :     return false;
     958             : }
     959             : 
     960             : /*
     961             :  * A wrapper for XLogBeginRead() that also resets the prefetcher.
     962             :  */
     963             : void
     964        3266 : XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
     965             : {
     966             :     /* This will forget about any in-flight IO. */
     967        3266 :     prefetcher->reconfigure_count--;
     968             : 
     969             :     /* Book-keeping to avoid readahead on first read. */
     970        3266 :     prefetcher->begin_ptr = recPtr;
     971             : 
     972        3266 :     prefetcher->no_readahead_until = 0;
     973             : 
     974             :     /* This will forget about any queued up records in the decoder. */
     975        3266 :     XLogBeginRead(prefetcher->reader, recPtr);
     976        3266 : }
     977             : 
     978             : /*
     979             :  * A wrapper for XLogReadRecord() that provides the same interface, but also
     980             :  * tries to initiate I/O for blocks referenced in future WAL records.
     981             :  */
     982             : XLogRecord *
     983     5275640 : XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
     984             : {
     985             :     DecodedXLogRecord *record;
     986             :     XLogRecPtr  replayed_up_to;
     987             : 
     988             :     /*
     989             :      * See if it's time to reset the prefetching machinery, because a relevant
     990             :      * GUC was changed.
     991             :      */
     992     5275640 :     if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
     993             :     {
     994             :         uint32      max_distance;
     995             :         uint32      max_inflight;
     996             : 
     997        3286 :         if (prefetcher->streaming_read)
     998        1766 :             lrq_free(prefetcher->streaming_read);
     999             : 
    1000        3286 :         if (RecoveryPrefetchEnabled())
    1001             :         {
    1002             :             Assert(maintenance_io_concurrency > 0);
    1003        3286 :             max_inflight = maintenance_io_concurrency;
    1004        3286 :             max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
    1005             :         }
    1006             :         else
    1007             :         {
    1008           0 :             max_inflight = 1;
    1009           0 :             max_distance = 1;
    1010             :         }
    1011             : 
    1012        3286 :         prefetcher->streaming_read = lrq_alloc(max_distance,
    1013             :                                                max_inflight,
    1014             :                                                (uintptr_t) prefetcher,
    1015             :                                                XLogPrefetcherNextBlock);
    1016             : 
    1017        3286 :         prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
    1018             :     }
    1019             : 
    1020             :     /*
    1021             :      * Release last returned record, if there is one, as it's now been
    1022             :      * replayed.
    1023             :      */
    1024     5275640 :     replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
    1025             : 
    1026             :     /*
    1027             :      * Can we drop any filters yet?  If we were waiting for a relation to be
    1028             :      * created or extended, it is now OK to access blocks in the covered
    1029             :      * range.
    1030             :      */
    1031     5275640 :     XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
    1032             : 
    1033             :     /*
    1034             :      * All IO initiated by earlier WAL is now completed.  This might trigger
    1035             :      * further prefetching.
    1036             :      */
    1037     5275640 :     lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
    1038             : 
    1039             :     /*
    1040             :      * If there's nothing queued yet, then start prefetching to cause at least
    1041             :      * one record to be queued.
    1042             :      */
    1043     5275546 :     if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
    1044             :     {
    1045             :         Assert(lrq_inflight(prefetcher->streaming_read) == 0);
    1046             :         Assert(lrq_completed(prefetcher->streaming_read) == 0);
    1047          30 :         lrq_prefetch(prefetcher->streaming_read);
    1048             :     }
    1049             : 
    1050             :     /* Read the next record. */
    1051     5275546 :     record = XLogNextRecord(prefetcher->reader, errmsg);
    1052     5275546 :     if (!record)
    1053         452 :         return NULL;
    1054             : 
    1055             :     /*
    1056             :      * The record we just got is the "current" one, for the benefit of the
    1057             :      * XLogRecXXX() macros.
    1058             :      */
    1059             :     Assert(record == prefetcher->reader->record);
    1060             : 
    1061             :     /*
    1062             :      * If maintenance_io_concurrency is set very low, we might have started
    1063             :      * prefetching some but not all of the blocks referenced in the record
    1064             :      * we're about to return.  Forget about the rest of the blocks in this
    1065             :      * record by dropping the prefetcher's reference to it.
    1066             :      */
    1067     5275094 :     if (record == prefetcher->record)
    1068        3266 :         prefetcher->record = NULL;
    1069             : 
    1070             :     /*
    1071             :      * See if it's time to compute some statistics, because enough WAL has
    1072             :      * been processed.
    1073             :      */
    1074     5275094 :     if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
    1075       87492 :         XLogPrefetcherComputeStats(prefetcher);
    1076             : 
    1077             :     Assert(record == prefetcher->reader->record);
    1078             : 
    1079     5275094 :     return &record->header;
    1080             : }
    1081             : 
    1082             : bool
    1083        1830 : check_recovery_prefetch(int *new_value, void **extra, GucSource source)
    1084             : {
    1085             : #ifndef USE_PREFETCH
    1086             :     if (*new_value == RECOVERY_PREFETCH_ON)
    1087             :     {
    1088             :         GUC_check_errdetail("recovery_prefetch is not supported on platforms that lack posix_fadvise().");
    1089             :         return false;
    1090             :     }
    1091             : #endif
    1092             : 
    1093        1830 :     return true;
    1094             : }
    1095             : 
    1096             : void
    1097        1830 : assign_recovery_prefetch(int new_value, void *extra)
    1098             : {
    1099             :     /* Reconfigure prefetching, because a setting it depends on changed. */
    1100        1830 :     recovery_prefetch = new_value;
    1101        1830 :     if (AmStartupProcess())
    1102           0 :         XLogPrefetchReconfigure();
    1103        1830 : }

Generated by: LCOV version 1.14