LCOV - code coverage report
Current view: top level - src/backend/access/transam - xlogprefetcher.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 97.5 % 285 278
Test Date: 2026-03-01 21:15:06 Functions: 100.0 % 24 24
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * xlogprefetcher.c
       4              :  *      Prefetching support for recovery.
       5              :  *
       6              :  * Portions Copyright (c) 2022-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *      src/backend/access/transam/xlogprefetcher.c
      12              :  *
      13              :  * This module provides a drop-in replacement for an XLogReader that tries to
      14              :  * minimize I/O stalls by looking ahead in the WAL.  If blocks that will be
      15              :  * accessed in the near future are not already in the buffer pool, it initiates
      16              :  * I/Os that might complete before the caller eventually needs the data.  When
      17              :  * referenced blocks are found in the buffer pool already, the buffer is
      18              :  * recorded in the decoded record so that XLogReadBufferForRedo() can try to
      19              :  * avoid a second buffer mapping table lookup.
      20              :  *
      21              :  * Currently, only the main fork is considered for prefetching.  Currently,
      22              :  * prefetching is only effective on systems where PrefetchBuffer() does
      23              :  * something useful (mainly Linux).
      24              :  *
      25              :  *-------------------------------------------------------------------------
      26              :  */
      27              : 
      28              : #include "postgres.h"
      29              : 
      30              : #include "access/xlogprefetcher.h"
      31              : #include "access/xlogreader.h"
      32              : #include "catalog/pg_control.h"
      33              : #include "catalog/storage_xlog.h"
      34              : #include "commands/dbcommands_xlog.h"
      35              : #include "funcapi.h"
      36              : #include "miscadmin.h"
      37              : #include "port/atomics.h"
      38              : #include "storage/bufmgr.h"
      39              : #include "storage/shmem.h"
      40              : #include "storage/smgr.h"
      41              : #include "utils/fmgrprotos.h"
      42              : #include "utils/guc_hooks.h"
      43              : #include "utils/hsearch.h"
      44              : #include "utils/timestamp.h"
      45              : 
      46              : /*
      47              :  * Every time we process this much WAL, we'll update the values in
      48              :  * pg_stat_recovery_prefetch.
      49              :  */
      50              : #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
      51              : 
      52              : /*
      53              :  * To detect repeated access to the same block and skip useless extra system
      54              :  * calls, we remember a small window of recently prefetched blocks.
      55              :  */
      56              : #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
      57              : 
      58              : /*
      59              :  * When maintenance_io_concurrency is not saturated, we're prepared to look
      60              :  * ahead up to N times that number of block references.
      61              :  */
      62              : #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
      63              : 
      64              : /* Define to log internal debugging messages. */
      65              : /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
      66              : 
      67              : /* GUCs */
      68              : int         recovery_prefetch = RECOVERY_PREFETCH_TRY;
      69              : 
      70              : #ifdef USE_PREFETCH
      71              : #define RecoveryPrefetchEnabled() \
      72              :         (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
      73              :          maintenance_io_concurrency > 0)
      74              : #else
      75              : #define RecoveryPrefetchEnabled() false
      76              : #endif
      77              : 
      78              : static int  XLogPrefetchReconfigureCount = 0;
      79              : 
      80              : /*
      81              :  * Enum used to report whether an IO should be started.
      82              :  */
      83              : typedef enum
      84              : {
      85              :     LRQ_NEXT_NO_IO,
      86              :     LRQ_NEXT_IO,
      87              :     LRQ_NEXT_AGAIN,
      88              : } LsnReadQueueNextStatus;
      89              : 
      90              : /*
      91              :  * Type of callback that can decide which block to prefetch next.  For now
      92              :  * there is only one.
      93              :  */
      94              : typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
      95              :                                                        XLogRecPtr *lsn);
      96              : 
      97              : /*
      98              :  * A simple circular queue of LSNs, using to control the number of
      99              :  * (potentially) inflight IOs.  This stands in for a later more general IO
     100              :  * control mechanism, which is why it has the apparently unnecessary
     101              :  * indirection through a function pointer.
     102              :  */
     103              : typedef struct LsnReadQueue
     104              : {
     105              :     LsnReadQueueNextFun next;
     106              :     uintptr_t   lrq_private;
     107              :     uint32      max_inflight;
     108              :     uint32      inflight;
     109              :     uint32      completed;
     110              :     uint32      head;
     111              :     uint32      tail;
     112              :     uint32      size;
     113              :     struct
     114              :     {
     115              :         bool        io;
     116              :         XLogRecPtr  lsn;
     117              :     }           queue[FLEXIBLE_ARRAY_MEMBER];
     118              : } LsnReadQueue;
     119              : 
     120              : /*
     121              :  * A prefetcher.  This is a mechanism that wraps an XLogReader, prefetching
     122              :  * blocks that will be soon be referenced, to try to avoid IO stalls.
     123              :  */
     124              : struct XLogPrefetcher
     125              : {
     126              :     /* WAL reader and current reading state. */
     127              :     XLogReaderState *reader;
     128              :     DecodedXLogRecord *record;
     129              :     int         next_block_id;
     130              : 
     131              :     /* When to publish stats. */
     132              :     XLogRecPtr  next_stats_shm_lsn;
     133              : 
     134              :     /* Book-keeping to avoid accessing blocks that don't exist yet. */
     135              :     HTAB       *filter_table;
     136              :     dlist_head  filter_queue;
     137              : 
     138              :     /* Book-keeping to avoid repeat prefetches. */
     139              :     RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
     140              :     BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
     141              :     int         recent_idx;
     142              : 
     143              :     /* Book-keeping to disable prefetching temporarily. */
     144              :     XLogRecPtr  no_readahead_until;
     145              : 
     146              :     /* IO depth manager. */
     147              :     LsnReadQueue *streaming_read;
     148              : 
     149              :     XLogRecPtr  begin_ptr;
     150              : 
     151              :     int         reconfigure_count;
     152              : };
     153              : 
     154              : /*
     155              :  * A temporary filter used to track block ranges that haven't been created
     156              :  * yet, whole relations that haven't been created yet, and whole relations
     157              :  * that (we assume) have already been dropped, or will be created by bulk WAL
     158              :  * operators.
     159              :  */
     160              : typedef struct XLogPrefetcherFilter
     161              : {
     162              :     RelFileLocator rlocator;
     163              :     XLogRecPtr  filter_until_replayed;
     164              :     BlockNumber filter_from_block;
     165              :     dlist_node  link;
     166              : } XLogPrefetcherFilter;
     167              : 
     168              : /*
     169              :  * Counters exposed in shared memory for pg_stat_recovery_prefetch.
     170              :  */
     171              : typedef struct XLogPrefetchStats
     172              : {
     173              :     pg_atomic_uint64 reset_time;    /* Time of last reset. */
     174              :     pg_atomic_uint64 prefetch;  /* Prefetches initiated. */
     175              :     pg_atomic_uint64 hit;       /* Blocks already in cache. */
     176              :     pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
     177              :     pg_atomic_uint64 skip_new;  /* New/missing blocks filtered. */
     178              :     pg_atomic_uint64 skip_fpw;  /* FPWs skipped. */
     179              :     pg_atomic_uint64 skip_rep;  /* Repeat accesses skipped. */
     180              : 
     181              :     /* Dynamic values */
     182              :     int         wal_distance;   /* Number of WAL bytes ahead. */
     183              :     int         block_distance; /* Number of block references ahead. */
     184              :     int         io_depth;       /* Number of I/Os in progress. */
     185              : } XLogPrefetchStats;
     186              : 
     187              : static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
     188              :                                            RelFileLocator rlocator,
     189              :                                            BlockNumber blockno,
     190              :                                            XLogRecPtr lsn);
     191              : static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
     192              :                                             RelFileLocator rlocator,
     193              :                                             BlockNumber blockno);
     194              : static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
     195              :                                                  XLogRecPtr replaying_lsn);
     196              : static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
     197              :                                                       XLogRecPtr *lsn);
     198              : 
     199              : static XLogPrefetchStats *SharedStats;
     200              : 
     201              : static inline LsnReadQueue *
     202         2207 : lrq_alloc(uint32 max_distance,
     203              :           uint32 max_inflight,
     204              :           uintptr_t lrq_private,
     205              :           LsnReadQueueNextFun next)
     206              : {
     207              :     LsnReadQueue *lrq;
     208              :     uint32      size;
     209              : 
     210              :     Assert(max_distance >= max_inflight);
     211              : 
     212         2207 :     size = max_distance + 1;    /* full ring buffer has a gap */
     213         2207 :     lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
     214         2207 :     lrq->lrq_private = lrq_private;
     215         2207 :     lrq->max_inflight = max_inflight;
     216         2207 :     lrq->size = size;
     217         2207 :     lrq->next = next;
     218         2207 :     lrq->head = 0;
     219         2207 :     lrq->tail = 0;
     220         2207 :     lrq->inflight = 0;
     221         2207 :     lrq->completed = 0;
     222              : 
     223         2207 :     return lrq;
     224              : }
     225              : 
     226              : static inline void
     227         2142 : lrq_free(LsnReadQueue *lrq)
     228              : {
     229         2142 :     pfree(lrq);
     230         2142 : }
     231              : 
     232              : static inline uint32
     233      1068986 : lrq_inflight(LsnReadQueue *lrq)
     234              : {
     235      1068986 :     return lrq->inflight;
     236              : }
     237              : 
     238              : static inline uint32
     239      1068986 : lrq_completed(LsnReadQueue *lrq)
     240              : {
     241      1068986 :     return lrq->completed;
     242              : }
     243              : 
     244              : static inline void
     245      2799174 : lrq_prefetch(LsnReadQueue *lrq)
     246              : {
     247              :     /* Try to start as many IOs as we can within our limits. */
     248      8542965 :     while (lrq->inflight < lrq->max_inflight &&
     249      5740071 :            lrq->inflight + lrq->completed < lrq->size - 1)
     250              :     {
     251              :         Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
     252      3418537 :         switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
     253              :         {
     254       473861 :             case LRQ_NEXT_AGAIN:
     255       473861 :                 return;
     256         6877 :             case LRQ_NEXT_IO:
     257         6877 :                 lrq->queue[lrq->head].io = true;
     258         6877 :                 lrq->inflight++;
     259         6877 :                 break;
     260      2937740 :             case LRQ_NEXT_NO_IO:
     261      2937740 :                 lrq->queue[lrq->head].io = false;
     262      2937740 :                 lrq->completed++;
     263      2937740 :                 break;
     264              :         }
     265      2944617 :         lrq->head++;
     266      2944617 :         if (lrq->head == lrq->size)
     267        45242 :             lrq->head = 0;
     268              :     }
     269              : }
     270              : 
     271              : static inline void
     272      2799154 : lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
     273              : {
     274              :     /*
     275              :      * We know that LSNs before 'lsn' have been replayed, so we can now assume
     276              :      * that any IOs that were started before then have finished.
     277              :      */
     278      8542847 :     while (lrq->tail != lrq->head &&
     279      5715027 :            lrq->queue[lrq->tail].lsn < lsn)
     280              :     {
     281      2944539 :         if (lrq->queue[lrq->tail].io)
     282         6877 :             lrq->inflight--;
     283              :         else
     284      2937662 :             lrq->completed--;
     285      2944539 :         lrq->tail++;
     286      2944539 :         if (lrq->tail == lrq->size)
     287        45241 :             lrq->tail = 0;
     288              :     }
     289      2799154 :     if (RecoveryPrefetchEnabled())
     290      2799154 :         lrq_prefetch(lrq);
     291      2799095 : }
     292              : 
     293              : size_t
     294         2147 : XLogPrefetchShmemSize(void)
     295              : {
     296         2147 :     return sizeof(XLogPrefetchStats);
     297              : }
     298              : 
     299              : /*
     300              :  * Reset all counters to zero.
     301              :  */
     302              : void
     303            3 : XLogPrefetchResetStats(void)
     304              : {
     305            3 :     pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
     306            3 :     pg_atomic_write_u64(&SharedStats->prefetch, 0);
     307            3 :     pg_atomic_write_u64(&SharedStats->hit, 0);
     308            3 :     pg_atomic_write_u64(&SharedStats->skip_init, 0);
     309            3 :     pg_atomic_write_u64(&SharedStats->skip_new, 0);
     310            3 :     pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
     311            3 :     pg_atomic_write_u64(&SharedStats->skip_rep, 0);
     312            3 : }
     313              : 
     314              : void
     315         1150 : XLogPrefetchShmemInit(void)
     316              : {
     317              :     bool        found;
     318              : 
     319         1150 :     SharedStats = (XLogPrefetchStats *)
     320         1150 :         ShmemInitStruct("XLogPrefetchStats",
     321              :                         sizeof(XLogPrefetchStats),
     322              :                         &found);
     323              : 
     324         1150 :     if (!found)
     325              :     {
     326         1150 :         pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
     327         1150 :         pg_atomic_init_u64(&SharedStats->prefetch, 0);
     328         1150 :         pg_atomic_init_u64(&SharedStats->hit, 0);
     329         1150 :         pg_atomic_init_u64(&SharedStats->skip_init, 0);
     330         1150 :         pg_atomic_init_u64(&SharedStats->skip_new, 0);
     331         1150 :         pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
     332         1150 :         pg_atomic_init_u64(&SharedStats->skip_rep, 0);
     333              :     }
     334         1150 : }
     335              : 
     336              : /*
     337              :  * Called when any GUC is changed that affects prefetching.
     338              :  */
     339              : void
     340           12 : XLogPrefetchReconfigure(void)
     341              : {
     342           12 :     XLogPrefetchReconfigureCount++;
     343           12 : }
     344              : 
     345              : /*
     346              :  * Increment a counter in shared memory.  This is equivalent to *counter++ on a
     347              :  * plain uint64 without any memory barrier or locking, except on platforms
     348              :  * where readers can't read uint64 without possibly observing a torn value.
     349              :  */
     350              : static inline void
     351      2930915 : XLogPrefetchIncrement(pg_atomic_uint64 *counter)
     352              : {
     353              :     Assert(AmStartupProcess() || !IsUnderPostmaster);
     354      2930915 :     pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
     355      2930915 : }
     356              : 
     357              : /*
     358              :  * Create a prefetcher that is ready to begin prefetching blocks referenced by
     359              :  * WAL records.
     360              :  */
     361              : XLogPrefetcher *
     362         1006 : XLogPrefetcherAllocate(XLogReaderState *reader)
     363              : {
     364              :     XLogPrefetcher *prefetcher;
     365              :     HASHCTL     ctl;
     366              : 
     367         1006 :     prefetcher = palloc0_object(XLogPrefetcher);
     368         1006 :     prefetcher->reader = reader;
     369              : 
     370         1006 :     ctl.keysize = sizeof(RelFileLocator);
     371         1006 :     ctl.entrysize = sizeof(XLogPrefetcherFilter);
     372         1006 :     prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
     373              :                                            &ctl, HASH_ELEM | HASH_BLOBS);
     374         1006 :     dlist_init(&prefetcher->filter_queue);
     375              : 
     376         1006 :     SharedStats->wal_distance = 0;
     377         1006 :     SharedStats->block_distance = 0;
     378         1006 :     SharedStats->io_depth = 0;
     379              : 
     380              :     /* First usage will cause streaming_read to be allocated. */
     381         1006 :     prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
     382              : 
     383         1006 :     return prefetcher;
     384              : }
     385              : 
     386              : /*
     387              :  * Destroy a prefetcher and release all resources.
     388              :  */
     389              : void
     390          941 : XLogPrefetcherFree(XLogPrefetcher *prefetcher)
     391              : {
     392          941 :     lrq_free(prefetcher->streaming_read);
     393          941 :     hash_destroy(prefetcher->filter_table);
     394          941 :     pfree(prefetcher);
     395          941 : }
     396              : 
     397              : /*
     398              :  * Provide access to the reader.
     399              :  */
     400              : XLogReaderState *
     401      2799012 : XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
     402              : {
     403      2799012 :     return prefetcher->reader;
     404              : }
     405              : 
     406              : /*
     407              :  * Update the statistics visible in the pg_stat_recovery_prefetch view.
     408              :  */
     409              : void
     410      1068986 : XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
     411              : {
     412              :     uint32      io_depth;
     413              :     uint32      completed;
     414              :     int64       wal_distance;
     415              : 
     416              : 
     417              :     /* How far ahead of replay are we now? */
     418      1068986 :     if (prefetcher->reader->decode_queue_tail)
     419              :     {
     420      1061644 :         wal_distance =
     421      1061644 :             prefetcher->reader->decode_queue_tail->lsn -
     422      1061644 :             prefetcher->reader->decode_queue_head->lsn;
     423              :     }
     424              :     else
     425              :     {
     426         7342 :         wal_distance = 0;
     427              :     }
     428              : 
     429              :     /* How many IOs are currently in flight and completed? */
     430      1068986 :     io_depth = lrq_inflight(prefetcher->streaming_read);
     431      1068986 :     completed = lrq_completed(prefetcher->streaming_read);
     432              : 
     433              :     /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
     434      1068986 :     SharedStats->io_depth = io_depth;
     435      1068986 :     SharedStats->block_distance = io_depth + completed;
     436      1068986 :     SharedStats->wal_distance = wal_distance;
     437              : 
     438      1068986 :     prefetcher->next_stats_shm_lsn =
     439      1068986 :         prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
     440      1068986 : }
     441              : 
     442              : /*
     443              :  * A callback that examines the next block reference in the WAL, and possibly
     444              :  * starts an IO so that a later read will be fast.
     445              :  *
     446              :  * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
     447              :  *
     448              :  * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
     449              :  * that isn't in the buffer pool, and the kernel has been asked to start
     450              :  * reading it to make a future read system call faster. An LSN is written to
     451              :  * *lsn, and the I/O will be considered to have completed once that LSN is
     452              :  * replayed.
     453              :  *
     454              :  * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
     455              :  * that it was already in the buffer pool, or we decided for various reasons
     456              :  * not to prefetch.
     457              :  */
     458              : static LsnReadQueueNextStatus
     459      3418537 : XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
     460              : {
     461      3418537 :     XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
     462      3418537 :     XLogReaderState *reader = prefetcher->reader;
     463      3418537 :     XLogRecPtr  replaying_lsn = reader->ReadRecPtr;
     464              : 
     465              :     /*
     466              :      * We keep track of the record and block we're up to between calls with
     467              :      * prefetcher->record and prefetcher->next_block_id.
     468              :      */
     469              :     for (;;)
     470      2796673 :     {
     471              :         DecodedXLogRecord *record;
     472              : 
     473              :         /* Try to read a new future record, if we don't already have one. */
     474      6215210 :         if (prefetcher->record == NULL)
     475              :         {
     476              :             bool        nonblocking;
     477              : 
     478              :             /*
     479              :              * If there are already records or an error queued up that could
     480              :              * be replayed, we don't want to block here.  Otherwise, it's OK
     481              :              * to block waiting for more data: presumably the caller has
     482              :              * nothing else to do.
     483              :              */
     484      3270593 :             nonblocking = XLogReaderHasQueuedRecordOrError(reader);
     485              : 
     486              :             /* Readahead is disabled until we replay past a certain point. */
     487      3270593 :             if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
     488       461090 :                 return LRQ_NEXT_AGAIN;
     489              : 
     490      2809503 :             record = XLogReadAhead(prefetcher->reader, nonblocking);
     491      2809444 :             if (record == NULL)
     492              :             {
     493              :                 /*
     494              :                  * We can't read any more, due to an error or lack of data in
     495              :                  * nonblocking mode.  Don't try to read ahead again until
     496              :                  * we've replayed everything already decoded.
     497              :                  */
     498        10577 :                 if (nonblocking && prefetcher->reader->decode_queue_tail)
     499        10369 :                     prefetcher->no_readahead_until =
     500        10369 :                         prefetcher->reader->decode_queue_tail->lsn;
     501              : 
     502        10577 :                 return LRQ_NEXT_AGAIN;
     503              :             }
     504              : 
     505              :             /*
     506              :              * If prefetching is disabled, we don't need to analyze the record
     507              :              * or issue any prefetches.  We just need to cause one record to
     508              :              * be decoded.
     509              :              */
     510      2798867 :             if (!RecoveryPrefetchEnabled())
     511              :             {
     512            0 :                 *lsn = InvalidXLogRecPtr;
     513            0 :                 return LRQ_NEXT_NO_IO;
     514              :             }
     515              : 
     516              :             /* We have a new record to process. */
     517      2798867 :             prefetcher->record = record;
     518      2798867 :             prefetcher->next_block_id = 0;
     519              :         }
     520              :         else
     521              :         {
     522              :             /* Continue to process from last call, or last loop. */
     523      2944617 :             record = prefetcher->record;
     524              :         }
     525              : 
     526              :         /*
     527              :          * Check for operations that require us to filter out block ranges, or
     528              :          * pause readahead completely.
     529              :          */
     530      5743484 :         if (replaying_lsn < record->lsn)
     531              :         {
     532      5743484 :             uint8       rmid = record->header.xl_rmid;
     533      5743484 :             uint8       record_type = record->header.xl_info & ~XLR_INFO_MASK;
     534              : 
     535      5743484 :             if (rmid == RM_XLOG_ID)
     536              :             {
     537        95445 :                 if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
     538              :                     record_type == XLOG_END_OF_RECOVERY)
     539              :                 {
     540              :                     /*
     541              :                      * These records might change the TLI.  Avoid potential
     542              :                      * bugs if we were to allow "read TLI" and "replay TLI" to
     543              :                      * differ without more analysis.
     544              :                      */
     545         1727 :                     prefetcher->no_readahead_until = record->lsn;
     546              : 
     547              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     548              :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     549              :                          "suppressing all readahead until %X/%08X is replayed due to possible TLI change",
     550              :                          LSN_FORMAT_ARGS(record->lsn));
     551              : #endif
     552              : 
     553              :                     /* Fall through so we move past this record. */
     554              :                 }
     555              :             }
     556      5648039 :             else if (rmid == RM_DBASE_ID)
     557              :             {
     558              :                 /*
     559              :                  * When databases are created with the file-copy strategy,
     560              :                  * there are no WAL records to tell us about the creation of
     561              :                  * individual relations.
     562              :                  */
     563           42 :                 if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
     564              :                 {
     565            5 :                     xl_dbase_create_file_copy_rec *xlrec =
     566              :                         (xl_dbase_create_file_copy_rec *) record->main_data;
     567            5 :                     RelFileLocator rlocator =
     568            5 :                     {InvalidOid, xlrec->db_id, InvalidRelFileNumber};
     569              : 
     570              :                     /*
     571              :                      * Don't try to prefetch anything in this database until
     572              :                      * it has been created, or we might confuse the blocks of
     573              :                      * different generations, if a database OID or
     574              :                      * relfilenumber is reused.  It's also more efficient than
     575              :                      * discovering that relations don't exist on disk yet with
     576              :                      * ENOENT errors.
     577              :                      */
     578            5 :                     XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
     579              : 
     580              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     581              :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     582              :                          "suppressing prefetch in database %u until %X/%08X is replayed due to raw file copy",
     583              :                          rlocator.dbOid,
     584              :                          LSN_FORMAT_ARGS(record->lsn));
     585              : #endif
     586              :                 }
     587              :             }
     588      5647997 :             else if (rmid == RM_SMGR_ID)
     589              :             {
     590        16698 :                 if (record_type == XLOG_SMGR_CREATE)
     591              :                 {
     592        16648 :                     xl_smgr_create *xlrec = (xl_smgr_create *)
     593              :                         record->main_data;
     594              : 
     595        16648 :                     if (xlrec->forkNum == MAIN_FORKNUM)
     596              :                     {
     597              :                         /*
     598              :                          * Don't prefetch anything for this whole relation
     599              :                          * until it has been created.  Otherwise we might
     600              :                          * confuse the blocks of different generations, if a
     601              :                          * relfilenumber is reused.  This also avoids the need
     602              :                          * to discover the problem via extra syscalls that
     603              :                          * report ENOENT.
     604              :                          */
     605        14858 :                         XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
     606              :                                                 record->lsn);
     607              : 
     608              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     609              :                         elog(XLOGPREFETCHER_DEBUG_LEVEL,
     610              :                              "suppressing prefetch in relation %u/%u/%u until %X/%08X is replayed, which creates the relation",
     611              :                              xlrec->rlocator.spcOid,
     612              :                              xlrec->rlocator.dbOid,
     613              :                              xlrec->rlocator.relNumber,
     614              :                              LSN_FORMAT_ARGS(record->lsn));
     615              : #endif
     616              :                     }
     617              :                 }
     618           50 :                 else if (record_type == XLOG_SMGR_TRUNCATE)
     619              :                 {
     620           50 :                     xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
     621              :                         record->main_data;
     622              : 
     623              :                     /*
     624              :                      * Don't consider prefetching anything in the truncated
     625              :                      * range until the truncation has been performed.
     626              :                      */
     627           50 :                     XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
     628              :                                             xlrec->blkno,
     629              :                                             record->lsn);
     630              : 
     631              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     632              :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     633              :                          "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, which truncates the relation",
     634              :                          xlrec->rlocator.spcOid,
     635              :                          xlrec->rlocator.dbOid,
     636              :                          xlrec->rlocator.relNumber,
     637              :                          xlrec->blkno,
     638              :                          LSN_FORMAT_ARGS(record->lsn));
     639              : #endif
     640              :                 }
     641              :             }
     642              :         }
     643              : 
     644              :         /* Scan the block references, starting where we left off last time. */
     645      5745904 :         while (prefetcher->next_block_id <= record->max_block_id)
     646              :         {
     647      2947037 :             int         block_id = prefetcher->next_block_id++;
     648      2947037 :             DecodedBkpBlock *block = &record->blocks[block_id];
     649              :             SMgrRelation reln;
     650              :             PrefetchBufferResult result;
     651              : 
     652      2947037 :             if (!block->in_use)
     653         2196 :                 continue;
     654              : 
     655              :             Assert(!BufferIsValid(block->prefetch_buffer));
     656              : 
     657              :             /*
     658              :              * Record the LSN of this record.  When it's replayed,
     659              :              * LsnReadQueue will consider any IOs submitted for earlier LSNs
     660              :              * to be finished.
     661              :              */
     662      2944841 :             *lsn = record->lsn;
     663              : 
     664              :             /* We don't try to prefetch anything but the main fork for now. */
     665      2944841 :             if (block->forknum != MAIN_FORKNUM)
     666              :             {
     667      2944617 :                 return LRQ_NEXT_NO_IO;
     668              :             }
     669              : 
     670              :             /*
     671              :              * If there is a full page image attached, we won't be reading the
     672              :              * page, so don't bother trying to prefetch.
     673              :              */
     674      2931139 :             if (block->has_image)
     675              :             {
     676      2414735 :                 XLogPrefetchIncrement(&SharedStats->skip_fpw);
     677      2414735 :                 return LRQ_NEXT_NO_IO;
     678              :             }
     679              : 
     680              :             /* There is no point in reading a page that will be zeroed. */
     681       516404 :             if (block->flags & BKPBLOCK_WILL_INIT)
     682              :             {
     683         6443 :                 XLogPrefetchIncrement(&SharedStats->skip_init);
     684         6443 :                 return LRQ_NEXT_NO_IO;
     685              :             }
     686              : 
     687              :             /* Should we skip prefetching this block due to a filter? */
     688       509961 :             if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
     689              :             {
     690        69092 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     691        69092 :                 return LRQ_NEXT_NO_IO;
     692              :             }
     693              : 
     694              :             /* There is no point in repeatedly prefetching the same block. */
     695      1137807 :             for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
     696              :             {
     697      1124963 :                 if (block->blkno == prefetcher->recent_block[i] &&
     698       445833 :                     RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
     699              :                 {
     700              :                     /*
     701              :                      * XXX If we also remembered where it was, we could set
     702              :                      * recent_buffer so that recovery could skip smgropen()
     703              :                      * and a buffer table lookup.
     704              :                      */
     705       428025 :                     XLogPrefetchIncrement(&SharedStats->skip_rep);
     706       428025 :                     return LRQ_NEXT_NO_IO;
     707              :                 }
     708              :             }
     709        12844 :             prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
     710        12844 :             prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
     711        12844 :             prefetcher->recent_idx =
     712        12844 :                 (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
     713              : 
     714              :             /*
     715              :              * We could try to have a fast path for repeated references to the
     716              :              * same relation (with some scheme to handle invalidations
     717              :              * safely), but for now we'll call smgropen() every time.
     718              :              */
     719        12844 :             reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);
     720              : 
     721              :             /*
     722              :              * If the relation file doesn't exist on disk, for example because
     723              :              * we're replaying after a crash and the file will be created and
     724              :              * then unlinked by WAL that hasn't been replayed yet, suppress
     725              :              * further prefetching in the relation until this record is
     726              :              * replayed.
     727              :              */
     728        12844 :             if (!smgrexists(reln, MAIN_FORKNUM))
     729              :             {
     730              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     731              :                 elog(XLOGPREFETCHER_DEBUG_LEVEL,
     732              :                      "suppressing all prefetch in relation %u/%u/%u until %X/%08X is replayed, because the relation does not exist on disk",
     733              :                      reln->smgr_rlocator.locator.spcOid,
     734              :                      reln->smgr_rlocator.locator.dbOid,
     735              :                      reln->smgr_rlocator.locator.relNumber,
     736              :                      LSN_FORMAT_ARGS(record->lsn));
     737              : #endif
     738            6 :                 XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
     739              :                                         record->lsn);
     740            6 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     741            6 :                 return LRQ_NEXT_NO_IO;
     742              :             }
     743              : 
     744              :             /*
     745              :              * If the relation isn't big enough to contain the referenced
     746              :              * block yet, suppress prefetching of this block and higher until
     747              :              * this record is replayed.
     748              :              */
     749        12838 :             if (block->blkno >= smgrnblocks(reln, block->forknum))
     750              :             {
     751              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     752              :                 elog(XLOGPREFETCHER_DEBUG_LEVEL,
     753              :                      "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, because the relation is too small",
     754              :                      reln->smgr_rlocator.locator.spcOid,
     755              :                      reln->smgr_rlocator.locator.dbOid,
     756              :                      reln->smgr_rlocator.locator.relNumber,
     757              :                      block->blkno,
     758              :                      LSN_FORMAT_ARGS(record->lsn));
     759              : #endif
     760         1367 :                 XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
     761              :                                         record->lsn);
     762         1367 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     763         1367 :                 return LRQ_NEXT_NO_IO;
     764              :             }
     765              : 
     766              :             /* Try to initiate prefetching. */
     767        11471 :             result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
     768        11471 :             if (BufferIsValid(result.recent_buffer))
     769              :             {
     770              :                 /* Cache hit, nothing to do. */
     771         4370 :                 XLogPrefetchIncrement(&SharedStats->hit);
     772         4370 :                 block->prefetch_buffer = result.recent_buffer;
     773         4370 :                 return LRQ_NEXT_NO_IO;
     774              :             }
     775         7101 :             else if (result.initiated_io)
     776              :             {
     777              :                 /* Cache miss, I/O (presumably) started. */
     778         6877 :                 XLogPrefetchIncrement(&SharedStats->prefetch);
     779         6877 :                 block->prefetch_buffer = InvalidBuffer;
     780         6877 :                 return LRQ_NEXT_IO;
     781              :             }
     782          224 :             else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
     783              :             {
     784              :                 /*
     785              :                  * This shouldn't be possible, because we already determined
     786              :                  * that the relation exists on disk and is big enough.
     787              :                  * Something is wrong with the cache invalidation for
     788              :                  * smgrexists(), smgrnblocks(), or the file was unlinked or
     789              :                  * truncated beneath our feet?
     790              :                  */
     791            0 :                 elog(ERROR,
     792              :                      "could not prefetch relation %u/%u/%u block %u",
     793              :                      reln->smgr_rlocator.locator.spcOid,
     794              :                      reln->smgr_rlocator.locator.dbOid,
     795              :                      reln->smgr_rlocator.locator.relNumber,
     796              :                      block->blkno);
     797              :             }
     798              :         }
     799              : 
     800              :         /*
     801              :          * Several callsites need to be able to read exactly one record
     802              :          * without any internal readahead.  Examples: xlog.c reading
     803              :          * checkpoint records with emode set to PANIC, which might otherwise
     804              :          * cause XLogPageRead() to panic on some future page, and xlog.c
     805              :          * determining where to start writing WAL next, which depends on the
     806              :          * contents of the reader's internal buffer after reading one record.
     807              :          * Therefore, don't even think about prefetching until the first
     808              :          * record after XLogPrefetcherBeginRead() has been consumed.
     809              :          */
     810      2798867 :         if (prefetcher->reader->decode_queue_tail &&
     811      2798866 :             prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
     812         2194 :             return LRQ_NEXT_AGAIN;
     813              : 
     814              :         /* Advance to the next record. */
     815      2796673 :         prefetcher->record = NULL;
     816              :     }
     817              :     pg_unreachable();
     818              : }
     819              : 
     820              : /*
     821              :  * Expose statistics about recovery prefetching.
     822              :  */
     823              : Datum
     824            6 : pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
     825              : {
     826              : #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
     827            6 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     828              :     Datum       values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
     829              :     bool        nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
     830              : 
     831            6 :     InitMaterializedSRF(fcinfo, 0);
     832              : 
     833           66 :     for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
     834           60 :         nulls[i] = false;
     835              : 
     836            6 :     values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
     837            6 :     values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
     838            6 :     values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
     839            6 :     values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
     840            6 :     values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
     841            6 :     values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
     842            6 :     values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
     843            6 :     values[7] = Int32GetDatum(SharedStats->wal_distance);
     844            6 :     values[8] = Int32GetDatum(SharedStats->block_distance);
     845            6 :     values[9] = Int32GetDatum(SharedStats->io_depth);
     846            6 :     tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
     847              : 
     848            6 :     return (Datum) 0;
     849              : }
     850              : 
     851              : /*
     852              :  * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
     853              :  * has been replayed.
     854              :  */
     855              : static inline void
     856        16286 : XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
     857              :                         BlockNumber blockno, XLogRecPtr lsn)
     858              : {
     859              :     XLogPrefetcherFilter *filter;
     860              :     bool        found;
     861              : 
     862        16286 :     filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
     863        16286 :     if (!found)
     864              :     {
     865              :         /*
     866              :          * Don't allow any prefetching of this block or higher until replayed.
     867              :          */
     868        16279 :         filter->filter_until_replayed = lsn;
     869        16279 :         filter->filter_from_block = blockno;
     870        16279 :         dlist_push_head(&prefetcher->filter_queue, &filter->link);
     871              :     }
     872              :     else
     873              :     {
     874              :         /*
     875              :          * We were already filtering this rlocator.  Extend the filter's
     876              :          * lifetime to cover this WAL record, but leave the lower of the block
     877              :          * numbers there because we don't want to have to track individual
     878              :          * blocks.
     879              :          */
     880            7 :         filter->filter_until_replayed = lsn;
     881            7 :         dlist_delete(&filter->link);
     882            7 :         dlist_push_head(&prefetcher->filter_queue, &filter->link);
     883            7 :         filter->filter_from_block = Min(filter->filter_from_block, blockno);
     884              :     }
     885        16286 : }
     886              : 
     887              : /*
     888              :  * Have we replayed any records that caused us to begin filtering a block
     889              :  * range?  That means that relations should have been created, extended or
     890              :  * dropped as required, so we can stop filtering out accesses to a given
     891              :  * relfilenumber.
     892              :  */
     893              : static inline void
     894      2799154 : XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
     895              : {
     896      2815432 :     while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
     897              :     {
     898       494431 :         XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
     899              :                                                           link,
     900              :                                                           &prefetcher->filter_queue);
     901              : 
     902       494431 :         if (filter->filter_until_replayed >= replaying_lsn)
     903       478153 :             break;
     904              : 
     905        16278 :         dlist_delete(&filter->link);
     906        16278 :         hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
     907              :     }
     908      2799154 : }
     909              : 
     910              : /*
     911              :  * Check if a given block should be skipped due to a filter.
     912              :  */
     913              : static inline bool
     914       509961 : XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
     915              :                          BlockNumber blockno)
     916              : {
     917              :     /*
     918              :      * Test for empty queue first, because we expect it to be empty most of
     919              :      * the time and we can avoid the hash table lookup in that case.
     920              :      */
     921       509961 :     if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
     922              :     {
     923              :         XLogPrefetcherFilter *filter;
     924              : 
     925              :         /* See if the block range is filtered. */
     926        89879 :         filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
     927        89879 :         if (filter && filter->filter_from_block <= blockno)
     928              :         {
     929              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     930              :             elog(XLOGPREFETCHER_DEBUG_LEVEL,
     931              :                  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (blocks >= %u filtered)",
     932              :                  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
     933              :                  LSN_FORMAT_ARGS(filter->filter_until_replayed),
     934              :                  filter->filter_from_block);
     935              : #endif
     936        69092 :             return true;
     937              :         }
     938              : 
     939              :         /* See if the whole database is filtered. */
     940        20787 :         rlocator.relNumber = InvalidRelFileNumber;
     941        20787 :         rlocator.spcOid = InvalidOid;
     942        20787 :         filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
     943        20787 :         if (filter)
     944              :         {
     945              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     946              :             elog(XLOGPREFETCHER_DEBUG_LEVEL,
     947              :                  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (whole database)",
     948              :                  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
     949              :                  LSN_FORMAT_ARGS(filter->filter_until_replayed));
     950              : #endif
     951            0 :             return true;
     952              :         }
     953              :     }
     954              : 
     955       440869 :     return false;
     956              : }
     957              : 
     958              : /*
     959              :  * A wrapper for XLogBeginRead() that also resets the prefetcher.
     960              :  */
     961              : void
     962         2195 : XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
     963              : {
     964              :     /* This will forget about any in-flight IO. */
     965         2195 :     prefetcher->reconfigure_count--;
     966              : 
     967              :     /* Book-keeping to avoid readahead on first read. */
     968         2195 :     prefetcher->begin_ptr = recPtr;
     969              : 
     970         2195 :     prefetcher->no_readahead_until = InvalidXLogRecPtr;
     971              : 
     972              :     /* This will forget about any queued up records in the decoder. */
     973         2195 :     XLogBeginRead(prefetcher->reader, recPtr);
     974         2195 : }
     975              : 
     976              : /*
     977              :  * A wrapper for XLogReadRecord() that provides the same interface, but also
     978              :  * tries to initiate I/O for blocks referenced in future WAL records.
     979              :  */
     980              : XLogRecord *
     981      2799154 : XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
     982              : {
     983              :     DecodedXLogRecord *record;
     984              :     XLogRecPtr  replayed_up_to;
     985              : 
     986              :     /*
     987              :      * See if it's time to reset the prefetching machinery, because a relevant
     988              :      * GUC was changed.
     989              :      */
     990      2799154 :     if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
     991              :     {
     992              :         uint32      max_distance;
     993              :         uint32      max_inflight;
     994              : 
     995         2207 :         if (prefetcher->streaming_read)
     996         1201 :             lrq_free(prefetcher->streaming_read);
     997              : 
     998         2207 :         if (RecoveryPrefetchEnabled())
     999              :         {
    1000              :             Assert(maintenance_io_concurrency > 0);
    1001         2207 :             max_inflight = maintenance_io_concurrency;
    1002         2207 :             max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
    1003              :         }
    1004              :         else
    1005              :         {
    1006            0 :             max_inflight = 1;
    1007            0 :             max_distance = 1;
    1008              :         }
    1009              : 
    1010         2207 :         prefetcher->streaming_read = lrq_alloc(max_distance,
    1011              :                                                max_inflight,
    1012              :                                                (uintptr_t) prefetcher,
    1013              :                                                XLogPrefetcherNextBlock);
    1014              : 
    1015         2207 :         prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
    1016              :     }
    1017              : 
    1018              :     /*
    1019              :      * Release last returned record, if there is one, as it's now been
    1020              :      * replayed.
    1021              :      */
    1022      2799154 :     replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
    1023              : 
    1024              :     /*
    1025              :      * Can we drop any filters yet?  If we were waiting for a relation to be
    1026              :      * created or extended, it is now OK to access blocks in the covered
    1027              :      * range.
    1028              :      */
    1029      2799154 :     XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
    1030              : 
    1031              :     /*
    1032              :      * All IO initiated by earlier WAL is now completed.  This might trigger
    1033              :      * further prefetching.
    1034              :      */
    1035      2799154 :     lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
    1036              : 
    1037              :     /*
    1038              :      * If there's nothing queued yet, then start prefetching to cause at least
    1039              :      * one record to be queued.
    1040              :      */
    1041      2799095 :     if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
    1042              :     {
    1043              :         Assert(lrq_inflight(prefetcher->streaming_read) == 0);
    1044              :         Assert(lrq_completed(prefetcher->streaming_read) == 0);
    1045           20 :         lrq_prefetch(prefetcher->streaming_read);
    1046              :     }
    1047              : 
    1048              :     /* Read the next record. */
    1049      2799095 :     record = XLogNextRecord(prefetcher->reader, errmsg);
    1050      2799095 :     if (!record)
    1051          300 :         return NULL;
    1052              : 
    1053              :     /*
    1054              :      * The record we just got is the "current" one, for the benefit of the
    1055              :      * XLogRecXXX() macros.
    1056              :      */
    1057              :     Assert(record == prefetcher->reader->record);
    1058              : 
    1059              :     /*
    1060              :      * If maintenance_io_concurrency is set very low, we might have started
    1061              :      * prefetching some but not all of the blocks referenced in the record
    1062              :      * we're about to return.  Forget about the rest of the blocks in this
    1063              :      * record by dropping the prefetcher's reference to it.
    1064              :      */
    1065      2798795 :     if (record == prefetcher->record)
    1066         2194 :         prefetcher->record = NULL;
    1067              : 
    1068              :     /*
    1069              :      * See if it's time to compute some statistics, because enough WAL has
    1070              :      * been processed.
    1071              :      */
    1072      2798795 :     if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
    1073      1060703 :         XLogPrefetcherComputeStats(prefetcher);
    1074              : 
    1075              :     Assert(record == prefetcher->reader->record);
    1076              : 
    1077      2798795 :     return &record->header;
    1078              : }
    1079              : 
    1080              : bool
    1081         1187 : check_recovery_prefetch(int *new_value, void **extra, GucSource source)
    1082              : {
    1083              : #ifndef USE_PREFETCH
    1084              :     if (*new_value == RECOVERY_PREFETCH_ON)
    1085              :     {
    1086              :         GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
    1087              :         return false;
    1088              :     }
    1089              : #endif
    1090              : 
    1091         1187 :     return true;
    1092              : }
    1093              : 
    1094              : void
    1095         1187 : assign_recovery_prefetch(int new_value, void *extra)
    1096              : {
    1097              :     /* Reconfigure prefetching, because a setting it depends on changed. */
    1098         1187 :     recovery_prefetch = new_value;
    1099         1187 :     if (AmStartupProcess())
    1100            0 :         XLogPrefetchReconfigure();
    1101         1187 : }
        

Generated by: LCOV version 2.0-1