LCOV - code coverage report
Current view: top level - src/backend/access/transam - xlogprefetcher.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 278 285 97.5 %
Date: 2025-01-18 05:15:39 Functions: 24 24 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * xlogprefetcher.c
       4             :  *      Prefetching support for recovery.
       5             :  *
       6             :  * Portions Copyright (c) 2022-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *      src/backend/access/transam/xlogprefetcher.c
      12             :  *
      13             :  * This module provides a drop-in replacement for an XLogReader that tries to
      14             :  * minimize I/O stalls by looking ahead in the WAL.  If blocks that will be
      15             :  * accessed in the near future are not already in the buffer pool, it initiates
      16             :  * I/Os that might complete before the caller eventually needs the data.  When
      17             :  * referenced blocks are found in the buffer pool already, the buffer is
      18             :  * recorded in the decoded record so that XLogReadBufferForRedo() can try to
      19             :  * avoid a second buffer mapping table lookup.
      20             :  *
      21             :  * Currently, only the main fork is considered for prefetching.  Also,
      22             :  * prefetching is only effective on systems where PrefetchBuffer() does
      23             :  * something useful (mainly Linux).
      24             :  *
      25             :  *-------------------------------------------------------------------------
      26             :  */
      27             : 
      28             : #include "postgres.h"
      29             : 
      30             : #include "access/xlogprefetcher.h"
      31             : #include "access/xlogreader.h"
      32             : #include "catalog/pg_control.h"
      33             : #include "catalog/storage_xlog.h"
      34             : #include "commands/dbcommands_xlog.h"
      35             : #include "funcapi.h"
      36             : #include "miscadmin.h"
      37             : #include "port/atomics.h"
      38             : #include "storage/bufmgr.h"
      39             : #include "storage/shmem.h"
      40             : #include "storage/smgr.h"
      41             : #include "utils/fmgrprotos.h"
      42             : #include "utils/guc_hooks.h"
      43             : #include "utils/hsearch.h"
      44             : #include "utils/timestamp.h"
      45             : 
      46             : /*
      47             :  * Every time we process this much WAL, we'll update the values in
      48             :  * pg_stat_recovery_prefetch.
      49             :  */
      50             : #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
      51             : 
      52             : /*
      53             :  * To detect repeated access to the same block and skip useless extra system
      54             :  * calls, we remember a small window of recently prefetched blocks.
      55             :  */
      56             : #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
      57             : 
      58             : /*
      59             :  * When maintenance_io_concurrency is not saturated, we're prepared to look
      60             :  * ahead up to N times that number of block references.
      61             :  */
      62             : #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
      63             : 
      64             : /* Define to log internal debugging messages. */
      65             : /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
      66             : 
      67             : /* GUCs */
      68             : int         recovery_prefetch = RECOVERY_PREFETCH_TRY;
      69             : 
      70             : #ifdef USE_PREFETCH
      71             : #define RecoveryPrefetchEnabled() \
      72             :         (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
      73             :          maintenance_io_concurrency > 0)
      74             : #else
      75             : #define RecoveryPrefetchEnabled() false
      76             : #endif
      77             : 
      78             : static int  XLogPrefetchReconfigureCount = 0;
      79             : 
      80             : /*
      81             :  * Enum used to report whether an IO should be started.
      82             :  */
      83             : typedef enum
      84             : {
      85             :     LRQ_NEXT_NO_IO,
      86             :     LRQ_NEXT_IO,
      87             :     LRQ_NEXT_AGAIN,
      88             : } LsnReadQueueNextStatus;
      89             : 
      90             : /*
      91             :  * Type of callback that can decide which block to prefetch next.  For now
      92             :  * there is only one.
      93             :  */
      94             : typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
      95             :                                                        XLogRecPtr *lsn);
      96             : 
      97             : /*
      98             :  * A simple circular queue of LSNs, used to control the number of
      99             :  * (potentially) inflight IOs.  This stands in for a later more general IO
     100             :  * control mechanism, which is why it has the apparently unnecessary
     101             :  * indirection through a function pointer.
     102             :  */
     103             : typedef struct LsnReadQueue
     104             : {
     105             :     LsnReadQueueNextFun next;
     106             :     uintptr_t   lrq_private;
     107             :     uint32      max_inflight;
     108             :     uint32      inflight;
     109             :     uint32      completed;
     110             :     uint32      head;
     111             :     uint32      tail;
     112             :     uint32      size;
     113             :     struct
     114             :     {
     115             :         bool        io;
     116             :         XLogRecPtr  lsn;
     117             :     }           queue[FLEXIBLE_ARRAY_MEMBER];
     118             : } LsnReadQueue;
     119             : 
     120             : /*
     121             :  * A prefetcher.  This is a mechanism that wraps an XLogReader, prefetching
     122             :  * blocks that will soon be referenced, to try to avoid IO stalls.
     123             :  */
     124             : struct XLogPrefetcher
     125             : {
     126             :     /* WAL reader and current reading state. */
     127             :     XLogReaderState *reader;
     128             :     DecodedXLogRecord *record;
     129             :     int         next_block_id;
     130             : 
     131             :     /* When to publish stats. */
     132             :     XLogRecPtr  next_stats_shm_lsn;
     133             : 
     134             :     /* Book-keeping to avoid accessing blocks that don't exist yet. */
     135             :     HTAB       *filter_table;
     136             :     dlist_head  filter_queue;
     137             : 
     138             :     /* Book-keeping to avoid repeat prefetches. */
     139             :     RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
     140             :     BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
     141             :     int         recent_idx;
     142             : 
     143             :     /* Book-keeping to disable prefetching temporarily. */
     144             :     XLogRecPtr  no_readahead_until;
     145             : 
     146             :     /* IO depth manager. */
     147             :     LsnReadQueue *streaming_read;
     148             : 
     149             :     XLogRecPtr  begin_ptr;
     150             : 
     151             :     int         reconfigure_count;
     152             : };
     153             : 
     154             : /*
     155             :  * A temporary filter used to track block ranges that haven't been created
     156             :  * yet, whole relations that haven't been created yet, and whole relations
     157             :  * that (we assume) have already been dropped, or will be created by bulk WAL
     158             :  * operators.
     159             :  */
     160             : typedef struct XLogPrefetcherFilter
     161             : {
     162             :     RelFileLocator rlocator;
     163             :     XLogRecPtr  filter_until_replayed;
     164             :     BlockNumber filter_from_block;
     165             :     dlist_node  link;
     166             : } XLogPrefetcherFilter;
     167             : 
     168             : /*
     169             :  * Counters exposed in shared memory for pg_stat_recovery_prefetch.
     170             :  */
     171             : typedef struct XLogPrefetchStats
     172             : {
     173             :     pg_atomic_uint64 reset_time;    /* Time of last reset. */
     174             :     pg_atomic_uint64 prefetch;  /* Prefetches initiated. */
     175             :     pg_atomic_uint64 hit;       /* Blocks already in cache. */
     176             :     pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
     177             :     pg_atomic_uint64 skip_new;  /* New/missing blocks filtered. */
     178             :     pg_atomic_uint64 skip_fpw;  /* FPWs skipped. */
     179             :     pg_atomic_uint64 skip_rep;  /* Repeat accesses skipped. */
     180             : 
     181             :     /* Dynamic values */
     182             :     int         wal_distance;   /* Number of WAL bytes ahead. */
     183             :     int         block_distance; /* Number of block references ahead. */
     184             :     int         io_depth;       /* Number of I/Os in progress. */
     185             : } XLogPrefetchStats;
     186             : 
     187             : static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
     188             :                                            RelFileLocator rlocator,
     189             :                                            BlockNumber blockno,
     190             :                                            XLogRecPtr lsn);
     191             : static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
     192             :                                             RelFileLocator rlocator,
     193             :                                             BlockNumber blockno);
     194             : static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
     195             :                                                  XLogRecPtr replaying_lsn);
     196             : static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
     197             :                                                       XLogRecPtr *lsn);
     198             : 
     199             : static XLogPrefetchStats *SharedStats;
     200             : 
     201             : static inline LsnReadQueue *
     202        3564 : lrq_alloc(uint32 max_distance,
     203             :           uint32 max_inflight,
     204             :           uintptr_t lrq_private,
     205             :           LsnReadQueueNextFun next)
     206             : {
     207             :     LsnReadQueue *lrq;
     208             :     uint32      size;
     209             : 
     210             :     Assert(max_distance >= max_inflight);
     211             : 
     212        3564 :     size = max_distance + 1;    /* full ring buffer has a gap */
     213        3564 :     lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
     214        3564 :     lrq->lrq_private = lrq_private;
     215        3564 :     lrq->max_inflight = max_inflight;
     216        3564 :     lrq->size = size;
     217        3564 :     lrq->next = next;
     218        3564 :     lrq->head = 0;
     219        3564 :     lrq->tail = 0;
     220        3564 :     lrq->inflight = 0;
     221        3564 :     lrq->completed = 0;
     222             : 
     223        3564 :     return lrq;
     224             : }
     225             : 
     226             : static inline void
     227        3458 : lrq_free(LsnReadQueue *lrq)
     228             : {
     229        3458 :     pfree(lrq);
     230        3458 : }
     231             : 
     232             : static inline uint32
     233     1993372 : lrq_inflight(LsnReadQueue *lrq)
     234             : {
     235     1993372 :     return lrq->inflight;
     236             : }
     237             : 
     238             : static inline uint32
     239     1993372 : lrq_completed(LsnReadQueue *lrq)
     240             : {
     241     1993372 :     return lrq->completed;
     242             : }
     243             : 
     244             : static inline void
     245     5323440 : lrq_prefetch(LsnReadQueue *lrq)
     246             : {
     247             :     /* Try to start as many IOs as we can within our limits. */
     248    10943236 :     while (lrq->inflight < lrq->max_inflight &&
     249    10938856 :            lrq->inflight + lrq->completed < lrq->size - 1)
     250             :     {
     251             :         Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
     252     5821992 :         switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
     253             :         {
     254      202096 :             case LRQ_NEXT_AGAIN:
     255      202096 :                 return;
     256       12160 :             case LRQ_NEXT_IO:
     257       12160 :                 lrq->queue[lrq->head].io = true;
     258       12160 :                 lrq->inflight++;
     259       12160 :                 break;
     260     5607636 :             case LRQ_NEXT_NO_IO:
     261     5607636 :                 lrq->queue[lrq->head].io = false;
     262     5607636 :                 lrq->completed++;
     263     5607636 :                 break;
     264             :         }
     265     5619796 :         lrq->head++;
     266     5619796 :         if (lrq->head == lrq->size)
     267      136962 :             lrq->head = 0;
     268             :     }
     269             : }
     270             : 
     271             : static inline void
     272     5323406 : lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
     273             : {
     274             :     /*
     275             :      * We know that LSNs before 'lsn' have been replayed, so we can now assume
     276             :      * that any IOs that were started before then have finished.
     277             :      */
     278    10943088 :     while (lrq->tail != lrq->head &&
     279    10921298 :            lrq->queue[lrq->tail].lsn < lsn)
     280             :     {
     281     5619682 :         if (lrq->queue[lrq->tail].io)
     282       12160 :             lrq->inflight--;
     283             :         else
     284     5607522 :             lrq->completed--;
     285     5619682 :         lrq->tail++;
     286     5619682 :         if (lrq->tail == lrq->size)
     287      136960 :             lrq->tail = 0;
     288             :     }
     289     5323406 :     if (RecoveryPrefetchEnabled())
     290     5323406 :         lrq_prefetch(lrq);
     291     5323306 : }
     292             : 
     293             : size_t
     294        3566 : XLogPrefetchShmemSize(void)
     295             : {
     296        3566 :     return sizeof(XLogPrefetchStats);
     297             : }
     298             : 
     299             : /*
     300             :  * Reset all counters to zero.
     301             :  */
     302             : void
     303           6 : XLogPrefetchResetStats(void)
     304             : {
     305           6 :     pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
     306           6 :     pg_atomic_write_u64(&SharedStats->prefetch, 0);
     307           6 :     pg_atomic_write_u64(&SharedStats->hit, 0);
     308           6 :     pg_atomic_write_u64(&SharedStats->skip_init, 0);
     309           6 :     pg_atomic_write_u64(&SharedStats->skip_new, 0);
     310           6 :     pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
     311           6 :     pg_atomic_write_u64(&SharedStats->skip_rep, 0);
     312           6 : }
     313             : 
     314             : void
     315        1918 : XLogPrefetchShmemInit(void)
     316             : {
     317             :     bool        found;
     318             : 
     319        1918 :     SharedStats = (XLogPrefetchStats *)
     320        1918 :         ShmemInitStruct("XLogPrefetchStats",
     321             :                         sizeof(XLogPrefetchStats),
     322             :                         &found);
     323             : 
     324        1918 :     if (!found)
     325             :     {
     326        1918 :         pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
     327        1918 :         pg_atomic_init_u64(&SharedStats->prefetch, 0);
     328        1918 :         pg_atomic_init_u64(&SharedStats->hit, 0);
     329        1918 :         pg_atomic_init_u64(&SharedStats->skip_init, 0);
     330        1918 :         pg_atomic_init_u64(&SharedStats->skip_new, 0);
     331        1918 :         pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
     332        1918 :         pg_atomic_init_u64(&SharedStats->skip_rep, 0);
     333             :     }
     334        1918 : }
     335             : 
     336             : /*
     337             :  * Called when any GUC is changed that affects prefetching.
     338             :  */
     339             : void
     340          22 : XLogPrefetchReconfigure(void)
     341             : {
     342          22 :     XLogPrefetchReconfigureCount++;
     343          22 : }
     344             : 
     345             : /*
     346             :  * Increment a counter in shared memory.  This is equivalent to (*counter)++ on a
     347             :  * plain uint64 without any memory barrier or locking, except on platforms
     348             :  * where readers can't read uint64 without possibly observing a torn value.
     349             :  */
     350             : static inline void
     351     5601678 : XLogPrefetchIncrement(pg_atomic_uint64 *counter)
     352             : {
     353             :     Assert(AmStartupProcess() || !IsUnderPostmaster);
     354     5601678 :     pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
     355     5601678 : }
     356             : 
     357             : /*
     358             :  * Create a prefetcher that is ready to begin prefetching blocks referenced by
     359             :  * WAL records.
     360             :  */
     361             : XLogPrefetcher *
     362        1650 : XLogPrefetcherAllocate(XLogReaderState *reader)
     363             : {
     364             :     XLogPrefetcher *prefetcher;
     365             :     HASHCTL     ctl;
     366             : 
     367        1650 :     prefetcher = palloc0(sizeof(XLogPrefetcher));
     368        1650 :     prefetcher->reader = reader;
     369             : 
     370        1650 :     ctl.keysize = sizeof(RelFileLocator);
     371        1650 :     ctl.entrysize = sizeof(XLogPrefetcherFilter);
     372        1650 :     prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
     373             :                                            &ctl, HASH_ELEM | HASH_BLOBS);
     374        1650 :     dlist_init(&prefetcher->filter_queue);
     375             : 
     376        1650 :     SharedStats->wal_distance = 0;
     377        1650 :     SharedStats->block_distance = 0;
     378        1650 :     SharedStats->io_depth = 0;
     379             : 
     380             :     /* First usage will cause streaming_read to be allocated. */
     381        1650 :     prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
     382             : 
     383        1650 :     return prefetcher;
     384             : }
     385             : 
     386             : /*
     387             :  * Destroy a prefetcher and release all resources.
     388             :  */
     389             : void
     390        1544 : XLogPrefetcherFree(XLogPrefetcher *prefetcher)
     391             : {
     392        1544 :     lrq_free(prefetcher->streaming_read);
     393        1544 :     hash_destroy(prefetcher->filter_table);
     394        1544 :     pfree(prefetcher);
     395        1544 : }
     396             : 
     397             : /*
     398             :  * Provide access to the reader.
     399             :  */
     400             : XLogReaderState *
     401     5323226 : XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
     402             : {
     403     5323226 :     return prefetcher->reader;
     404             : }
     405             : 
     406             : /*
     407             :  * Update the statistics visible in the pg_stat_recovery_prefetch view.
     408             :  */
     409             : void
     410     1993372 : XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
     411             : {
     412             :     uint32      io_depth;
     413             :     uint32      completed;
     414             :     int64       wal_distance;
     415             : 
     416             : 
     417             :     /* How far ahead of replay are we now? */
     418     1993372 :     if (prefetcher->reader->decode_queue_tail)
     419             :     {
     420     1986796 :         wal_distance =
     421     1986796 :             prefetcher->reader->decode_queue_tail->lsn -
     422     1986796 :             prefetcher->reader->decode_queue_head->lsn;
     423             :     }
     424             :     else
     425             :     {
     426        6576 :         wal_distance = 0;
     427             :     }
     428             : 
     429             :     /* How many IOs are currently in flight and completed? */
     430     1993372 :     io_depth = lrq_inflight(prefetcher->streaming_read);
     431     1993372 :     completed = lrq_completed(prefetcher->streaming_read);
     432             : 
     433             :     /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
     434     1993372 :     SharedStats->io_depth = io_depth;
     435     1993372 :     SharedStats->block_distance = io_depth + completed;
     436     1993372 :     SharedStats->wal_distance = wal_distance;
     437             : 
     438     1993372 :     prefetcher->next_stats_shm_lsn =
     439     1993372 :         prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
     440     1993372 : }
     441             : 
     442             : /*
     443             :  * A callback that examines the next block reference in the WAL, and possibly
     444             :  * starts an IO so that a later read will be fast.
     445             :  *
     446             :  * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
     447             :  *
     448             :  * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
     449             :  * that isn't in the buffer pool, and the kernel has been asked to start
     450             :  * reading it to make a future read system call faster. An LSN is written to
     451             :  * *lsn, and the I/O will be considered to have completed once that LSN is
     452             :  * replayed.
     453             :  *
     454             :  * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
     455             :  * that it was already in the buffer pool, or we decided for various reasons
     456             :  * not to prefetch.
     457             :  */
     458             : static LsnReadQueueNextStatus
     459     5821992 : XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
     460             : {
     461     5821992 :     XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
     462     5821992 :     XLogReaderState *reader = prefetcher->reader;
     463     5821992 :     XLogRecPtr  replaying_lsn = reader->ReadRecPtr;
     464             : 
     465             :     /*
     466             :      * We keep track of the record and block we're up to between calls with
     467             :      * prefetcher->record and prefetcher->next_block_id.
     468             :      */
     469             :     for (;;)
     470     5319382 :     {
     471             :         DecodedXLogRecord *record;
     472             : 
     473             :         /* Try to read a new future record, if we don't already have one. */
     474    11141374 :         if (prefetcher->record == NULL)
     475             :         {
     476             :             bool        nonblocking;
     477             : 
     478             :             /*
     479             :              * If there are already records or an error queued up that could
     480             :              * be replayed, we don't want to block here.  Otherwise, it's OK
     481             :              * to block waiting for more data: presumably the caller has
     482             :              * nothing else to do.
     483             :              */
     484     5521578 :             nonblocking = XLogReaderHasQueuedRecordOrError(reader);
     485             : 
     486             :             /* Readahead is disabled until we replay past a certain point. */
     487     5521578 :             if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
     488      191466 :                 return LRQ_NEXT_AGAIN;
     489             : 
     490     5330112 :             record = XLogReadAhead(prefetcher->reader, nonblocking);
     491     5330012 :             if (record == NULL)
     492             :             {
     493             :                 /*
     494             :                  * We can't read any more, due to an error or lack of data in
     495             :                  * nonblocking mode.  Don't try to read ahead again until
     496             :                  * we've replayed everything already decoded.
     497             :                  */
     498        7088 :                 if (nonblocking && prefetcher->reader->decode_queue_tail)
     499        6786 :                     prefetcher->no_readahead_until =
     500        6786 :                         prefetcher->reader->decode_queue_tail->lsn;
     501             : 
     502        7088 :                 return LRQ_NEXT_AGAIN;
     503             :             }
     504             : 
     505             :             /*
     506             :              * If prefetching is disabled, we don't need to analyze the record
     507             :              * or issue any prefetches.  We just need to cause one record to
     508             :              * be decoded.
     509             :              */
     510     5322924 :             if (!RecoveryPrefetchEnabled())
     511             :             {
     512           0 :                 *lsn = InvalidXLogRecPtr;
     513           0 :                 return LRQ_NEXT_NO_IO;
     514             :             }
     515             : 
     516             :             /* We have a new record to process. */
     517     5322924 :             prefetcher->record = record;
     518     5322924 :             prefetcher->next_block_id = 0;
     519             :         }
     520             :         else
     521             :         {
     522             :             /* Continue to process from last call, or last loop. */
     523     5619796 :             record = prefetcher->record;
     524             :         }
     525             : 
     526             :         /*
     527             :          * Check for operations that require us to filter out block ranges, or
     528             :          * pause readahead completely.
     529             :          */
     530    10942720 :         if (replaying_lsn < record->lsn)
     531             :         {
     532    10942720 :             uint8       rmid = record->header.xl_rmid;
     533    10942720 :             uint8       record_type = record->header.xl_info & ~XLR_INFO_MASK;
     534             : 
     535    10942720 :             if (rmid == RM_XLOG_ID)
     536             :             {
     537      168990 :                 if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
     538             :                     record_type == XLOG_END_OF_RECOVERY)
     539             :                 {
     540             :                     /*
     541             :                      * These records might change the TLI.  Avoid potential
     542             :                      * bugs if we were to allow "read TLI" and "replay TLI" to
     543             :                      * differ without more analysis.
     544             :                      */
     545        2802 :                     prefetcher->no_readahead_until = record->lsn;
     546             : 
     547             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     548             :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     549             :                          "suppressing all readahead until %X/%X is replayed due to possible TLI change",
     550             :                          LSN_FORMAT_ARGS(record->lsn));
     551             : #endif
     552             : 
     553             :                     /* Fall through so we move past this record. */
     554             :                 }
     555             :             }
     556    10773730 :             else if (rmid == RM_DBASE_ID)
     557             :             {
     558             :                 /*
     559             :                  * When databases are created with the file-copy strategy,
     560             :                  * there are no WAL records to tell us about the creation of
     561             :                  * individual relations.
     562             :                  */
     563          78 :                 if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
     564             :                 {
     565           8 :                     xl_dbase_create_file_copy_rec *xlrec =
     566             :                         (xl_dbase_create_file_copy_rec *) record->main_data;
     567           8 :                     RelFileLocator rlocator =
     568           8 :                     {InvalidOid, xlrec->db_id, InvalidRelFileNumber};
     569             : 
     570             :                     /*
     571             :                      * Don't try to prefetch anything in this database until
     572             :                      * it has been created, or we might confuse the blocks of
     573             :                      * different generations, if a database OID or
     574             :                      * relfilenumber is reused.  It's also more efficient than
     575             :                      * discovering that relations don't exist on disk yet with
     576             :                      * ENOENT errors.
     577             :                      */
     578           8 :                     XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
     579             : 
     580             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     581             :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     582             :                          "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
     583             :                          rlocator.dbOid,
     584             :                          LSN_FORMAT_ARGS(record->lsn));
     585             : #endif
     586             :                 }
     587             :             }
     588    10773652 :             else if (rmid == RM_SMGR_ID)
     589             :             {
     590       31026 :                 if (record_type == XLOG_SMGR_CREATE)
     591             :                 {
     592       30932 :                     xl_smgr_create *xlrec = (xl_smgr_create *)
     593             :                         record->main_data;
     594             : 
     595       30932 :                     if (xlrec->forkNum == MAIN_FORKNUM)
     596             :                     {
     597             :                         /*
     598             :                          * Don't prefetch anything for this whole relation
     599             :                          * until it has been created.  Otherwise we might
     600             :                          * confuse the blocks of different generations, if a
     601             :                          * relfilenumber is reused.  This also avoids the need
     602             :                          * to discover the problem via extra syscalls that
     603             :                          * report ENOENT.
     604             :                          */
     605       27500 :                         XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
     606             :                                                 record->lsn);
     607             : 
     608             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     609             :                         elog(XLOGPREFETCHER_DEBUG_LEVEL,
     610             :                              "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
     611             :                              xlrec->rlocator.spcOid,
     612             :                              xlrec->rlocator.dbOid,
     613             :                              xlrec->rlocator.relNumber,
     614             :                              LSN_FORMAT_ARGS(record->lsn));
     615             : #endif
     616             :                     }
     617             :                 }
     618          94 :                 else if (record_type == XLOG_SMGR_TRUNCATE)
     619             :                 {
     620          94 :                     xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
     621             :                         record->main_data;
     622             : 
     623             :                     /*
     624             :                      * Don't consider prefetching anything in the truncated
     625             :                      * range until the truncation has been performed.
     626             :                      */
     627          94 :                     XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
     628             :                                             xlrec->blkno,
     629             :                                             record->lsn);
     630             : 
     631             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     632             :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     633             :                          "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
     634             :                          xlrec->rlocator.spcOid,
     635             :                          xlrec->rlocator.dbOid,
     636             :                          xlrec->rlocator.relNumber,
     637             :                          xlrec->blkno,
     638             :                          LSN_FORMAT_ARGS(record->lsn));
     639             : #endif
     640             :                 }
     641             :             }
     642             :         }
     643             : 
     644             :         /* Scan the block references, starting where we left off last time. */
     645    10947342 :         while (prefetcher->next_block_id <= record->max_block_id)
     646             :         {
     647     5624418 :             int         block_id = prefetcher->next_block_id++;
     648     5624418 :             DecodedBkpBlock *block = &record->blocks[block_id];
     649             :             SMgrRelation reln;
     650             :             PrefetchBufferResult result;
     651             : 
     652     5624418 :             if (!block->in_use)
     653        4180 :                 continue;
     654             : 
     655             :             Assert(!BufferIsValid(block->prefetch_buffer));
     656             : 
     657             :             /*
     658             :              * Record the LSN of this record.  When it's replayed,
     659             :              * LsnReadQueue will consider any IOs submitted for earlier LSNs
     660             :              * to be finished.
     661             :              */
     662     5620238 :             *lsn = record->lsn;
     663             : 
     664             :             /* We don't try to prefetch anything but the main fork for now. */
     665     5620238 :             if (block->forknum != MAIN_FORKNUM)
     666             :             {
     667     5619796 :                 return LRQ_NEXT_NO_IO;
     668             :             }
     669             : 
     670             :             /*
     671             :              * If there is a full page image attached, we won't be reading the
     672             :              * page, so don't bother trying to prefetch.
     673             :              */
     674     5602120 :             if (block->has_image)
     675             :             {
     676     4575420 :                 XLogPrefetchIncrement(&SharedStats->skip_fpw);
     677     4575420 :                 return LRQ_NEXT_NO_IO;
     678             :             }
     679             : 
     680             :             /* There is no point in reading a page that will be zeroed. */
     681     1026700 :             if (block->flags & BKPBLOCK_WILL_INIT)
     682             :             {
     683       12720 :                 XLogPrefetchIncrement(&SharedStats->skip_init);
     684       12720 :                 return LRQ_NEXT_NO_IO;
     685             :             }
     686             : 
     687             :             /* Should we skip prefetching this block due to a filter? */
     688     1013980 :             if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
     689             :             {
     690       90664 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     691       90664 :                 return LRQ_NEXT_NO_IO;
     692             :             }
     693             : 
     694             :             /* There is no point in repeatedly prefetching the same block. */
     695     2359512 :             for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
     696             :             {
     697     2334564 :                 if (block->blkno == prefetcher->recent_block[i] &&
     698      930540 :                     RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
     699             :                 {
     700             :                     /*
     701             :                      * XXX If we also remembered where it was, we could set
     702             :                      * recent_buffer so that recovery could skip smgropen()
     703             :                      * and a buffer table lookup.
     704             :                      */
     705      898368 :                     XLogPrefetchIncrement(&SharedStats->skip_rep);
     706      898368 :                     return LRQ_NEXT_NO_IO;
     707             :                 }
     708             :             }
     709       24948 :             prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
     710       24948 :             prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
     711       24948 :             prefetcher->recent_idx =
     712       24948 :                 (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
     713             : 
     714             :             /*
     715             :              * We could try to have a fast path for repeated references to the
     716             :              * same relation (with some scheme to handle invalidations
     717             :              * safely), but for now we'll call smgropen() every time.
     718             :              */
     719       24948 :             reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);
     720             : 
     721             :             /*
     722             :              * If the relation file doesn't exist on disk, for example because
     723             :              * we're replaying after a crash and the file will be created and
     724             :              * then unlinked by WAL that hasn't been replayed yet, suppress
     725             :              * further prefetching in the relation until this record is
     726             :              * replayed.
     727             :              */
     728       24948 :             if (!smgrexists(reln, MAIN_FORKNUM))
     729             :             {
     730             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     731             :                 elog(XLOGPREFETCHER_DEBUG_LEVEL,
     732             :                      "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
     733             :                      reln->smgr_rlocator.locator.spcOid,
     734             :                      reln->smgr_rlocator.locator.dbOid,
     735             :                      reln->smgr_rlocator.locator.relNumber,
     736             :                      LSN_FORMAT_ARGS(record->lsn));
     737             : #endif
     738          12 :                 XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
     739             :                                         record->lsn);
     740          12 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     741          12 :                 return LRQ_NEXT_NO_IO;
     742             :             }
     743             : 
     744             :             /*
     745             :              * If the relation isn't big enough to contain the referenced
     746             :              * block yet, suppress prefetching of this block and higher until
     747             :              * this record is replayed.
     748             :              */
     749       24936 :             if (block->blkno >= smgrnblocks(reln, block->forknum))
     750             :             {
     751             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     752             :                 elog(XLOGPREFETCHER_DEBUG_LEVEL,
     753             :                      "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
     754             :                      reln->smgr_rlocator.locator.spcOid,
     755             :                      reln->smgr_rlocator.locator.dbOid,
     756             :                      reln->smgr_rlocator.locator.relNumber,
     757             :                      block->blkno,
     758             :                      LSN_FORMAT_ARGS(record->lsn));
     759             : #endif
     760        2934 :                 XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
     761             :                                         record->lsn);
     762        2934 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     763        2934 :                 return LRQ_NEXT_NO_IO;
     764             :             }
     765             : 
     766             :             /* Try to initiate prefetching. */
     767       22002 :             result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
     768       22002 :             if (BufferIsValid(result.recent_buffer))
     769             :             {
     770             :                 /* Cache hit, nothing to do. */
     771        9400 :                 XLogPrefetchIncrement(&SharedStats->hit);
     772        9400 :                 block->prefetch_buffer = result.recent_buffer;
     773        9400 :                 return LRQ_NEXT_NO_IO;
     774             :             }
     775       12602 :             else if (result.initiated_io)
     776             :             {
     777             :                 /* Cache miss, I/O (presumably) started. */
     778       12160 :                 XLogPrefetchIncrement(&SharedStats->prefetch);
     779       12160 :                 block->prefetch_buffer = InvalidBuffer;
     780       12160 :                 return LRQ_NEXT_IO;
     781             :             }
     782         442 :             else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
     783             :             {
     784             :                 /*
     785             :                  * This shouldn't be possible, because we already determined
     786             :                  * that the relation exists on disk and is big enough.
     787             :                  * Something is wrong with the cache invalidation for
     788             :                  * smgrexists(), smgrnblocks(), or the file was unlinked or
     789             :                  * truncated beneath our feet?
     790             :                  */
     791           0 :                 elog(ERROR,
     792             :                      "could not prefetch relation %u/%u/%u block %u",
     793             :                      reln->smgr_rlocator.locator.spcOid,
     794             :                      reln->smgr_rlocator.locator.dbOid,
     795             :                      reln->smgr_rlocator.locator.relNumber,
     796             :                      block->blkno);
     797             :             }
     798             :         }
     799             : 
     800             :         /*
     801             :          * Several callsites need to be able to read exactly one record
     802             :          * without any internal readahead.  Examples: xlog.c reading
     803             :          * checkpoint records with emode set to PANIC, which might otherwise
     804             :          * cause XLogPageRead() to panic on some future page, and xlog.c
     805             :          * determining where to start writing WAL next, which depends on the
     806             :          * contents of the reader's internal buffer after reading one record.
     807             :          * Therefore, don't even think about prefetching until the first
     808             :          * record after XLogPrefetcherBeginRead() has been consumed.
     809             :          */
     810     5322924 :         if (prefetcher->reader->decode_queue_tail &&
     811     5322922 :             prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
     812        3542 :             return LRQ_NEXT_AGAIN;
     813             : 
     814             :         /* Advance to the next record. */
     815     5319382 :         prefetcher->record = NULL;
     816             :     }
     817             :     pg_unreachable();
     818             : }
     819             : 
     820             : /*
     821             :  * Expose statistics about recovery prefetching.
                     :  *
                     :  * Emits exactly one row of PG_STAT_GET_RECOVERY_PREFETCH_COLS (10) columns
                     :  * into rsinfo->setResult, after calling InitMaterializedSRF().  No column
                     :  * is ever NULL.  Column order, as read from SharedStats:
                     :  *
                     :  *   0 reset_time        5 skip_fpw
                     :  *   1 prefetch          6 skip_rep
                     :  *   2 hit               7 wal_distance
                     :  *   3 skip_init         8 block_distance
                     :  *   4 skip_new          9 io_depth
                     :  *
                     :  * Columns 0-6 are fetched with pg_atomic_read_u64(); column 0 is converted
                     :  * with TimestampTzGetDatum() and columns 1-6 with Int64GetDatum().  Columns
                     :  * 7-9 are plain member reads converted with Int32GetDatum().
                     :  *
                     :  * The function's own return value is always (Datum) 0; the row is
                     :  * delivered through the tuplestore.
     822             :  */
     823             : Datum
     824          12 : pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
     825             : {
     826             : #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
     827          12 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     828             :     Datum       values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
     829             :     bool        nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
     830             : 
     831          12 :     InitMaterializedSRF(fcinfo, 0);
     832             : 
     833         132 :     for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
     834         120 :         nulls[i] = false;
     835             : 
     836          12 :     values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
     837          12 :     values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
     838          12 :     values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
     839          12 :     values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
     840          12 :     values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
     841          12 :     values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
     842          12 :     values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
     843          12 :     values[7] = Int32GetDatum(SharedStats->wal_distance);
     844          12 :     values[8] = Int32GetDatum(SharedStats->block_distance);
     845          12 :     values[9] = Int32GetDatum(SharedStats->io_depth);
     846          12 :     tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
     847             : 
     848          12 :     return (Datum) 0;
     849             : }
     850             : 
     851             : /*
     852             :  * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
     853             :  * has been replayed.
                     :  *
                     :  * The filter is stored in prefetcher->filter_table, keyed by 'rlocator',
                     :  * and linked at the head of prefetcher->filter_queue.  There is at most one
                     :  * filter per key: adding a second one for the same key replaces the stored
                     :  * LSN with 'lsn', moves the entry back to the head of the queue, and keeps
                     :  * the smaller of the old and new block numbers.
                     :  *
                     :  * Callers in this file pass blockno 0 to cover a whole relation, and a key
                     :  * of {InvalidOid, db_id, InvalidRelFileNumber} to cover a whole database
                     :  * (XLogPrefetcherIsFiltered() probes for that same key shape).
     854             :  */
     855             : static inline void
     856       30548 : XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
     857             :                         BlockNumber blockno, XLogRecPtr lsn)
     858             : {
     859             :     XLogPrefetcherFilter *filter;
     860             :     bool        found;
     861             : 
     862       30548 :     filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
     863       30548 :     if (!found)
     864             :     {
     865             :         /*
     866             :          * Don't allow any prefetching of this block or higher until replayed.
     867             :          */
     868       30540 :         filter->filter_until_replayed = lsn;
     869       30540 :         filter->filter_from_block = blockno;
     870       30540 :         dlist_push_head(&prefetcher->filter_queue, &filter->link);
     871             :     }
     872             :     else
     873             :     {
     874             :         /*
     875             :          * We were already filtering this rlocator.  Extend the filter's
     876             :          * lifetime to cover this WAL record, but leave the lower of the block
     877             :          * numbers there because we don't want to have to track individual
     878             :          * blocks.
                     :          *
                     :          * The entry is unlinked and pushed at the head again because
                     :          * XLogPrefetcherCompleteFilters() consumes from the tail and stops
                     :          * at the first filter that is not yet replayed; presumably 'lsn'
                     :          * never decreases between calls, which keeps the queue ordered.
     879             :          */
     880           8 :         filter->filter_until_replayed = lsn;
     881           8 :         dlist_delete(&filter->link);
     882           8 :         dlist_push_head(&prefetcher->filter_queue, &filter->link);
     883           8 :         filter->filter_from_block = Min(filter->filter_from_block, blockno);
     884             :     }
     885       30548 : }
     886             : 
     887             : /*
     888             :  * Have we replayed any records that caused us to begin filtering a block
     889             :  * range?  That means that relations should have been created, extended or
     890             :  * dropped as required, so we can stop filtering out accesses to a given
     891             :  * relfilenumber.
                     :  *
                     :  * Filters are examined starting at the tail of prefetcher->filter_queue
                     :  * (XLogPrefetcherAddFilter() pushes at the head, so the tail holds the
                     :  * entry that has been queued longest).  A filter is removed from both the
                     :  * queue and filter_table only when its filter_until_replayed is strictly
                     :  * less than 'replaying_lsn'; the scan stops at the first filter for which
                     :  * that is not true, without looking at the entries in front of it.
                     :  *
                     :  * NOTE(review): the filter pointer itself is passed as the HASH_REMOVE
                     :  * key, so this presumably relies on the RelFileLocator key being the
                     :  * first member of XLogPrefetcherFilter -- the struct definition is not
                     :  * visible here, confirm.
     892             :  */
     893             : static inline void
     894     5323406 : XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
     895             : {
     896     5353938 :     while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
     897             :     {
     898      734958 :         XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
     899             :                                                           link,
     900             :                                                           &prefetcher->filter_queue);
     901             : 
     902      734958 :         if (filter->filter_until_replayed >= replaying_lsn)
     903      704426 :             break;
     904             : 
     905       30532 :         dlist_delete(&filter->link);
     906       30532 :         hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
     907             :     }
     908     5323406 : }
     909             : 
     910             : /*
     911             :  * Check if a given block should be skipped due to a filter.
                     :  *
                     :  * Returns true when either of two lookups in prefetcher->filter_table
                     :  * matches:
                     :  *
                     :  *   1. a filter keyed by exactly 'rlocator' whose filter_from_block is
                     :  *      <= 'blockno';
                     :  *   2. a filter keyed by the same dbOid with spcOid set to InvalidOid and
                     :  *      relNumber set to InvalidRelFileNumber, regardless of block number.
                     :  *
                     :  * Returns false otherwise, including whenever filter_queue is empty (no
                     :  * hash lookup is made in that case).  'rlocator' is passed by value, so
                     :  * overwriting its fields for the second lookup does not affect the caller.
                     :  *
                     :  * NOTE(review): the "(whole database)" debug message is built after
                     :  * spcOid and relNumber have been overwritten, so it reports the invalid
                     :  * values rather than the relation that was asked about.  This only matters
                     :  * when XLOGPREFETCHER_DEBUG_LEVEL is defined.
     912             :  */
     913             : static inline bool
     914     1013980 : XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
     915             :                          BlockNumber blockno)
     916             : {
     917             :     /*
     918             :      * Test for empty queue first, because we expect it to be empty most of
     919             :      * the time and we can avoid the hash table lookup in that case.
     920             :      */
     921     1013980 :     if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
     922             :     {
     923             :         XLogPrefetcherFilter *filter;
     924             : 
     925             :         /* See if the block range is filtered. */
     926      120724 :         filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
     927      120724 :         if (filter && filter->filter_from_block <= blockno)
     928             :         {
     929             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     930             :             elog(XLOGPREFETCHER_DEBUG_LEVEL,
     931             :                  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
     932             :                  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
     933             :                  LSN_FORMAT_ARGS(filter->filter_until_replayed),
     934             :                  filter->filter_from_block);
     935             : #endif
     936       90664 :             return true;
     937             :         }
     938             : 
     939             :         /* See if the whole database is filtered. */
     940       30060 :         rlocator.relNumber = InvalidRelFileNumber;
     941       30060 :         rlocator.spcOid = InvalidOid;
     942       30060 :         filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
     943       30060 :         if (filter)
     944             :         {
     945             : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     946             :             elog(XLOGPREFETCHER_DEBUG_LEVEL,
     947             :                  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
     948             :                  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
     949             :                  LSN_FORMAT_ARGS(filter->filter_until_replayed));
     950             : #endif
     951           0 :             return true;
     952             :         }
     953             :     }
     954             : 
     955      923316 :     return false;
     956             : }
     957             : 
     958             : /*
     959             :  * A wrapper for XLogBeginRead() that also resets the prefetcher.
                     :  *
                     :  * 'recPtr' is handed to XLogBeginRead() unchanged and is also remembered
                     :  * in prefetcher->begin_ptr.  XLogPrefetcherNextBlock() compares begin_ptr
                     :  * with the LSN at the tail of the reader's decode queue and returns
                     :  * LRQ_NEXT_AGAIN on a match, which is how readahead is held back until the
                     :  * first record has been consumed.  no_readahead_until is cleared to 0.
                     :  *
                     :  * Decrementing reconfigure_count is presumably meant to make it differ
                     :  * from XLogPrefetchReconfigureCount, so that the next call to
                     :  * XLogPrefetcherReadRecord() frees and reallocates streaming_read.
     960             :  */
     961             : void
     962        3542 : XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
     963             : {
     964             :     /* This will forget about any in-flight IO. */
     965        3542 :     prefetcher->reconfigure_count--;
     966             : 
     967             :     /* Book-keeping to avoid readahead on first read. */
     968        3542 :     prefetcher->begin_ptr = recPtr;
     969             : 
     970        3542 :     prefetcher->no_readahead_until = 0;
     971             : 
     972             :     /* This will forget about any queued up records in the decoder. */
     973        3542 :     XLogBeginRead(prefetcher->reader, recPtr);
     974        3542 : }
     975             : 
     976             : /*
     977             :  * A wrapper for XLogReadRecord() that provides the same interface, but also
     978             :  * tries to initiate I/O for blocks referenced in future WAL records.
                     :  *
                     :  * Returns a pointer to the header of the next decoded record, or NULL when
                     :  * XLogNextRecord() yields no record.  'errmsg' is passed through to
                     :  * XLogNextRecord() untouched.
                     :  *
                     :  * Order of operations:
                     :  *   1. rebuild streaming_read if reconfigure_count differs from
                     :  *      XLogPrefetchReconfigureCount;
                     :  *   2. release the previously returned record and learn the LSN replayed
                     :  *      up to;
                     :  *   3. retire filters and completed IOs for that LSN;
                     :  *   4. start prefetching if the reader has no queued record or error;
                     :  *   5. fetch the next record;
                     :  *   6. recompute statistics once record->lsn has reached
                     :  *      next_stats_shm_lsn.
     979             :  */
     980             : XLogRecord *
     981     5323406 : XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
     982             : {
     983             :     DecodedXLogRecord *record;
     984             :     XLogRecPtr  replayed_up_to;
     985             : 
     986             :     /*
     987             :      * See if it's time to reset the prefetching machinery, because a relevant
     988             :      * GUC was changed.
                     :      *
                     :      * Any existing queue is freed and a new one is always allocated.  When
                     :      * RecoveryPrefetchEnabled() is false the new queue gets max_distance
                     :      * and max_inflight of 1; otherwise max_inflight is
                     :      * maintenance_io_concurrency and max_distance is that value times
                     :      * XLOGPREFETCHER_DISTANCE_MULTIPLIER.
     989             :      */
     990     5323406 :     if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
     991             :     {
     992             :         uint32      max_distance;
     993             :         uint32      max_inflight;
     994             : 
     995        3564 :         if (prefetcher->streaming_read)
     996        1914 :             lrq_free(prefetcher->streaming_read);
     997             : 
     998        3564 :         if (RecoveryPrefetchEnabled())
     999             :         {
    1000             :             Assert(maintenance_io_concurrency > 0);
    1001        3564 :             max_inflight = maintenance_io_concurrency;
    1002        3564 :             max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
    1003             :         }
    1004             :         else
    1005             :         {
    1006           0 :             max_inflight = 1;
    1007           0 :             max_distance = 1;
    1008             :         }
    1009             : 
    1010        3564 :         prefetcher->streaming_read = lrq_alloc(max_distance,
    1011             :                                                max_inflight,
    1012             :                                                (uintptr_t) prefetcher,
    1013             :                                                XLogPrefetcherNextBlock);
    1014             : 
    1015        3564 :         prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
    1016             :     }
    1017             : 
    1018             :     /*
    1019             :      * Release last returned record, if there is one, as it's now been
    1020             :      * replayed.
    1021             :      */
    1022     5323406 :     replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
    1023             : 
    1024             :     /*
    1025             :      * Can we drop any filters yet?  If we were waiting for a relation to be
    1026             :      * created or extended, it is now OK to access blocks in the covered
    1027             :      * range.
    1028             :      */
    1029     5323406 :     XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
    1030             : 
    1031             :     /*
    1032             :      * All IO initiated by earlier WAL is now completed.  This might trigger
    1033             :      * further prefetching.
    1034             :      */
    1035     5323406 :     lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
    1036             : 
    1037             :     /*
    1038             :      * If there's nothing queued yet, then start prefetching to cause at least
    1039             :      * one record to be queued.
    1040             :      */
    1041     5323306 :     if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
    1042             :     {
    1043             :         Assert(lrq_inflight(prefetcher->streaming_read) == 0);
    1044             :         Assert(lrq_completed(prefetcher->streaming_read) == 0);
    1045          34 :         lrq_prefetch(prefetcher->streaming_read);
    1046             :     }
    1047             : 
    1048             :     /* Read the next record. */
    1049     5323306 :     record = XLogNextRecord(prefetcher->reader, errmsg);
    1050     5323306 :     if (!record)
    1051         472 :         return NULL;
    1052             : 
    1053             :     /*
    1054             :      * The record we just got is the "current" one, for the benefit of the
    1055             :      * XLogRecXXX() macros.
    1056             :      */
    1057             :     Assert(record == prefetcher->reader->record);
    1058             : 
    1059             :     /*
    1060             :      * If maintenance_io_concurrency is set very low, we might have started
    1061             :      * prefetching some but not all of the blocks referenced in the record
    1062             :      * we're about to return.  Forget about the rest of the blocks in this
    1063             :      * record by dropping the prefetcher's reference to it.
    1064             :      */
    1065     5322834 :     if (record == prefetcher->record)
    1066        3542 :         prefetcher->record = NULL;
    1067             : 
    1068             :     /*
    1069             :      * See if it's time to compute some statistics, because enough WAL has
    1070             :      * been processed.
    1071             :      */
    1072     5322834 :     if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
    1073     1985252 :         XLogPrefetcherComputeStats(prefetcher);
    1074             : 
    1075             :     Assert(record == prefetcher->reader->record);
    1076             : 
    1077     5322834 :     return &record->header;
    1078             : }
    1079             : /*
                     :  * Check hook for the recovery_prefetch setting.
                     :  *
                     :  * When the build lacks USE_PREFETCH, a new value of RECOVERY_PREFETCH_ON
                     :  * is rejected: an error detail is registered with GUC_check_errdetail()
                     :  * and false is returned.  Every other value is accepted, and when
                     :  * USE_PREFETCH is defined every value is accepted.  'extra' and 'source'
                     :  * are not used.
                     :  */
    1080             : bool
    1081        1982 : check_recovery_prefetch(int *new_value, void **extra, GucSource source)
    1082             : {
    1083             : #ifndef USE_PREFETCH
    1084             :     if (*new_value == RECOVERY_PREFETCH_ON)
    1085             :     {
    1086             :         GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
    1087             :         return false;
    1088             :     }
    1089             : #endif
    1090             : 
    1091        1982 :     return true;
    1092             : }
    1093             : /*
                     :  * Assign hook for the recovery_prefetch setting.
                     :  *
                     :  * Stores 'new_value' in recovery_prefetch and, only when running in the
                     :  * startup process (AmStartupProcess()), calls XLogPrefetchReconfigure().
                     :  * That call presumably changes XLogPrefetchReconfigureCount so that
                     :  * XLogPrefetcherReadRecord() rebuilds its queue -- its body is not visible
                     :  * here, confirm.  'extra' is not used.
                     :  */
    1094             : void
    1095        1982 : assign_recovery_prefetch(int new_value, void *extra)
    1096             : {
    1097             :     /* Reconfigure prefetching, because a setting it depends on changed. */
    1098        1982 :     recovery_prefetch = new_value;
    1099        1982 :     if (AmStartupProcess())
    1100           0 :         XLogPrefetchReconfigure();
    1101        1982 : }

Generated by: LCOV version 1.14