LCOV - code coverage report
Current view: top level - src/backend/access/transam - xlogprefetcher.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 97.5 % 283 276
Test Date: 2026-04-06 05:16:22 Functions: 100.0 % 24 24
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * xlogprefetcher.c
       4              :  *      Prefetching support for recovery.
       5              :  *
       6              :  * Portions Copyright (c) 2022-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *      src/backend/access/transam/xlogprefetcher.c
      12              :  *
      13              :  * This module provides a drop-in replacement for an XLogReader that tries to
      14              :  * minimize I/O stalls by looking ahead in the WAL.  If blocks that will be
      15              :  * accessed in the near future are not already in the buffer pool, it initiates
      16              :  * I/Os that might complete before the caller eventually needs the data.  When
      17              :  * referenced blocks are found in the buffer pool already, the buffer is
      18              :  * recorded in the decoded record so that XLogReadBufferForRedo() can try to
      19              :  * avoid a second buffer mapping table lookup.
      20              :  *
      21              :  * Currently, only the main fork is considered for prefetching.  Currently,
      22              :  * prefetching is only effective on systems where PrefetchBuffer() does
      23              :  * something useful (mainly Linux).
      24              :  *
      25              :  *-------------------------------------------------------------------------
      26              :  */
      27              : 
      28              : #include "postgres.h"
      29              : 
      30              : #include "access/xlogprefetcher.h"
      31              : #include "access/xlogreader.h"
      32              : #include "catalog/pg_control.h"
      33              : #include "catalog/storage_xlog.h"
      34              : #include "commands/dbcommands_xlog.h"
      35              : #include "funcapi.h"
      36              : #include "miscadmin.h"
      37              : #include "port/atomics.h"
      38              : #include "storage/bufmgr.h"
      39              : #include "storage/fd.h"
      40              : #include "storage/shmem.h"
      41              : #include "storage/smgr.h"
      42              : #include "storage/subsystems.h"
      43              : #include "utils/fmgrprotos.h"
      44              : #include "utils/guc_hooks.h"
      45              : #include "utils/hsearch.h"
      46              : #include "utils/timestamp.h"
      47              : #include "utils/tuplestore.h"
      48              : 
      49              : /*
      50              :  * Every time we process this much WAL, we'll update the values in
      51              :  * pg_stat_recovery_prefetch.
      52              :  */
      53              : #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
      54              : 
      55              : /*
      56              :  * To detect repeated access to the same block and skip useless extra system
      57              :  * calls, we remember a small window of recently prefetched blocks.
      58              :  */
      59              : #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
      60              : 
      61              : /*
      62              :  * When maintenance_io_concurrency is not saturated, we're prepared to look
      63              :  * ahead up to N times that number of block references.
      64              :  */
      65              : #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
      66              : 
      67              : /* Define to log internal debugging messages. */
      68              : /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
      69              : 
      70              : /* GUCs */
      71              : int         recovery_prefetch = RECOVERY_PREFETCH_TRY;
      72              : 
      73              : #ifdef USE_PREFETCH
      74              : #define RecoveryPrefetchEnabled() \
      75              :         (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
      76              :          maintenance_io_concurrency > 0)
      77              : #else
      78              : #define RecoveryPrefetchEnabled() false
      79              : #endif
      80              : 
      81              : static int  XLogPrefetchReconfigureCount = 0;
      82              : 
      83              : /*
      84              :  * Enum used to report whether an IO should be started.
      85              :  */
typedef enum
{
    LRQ_NEXT_NO_IO,             /* block examined; no IO was needed/started */
    LRQ_NEXT_IO,                /* an IO was initiated for this block */
    LRQ_NEXT_AGAIN,             /* nothing to examine yet; try again later */
} LsnReadQueueNextStatus;
      92              : 
      93              : /*
      94              :  * Type of callback that can decide which block to prefetch next.  For now
      95              :  * there is only one.
      96              :  */
      97              : typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
      98              :                                                        XLogRecPtr *lsn);
      99              : 
     100              : /*
     101              :  * A simple circular queue of LSNs, using to control the number of
     102              :  * (potentially) inflight IOs.  This stands in for a later more general IO
     103              :  * control mechanism, which is why it has the apparently unnecessary
     104              :  * indirection through a function pointer.
     105              :  */
typedef struct LsnReadQueue
{
    LsnReadQueueNextFun next;   /* callback that examines the next block ref */
    uintptr_t   lrq_private;    /* opaque argument passed through to 'next' */
    uint32      max_inflight;   /* cap on concurrently started IOs */
    uint32      inflight;       /* entries whose IO has been started */
    uint32      completed;      /* entries that required no IO */
    uint32      head;           /* ring index where new entries are appended */
    uint32      tail;           /* ring index of the oldest live entry */
    uint32      size;           /* slots in queue[]; one is kept as a gap */
    struct
    {
        bool        io;         /* true if an IO was started for this entry */
        XLogRecPtr  lsn;        /* entry retires once this LSN is replayed */
    }           queue[FLEXIBLE_ARRAY_MEMBER];
} LsnReadQueue;
     122              : 
     123              : /*
     124              :  * A prefetcher.  This is a mechanism that wraps an XLogReader, prefetching
     125              :  * blocks that will be soon be referenced, to try to avoid IO stalls.
     126              :  */
struct XLogPrefetcher
{
    /* WAL reader and current reading state. */
    XLogReaderState *reader;
    DecodedXLogRecord *record;  /* record currently being scanned for blocks */
    int         next_block_id;  /* next block reference to examine in it */

    /* When to publish stats. */
    XLogRecPtr  next_stats_shm_lsn;

    /* Book-keeping to avoid accessing blocks that don't exist yet. */
    HTAB       *filter_table;   /* rlocator -> XLogPrefetcherFilter */
    dlist_head  filter_queue;   /* filters, linked for expiry processing */

    /* Book-keeping to avoid repeat prefetches. */
    RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
    BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
    int         recent_idx;     /* next slot to overwrite in the window */

    /* Book-keeping to disable prefetching temporarily. */
    XLogRecPtr  no_readahead_until;

    /* IO depth manager. */
    LsnReadQueue *streaming_read;

    /* NOTE(review): presumably the LSN where reading began; set outside the
     * code visible here -- confirm against XLogPrefetcherBeginRead. */
    XLogRecPtr  begin_ptr;

    /* Snapshot of XLogPrefetchReconfigureCount; deliberately seeded stale by
     * XLogPrefetcherAllocate so first use sets up streaming_read. */
    int         reconfigure_count;
};
     156              : 
     157              : /*
     158              :  * A temporary filter used to track block ranges that haven't been created
     159              :  * yet, whole relations that haven't been created yet, and whole relations
     160              :  * that (we assume) have already been dropped, or will be created by bulk WAL
     161              :  * operators.
     162              :  */
typedef struct XLogPrefetcherFilter
{
    RelFileLocator rlocator;    /* relation (or whole database) to filter */
    XLogRecPtr  filter_until_replayed;  /* drop filter once this is replayed */
    BlockNumber filter_from_block;  /* filter this block and higher; 0 = all */
    dlist_node  link;           /* membership in XLogPrefetcher.filter_queue */
} XLogPrefetcherFilter;
     170              : 
/*
 * Counters exposed in shared memory for pg_stat_recovery_prefetch.
 *
 * The event counters are written only by the startup process (or a
 * single-user backend), as asserted in XLogPrefetchIncrement, so no locking
 * is used; the atomic type only protects readers from torn 64-bit values.
 */
typedef struct XLogPrefetchStats
{
    pg_atomic_uint64 reset_time;    /* Time of last reset. */
    pg_atomic_uint64 prefetch;  /* Prefetches initiated. */
    pg_atomic_uint64 hit;       /* Blocks already in cache. */
    pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
    pg_atomic_uint64 skip_new;  /* New/missing blocks filtered. */
    pg_atomic_uint64 skip_fpw;  /* FPWs skipped. */
    pg_atomic_uint64 skip_rep;  /* Repeat accesses skipped. */

    /* Dynamic values, overwritten by XLogPrefetcherComputeStats. */
    int         wal_distance;   /* Number of WAL bytes ahead. */
    int         block_distance; /* Number of block references ahead. */
    int         io_depth;       /* Number of I/Os in progress. */
} XLogPrefetchStats;
     189              : 
     190              : static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
     191              :                                            RelFileLocator rlocator,
     192              :                                            BlockNumber blockno,
     193              :                                            XLogRecPtr lsn);
     194              : static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
     195              :                                             RelFileLocator rlocator,
     196              :                                             BlockNumber blockno);
     197              : static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
     198              :                                                  XLogRecPtr replaying_lsn);
     199              : static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
     200              :                                                       XLogRecPtr *lsn);
     201              : 
     202              : static XLogPrefetchStats *SharedStats;
     203              : 
     204              : static void XLogPrefetchShmemRequest(void *arg);
     205              : static void XLogPrefetchShmemInit(void *arg);
     206              : 
/* Hooks to request and initialize this module's shared memory area. */
const ShmemCallbacks XLogPrefetchShmemCallbacks = {
    .request_fn = XLogPrefetchShmemRequest,
    .init_fn = XLogPrefetchShmemInit,
};
     211              : 
     212              : static inline LsnReadQueue *
     213         2338 : lrq_alloc(uint32 max_distance,
     214              :           uint32 max_inflight,
     215              :           uintptr_t lrq_private,
     216              :           LsnReadQueueNextFun next)
     217              : {
     218              :     LsnReadQueue *lrq;
     219              :     uint32      size;
     220              : 
     221              :     Assert(max_distance >= max_inflight);
     222              : 
     223         2338 :     size = max_distance + 1;    /* full ring buffer has a gap */
     224         2338 :     lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
     225         2338 :     lrq->lrq_private = lrq_private;
     226         2338 :     lrq->max_inflight = max_inflight;
     227         2338 :     lrq->size = size;
     228         2338 :     lrq->next = next;
     229         2338 :     lrq->head = 0;
     230         2338 :     lrq->tail = 0;
     231         2338 :     lrq->inflight = 0;
     232         2338 :     lrq->completed = 0;
     233              : 
     234         2338 :     return lrq;
     235              : }
     236              : 
/*
 * Release an LsnReadQueue previously created with lrq_alloc().
 */
static inline void
lrq_free(LsnReadQueue *lrq)
{
    pfree(lrq);
}
     242              : 
/*
 * Return the number of IOs that have been started but not yet retired by
 * lrq_complete_lsn().
 */
static inline uint32
lrq_inflight(LsnReadQueue *lrq)
{
    return lrq->inflight;
}
     248              : 
/*
 * Return the number of queued block references that required no IO and have
 * not yet been retired by lrq_complete_lsn().
 */
static inline uint32
lrq_completed(LsnReadQueue *lrq)
{
    return lrq->completed;
}
     254              : 
/*
 * Pump the queue: keep asking the 'next' callback to examine upcoming block
 * references (possibly starting IOs) until we reach the IO concurrency cap
 * or the ring is full, or the callback reports it has nothing more yet.
 */
static inline void
lrq_prefetch(LsnReadQueue *lrq)
{
    /* Try to start as many IOs as we can within our limits. */
    while (lrq->inflight < lrq->max_inflight &&
           lrq->inflight + lrq->completed < lrq->size - 1)
    {
        Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
        switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
        {
            case LRQ_NEXT_AGAIN:
                /* Nothing more to examine now; leave head where it is. */
                return;
            case LRQ_NEXT_IO:
                /* An IO was started; it counts against max_inflight. */
                lrq->queue[lrq->head].io = true;
                lrq->inflight++;
                break;
            case LRQ_NEXT_NO_IO:
                /* Examined, but no IO was needed; counts as done already. */
                lrq->queue[lrq->head].io = false;
                lrq->completed++;
                break;
        }
        /* Advance the ring head, wrapping at the end of the buffer. */
        lrq->head++;
        if (lrq->head == lrq->size)
            lrq->head = 0;
    }
}
     281              : 
/*
 * Report that WAL up to (but not including) 'lsn' has been replayed.
 * Entries with older LSNs are retired from the queue, and then we try to
 * start more IOs using the freed-up capacity.
 */
static inline void
lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
{
    /*
     * We know that LSNs before 'lsn' have been replayed, so we can now assume
     * that any IOs that were started before then have finished.
     */
    while (lrq->tail != lrq->head &&
           lrq->queue[lrq->tail].lsn < lsn)
    {
        /* Decrement whichever counter this entry was charged against. */
        if (lrq->queue[lrq->tail].io)
            lrq->inflight--;
        else
            lrq->completed--;
        /* Advance the ring tail, wrapping at the end of the buffer. */
        lrq->tail++;
        if (lrq->tail == lrq->size)
            lrq->tail = 0;
    }
    /* Retired entries may have made room for new lookahead. */
    if (RecoveryPrefetchEnabled())
        lrq_prefetch(lrq);
}
     303              : 
/*
 * Request the shared memory area used for the pg_stat_recovery_prefetch
 * counters; SharedStats will point at it once allocated.
 */
static void
XLogPrefetchShmemRequest(void *arg)
{
    ShmemRequestStruct(.name = "XLogPrefetchStats",
                       .size = sizeof(XLogPrefetchStats),
                       .ptr = (void **) &SharedStats,
        );
}
     312              : 
     313              : static void
     314         1229 : XLogPrefetchShmemInit(void *arg)
     315              : {
     316         1229 :     pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
     317         1229 :     pg_atomic_init_u64(&SharedStats->prefetch, 0);
     318         1229 :     pg_atomic_init_u64(&SharedStats->hit, 0);
     319         1229 :     pg_atomic_init_u64(&SharedStats->skip_init, 0);
     320         1229 :     pg_atomic_init_u64(&SharedStats->skip_new, 0);
     321         1229 :     pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
     322         1229 :     pg_atomic_init_u64(&SharedStats->skip_rep, 0);
     323         1229 : }
     324              : 
     325              : /*
     326              :  * Reset all counters to zero.
     327              :  */
     328              : void
     329            4 : XLogPrefetchResetStats(void)
     330              : {
     331            4 :     pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
     332            4 :     pg_atomic_write_u64(&SharedStats->prefetch, 0);
     333            4 :     pg_atomic_write_u64(&SharedStats->hit, 0);
     334            4 :     pg_atomic_write_u64(&SharedStats->skip_init, 0);
     335            4 :     pg_atomic_write_u64(&SharedStats->skip_new, 0);
     336            4 :     pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
     337            4 :     pg_atomic_write_u64(&SharedStats->skip_rep, 0);
     338            4 : }
     339              : 
     340              : 
/*
 * Called when any GUC is changed that affects prefetching.
 *
 * Bumping this counter makes existing prefetchers' reconfigure_count
 * snapshots stale; XLogPrefetcherAllocate deliberately seeds that field one
 * behind so first use builds the read queue.  The comparison itself happens
 * in code not visible in this chunk.
 */
void
XLogPrefetchReconfigure(void)
{
    XLogPrefetchReconfigureCount++;
}
     349              : 
     350              : /*
     351              :  * Increment a counter in shared memory.  This is equivalent to *counter++ on a
     352              :  * plain uint64 without any memory barrier or locking, except on platforms
     353              :  * where readers can't read uint64 without possibly observing a torn value.
     354              :  */
     355              : static inline void
     356      3007404 : XLogPrefetchIncrement(pg_atomic_uint64 *counter)
     357              : {
     358              :     Assert(AmStartupProcess() || !IsUnderPostmaster);
     359      3007404 :     pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
     360      3007404 : }
     361              : 
     362              : /*
     363              :  * Create a prefetcher that is ready to begin prefetching blocks referenced by
     364              :  * WAL records.
     365              :  */
     366              : XLogPrefetcher *
     367         1072 : XLogPrefetcherAllocate(XLogReaderState *reader)
     368              : {
     369              :     XLogPrefetcher *prefetcher;
     370              :     HASHCTL     ctl;
     371              : 
     372         1072 :     prefetcher = palloc0_object(XLogPrefetcher);
     373         1072 :     prefetcher->reader = reader;
     374              : 
     375         1072 :     ctl.keysize = sizeof(RelFileLocator);
     376         1072 :     ctl.entrysize = sizeof(XLogPrefetcherFilter);
     377         1072 :     prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
     378              :                                            &ctl, HASH_ELEM | HASH_BLOBS);
     379         1072 :     dlist_init(&prefetcher->filter_queue);
     380              : 
     381         1072 :     SharedStats->wal_distance = 0;
     382         1072 :     SharedStats->block_distance = 0;
     383         1072 :     SharedStats->io_depth = 0;
     384              : 
     385              :     /* First usage will cause streaming_read to be allocated. */
     386         1072 :     prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
     387              : 
     388         1072 :     return prefetcher;
     389              : }
     390              : 
     391              : /*
     392              :  * Destroy a prefetcher and release all resources.
     393              :  */
     394              : void
     395         1005 : XLogPrefetcherFree(XLogPrefetcher *prefetcher)
     396              : {
     397         1005 :     lrq_free(prefetcher->streaming_read);
     398         1005 :     hash_destroy(prefetcher->filter_table);
     399         1005 :     pfree(prefetcher);
     400         1005 : }
     401              : 
/*
 * Provide access to the reader.
 *
 * Returns the XLogReaderState that was passed to XLogPrefetcherAllocate().
 */
XLogReaderState *
XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
{
    return prefetcher->reader;
}
     410              : 
     411              : /*
     412              :  * Update the statistics visible in the pg_stat_recovery_prefetch view.
     413              :  */
     414              : void
     415      1102177 : XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
     416              : {
     417              :     uint32      io_depth;
     418              :     uint32      completed;
     419              :     int64       wal_distance;
     420              : 
     421              : 
     422              :     /* How far ahead of replay are we now? */
     423      1102177 :     if (prefetcher->reader->decode_queue_tail)
     424              :     {
     425      1086902 :         wal_distance =
     426      1086902 :             prefetcher->reader->decode_queue_tail->lsn -
     427      1086902 :             prefetcher->reader->decode_queue_head->lsn;
     428              :     }
     429              :     else
     430              :     {
     431        15275 :         wal_distance = 0;
     432              :     }
     433              : 
     434              :     /* How many IOs are currently in flight and completed? */
     435      1102177 :     io_depth = lrq_inflight(prefetcher->streaming_read);
     436      1102177 :     completed = lrq_completed(prefetcher->streaming_read);
     437              : 
     438              :     /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
     439      1102177 :     SharedStats->io_depth = io_depth;
     440      1102177 :     SharedStats->block_distance = io_depth + completed;
     441      1102177 :     SharedStats->wal_distance = wal_distance;
     442              : 
     443      1102177 :     prefetcher->next_stats_shm_lsn =
     444      1102177 :         prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
     445      1102177 : }
     446              : 
     447              : /*
     448              :  * A callback that examines the next block reference in the WAL, and possibly
     449              :  * starts an IO so that a later read will be fast.
     450              :  *
     451              :  * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
     452              :  *
     453              :  * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
     454              :  * that isn't in the buffer pool, and the kernel has been asked to start
     455              :  * reading it to make a future read system call faster. An LSN is written to
     456              :  * *lsn, and the I/O will be considered to have completed once that LSN is
     457              :  * replayed.
     458              :  *
     459              :  * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
     460              :  * that it was already in the buffer pool, or we decided for various reasons
     461              :  * not to prefetch.
     462              :  */
     463              : static LsnReadQueueNextStatus
     464      3830022 : XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
     465              : {
     466      3830022 :     XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
     467      3830022 :     XLogReaderState *reader = prefetcher->reader;
     468      3830022 :     XLogRecPtr  replaying_lsn = reader->ReadRecPtr;
     469              : 
     470              :     /*
     471              :      * We keep track of the record and block we're up to between calls with
     472              :      * prefetcher->record and prefetcher->next_block_id.
     473              :      */
     474              :     for (;;)
     475      2938669 :     {
     476              :         DecodedXLogRecord *record;
     477              : 
     478              :         /* Try to read a new future record, if we don't already have one. */
     479      6768691 :         if (prefetcher->record == NULL)
     480              :         {
     481              :             bool        nonblocking;
     482              : 
     483              :             /*
     484              :              * If there are already records or an error queued up that could
     485              :              * be replayed, we don't want to block here.  Otherwise, it's OK
     486              :              * to block waiting for more data: presumably the caller has
     487              :              * nothing else to do.
     488              :              */
     489      3744970 :             nonblocking = XLogReaderHasQueuedRecordOrError(reader);
     490              : 
     491              :             /* Readahead is disabled until we replay past a certain point. */
     492      3744970 :             if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
     493       784875 :                 return LRQ_NEXT_AGAIN;
     494              : 
     495      2960095 :             record = XLogReadAhead(prefetcher->reader, nonblocking);
     496      2960035 :             if (record == NULL)
     497              :             {
     498              :                 /*
     499              :                  * We can't read any more, due to an error or lack of data in
     500              :                  * nonblocking mode.  Don't try to read ahead again until
     501              :                  * we've replayed everything already decoded.
     502              :                  */
     503        19041 :                 if (nonblocking && prefetcher->reader->decode_queue_tail)
     504        18825 :                     prefetcher->no_readahead_until =
     505        18825 :                         prefetcher->reader->decode_queue_tail->lsn;
     506              : 
     507        19041 :                 return LRQ_NEXT_AGAIN;
     508              :             }
     509              : 
     510              :             /*
     511              :              * If prefetching is disabled, we don't need to analyze the record
     512              :              * or issue any prefetches.  We just need to cause one record to
     513              :              * be decoded.
     514              :              */
     515      2940994 :             if (!RecoveryPrefetchEnabled())
     516              :             {
     517            0 :                 *lsn = InvalidXLogRecPtr;
     518            0 :                 return LRQ_NEXT_NO_IO;
     519              :             }
     520              : 
     521              :             /* We have a new record to process. */
     522      2940994 :             prefetcher->record = record;
     523      2940994 :             prefetcher->next_block_id = 0;
     524              :         }
     525              :         else
     526              :         {
     527              :             /* Continue to process from last call, or last loop. */
     528      3023721 :             record = prefetcher->record;
     529              :         }
     530              : 
     531              :         /*
     532              :          * Check for operations that require us to filter out block ranges, or
     533              :          * pause readahead completely.
     534              :          */
     535      5964715 :         if (replaying_lsn < record->lsn)
     536              :         {
     537      5964715 :             uint8       rmid = record->header.xl_rmid;
     538      5964715 :             uint8       record_type = record->header.xl_info & ~XLR_INFO_MASK;
     539              : 
     540      5964715 :             if (rmid == RM_XLOG_ID)
     541              :             {
     542       171960 :                 if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
     543              :                     record_type == XLOG_END_OF_RECOVERY)
     544              :                 {
     545              :                     /*
     546              :                      * These records might change the TLI.  Avoid potential
     547              :                      * bugs if we were to allow "read TLI" and "replay TLI" to
     548              :                      * differ without more analysis.
     549              :                      */
     550         1858 :                     prefetcher->no_readahead_until = record->lsn;
     551              : 
     552              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     553              :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     554              :                          "suppressing all readahead until %X/%08X is replayed due to possible TLI change",
     555              :                          LSN_FORMAT_ARGS(record->lsn));
     556              : #endif
     557              : 
     558              :                     /* Fall through so we move past this record. */
     559              :                 }
     560              :             }
     561      5792755 :             else if (rmid == RM_DBASE_ID)
     562              :             {
     563              :                 /*
     564              :                  * When databases are created with the file-copy strategy,
     565              :                  * there are no WAL records to tell us about the creation of
     566              :                  * individual relations.
     567              :                  */
     568           46 :                 if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
     569              :                 {
     570            5 :                     xl_dbase_create_file_copy_rec *xlrec =
     571              :                         (xl_dbase_create_file_copy_rec *) record->main_data;
     572            5 :                     RelFileLocator rlocator =
     573            5 :                     {InvalidOid, xlrec->db_id, InvalidRelFileNumber};
     574              : 
     575              :                     /*
     576              :                      * Don't try to prefetch anything in this database until
     577              :                      * it has been created, or we might confuse the blocks of
     578              :                      * different generations, if a database OID or
     579              :                      * relfilenumber is reused.  It's also more efficient than
     580              :                      * discovering that relations don't exist on disk yet with
     581              :                      * ENOENT errors.
     582              :                      */
     583            5 :                     XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
     584              : 
     585              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     586              :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     587              :                          "suppressing prefetch in database %u until %X/%08X is replayed due to raw file copy",
     588              :                          rlocator.dbOid,
     589              :                          LSN_FORMAT_ARGS(record->lsn));
     590              : #endif
     591              :                 }
     592              :             }
     593      5792709 :             else if (rmid == RM_SMGR_ID)
     594              :             {
     595        18319 :                 if (record_type == XLOG_SMGR_CREATE)
     596              :                 {
     597        18269 :                     xl_smgr_create *xlrec = (xl_smgr_create *)
     598              :                         record->main_data;
     599              : 
     600        18269 :                     if (xlrec->forkNum == MAIN_FORKNUM)
     601              :                     {
     602              :                         /*
     603              :                          * Don't prefetch anything for this whole relation
     604              :                          * until it has been created.  Otherwise we might
     605              :                          * confuse the blocks of different generations, if a
     606              :                          * relfilenumber is reused.  This also avoids the need
     607              :                          * to discover the problem via extra syscalls that
     608              :                          * report ENOENT.
     609              :                          */
     610        16325 :                         XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
     611              :                                                 record->lsn);
     612              : 
     613              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     614              :                         elog(XLOGPREFETCHER_DEBUG_LEVEL,
     615              :                              "suppressing prefetch in relation %u/%u/%u until %X/%08X is replayed, which creates the relation",
     616              :                              xlrec->rlocator.spcOid,
     617              :                              xlrec->rlocator.dbOid,
     618              :                              xlrec->rlocator.relNumber,
     619              :                              LSN_FORMAT_ARGS(record->lsn));
     620              : #endif
     621              :                     }
     622              :                 }
     623           50 :                 else if (record_type == XLOG_SMGR_TRUNCATE)
     624              :                 {
     625           50 :                     xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
     626              :                         record->main_data;
     627              : 
     628              :                     /*
     629              :                      * Don't consider prefetching anything in the truncated
     630              :                      * range until the truncation has been performed.
     631              :                      */
     632           50 :                     XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
     633              :                                             xlrec->blkno,
     634              :                                             record->lsn);
     635              : 
     636              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     637              :                     elog(XLOGPREFETCHER_DEBUG_LEVEL,
     638              :                          "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, which truncates the relation",
     639              :                          xlrec->rlocator.spcOid,
     640              :                          xlrec->rlocator.dbOid,
     641              :                          xlrec->rlocator.relNumber,
     642              :                          xlrec->blkno,
     643              :                          LSN_FORMAT_ARGS(record->lsn));
     644              : #endif
     645              :                 }
     646              :             }
     647              :         }
     648              : 
     649              :         /* Scan the block references, starting where we left off last time. */
     650      5967195 :         while (prefetcher->next_block_id <= record->max_block_id)
     651              :         {
     652      3026201 :             int         block_id = prefetcher->next_block_id++;
     653      3026201 :             DecodedBkpBlock *block = &record->blocks[block_id];
     654              :             SMgrRelation reln;
     655              :             PrefetchBufferResult result;
     656              : 
     657      3026201 :             if (!block->in_use)
     658         2227 :                 continue;
     659              : 
     660              :             Assert(!BufferIsValid(block->prefetch_buffer));
     661              : 
     662              :             /*
     663              :              * Record the LSN of this record.  When it's replayed,
     664              :              * LsnReadQueue will consider any IOs submitted for earlier LSNs
     665              :              * to be finished.
     666              :              */
     667      3023974 :             *lsn = record->lsn;
     668              : 
     669              :             /* We don't try to prefetch anything but the main fork for now. */
     670      3023974 :             if (block->forknum != MAIN_FORKNUM)
     671              :             {
     672      3023721 :                 return LRQ_NEXT_NO_IO;
     673              :             }
     674              : 
     675              :             /*
     676              :              * If there is a full page image attached, we won't be reading the
     677              :              * page, so don't bother trying to prefetch.
     678              :              */
     679      3007657 :             if (block->has_image)
     680              :             {
     681      2469952 :                 XLogPrefetchIncrement(&SharedStats->skip_fpw);
     682      2469952 :                 return LRQ_NEXT_NO_IO;
     683              :             }
     684              : 
     685              :             /* There is no point in reading a page that will be zeroed. */
     686       537705 :             if (block->flags & BKPBLOCK_WILL_INIT)
     687              :             {
     688         6526 :                 XLogPrefetchIncrement(&SharedStats->skip_init);
     689         6526 :                 return LRQ_NEXT_NO_IO;
     690              :             }
     691              : 
     692              :             /* Should we skip prefetching this block due to a filter? */
     693       531179 :             if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
     694              :             {
     695        74359 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     696        74359 :                 return LRQ_NEXT_NO_IO;
     697              :             }
     698              : 
     699              :             /* There is no point in repeatedly prefetching the same block. */
     700      1180261 :             for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
     701              :             {
     702      1166648 :                 if (block->blkno == prefetcher->recent_block[i] &&
     703       460979 :                     RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
     704              :                 {
     705              :                     /*
     706              :                      * XXX If we also remembered where it was, we could set
     707              :                      * recent_buffer so that recovery could skip smgropen()
     708              :                      * and a buffer table lookup.
     709              :                      */
     710       443207 :                     XLogPrefetchIncrement(&SharedStats->skip_rep);
     711       443207 :                     return LRQ_NEXT_NO_IO;
     712              :                 }
     713              :             }
     714        13613 :             prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
     715        13613 :             prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
     716        13613 :             prefetcher->recent_idx =
     717        13613 :                 (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
     718              : 
     719              :             /*
     720              :              * We could try to have a fast path for repeated references to the
     721              :              * same relation (with some scheme to handle invalidations
     722              :              * safely), but for now we'll call smgropen() every time.
     723              :              */
     724        13613 :             reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);
     725              : 
     726              :             /*
     727              :              * If the relation file doesn't exist on disk, for example because
     728              :              * we're replaying after a crash and the file will be created and
     729              :              * then unlinked by WAL that hasn't been replayed yet, suppress
     730              :              * further prefetching in the relation until this record is
     731              :              * replayed.
     732              :              */
     733        13613 :             if (!smgrexists(reln, MAIN_FORKNUM))
     734              :             {
     735              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     736              :                 elog(XLOGPREFETCHER_DEBUG_LEVEL,
     737              :                      "suppressing all prefetch in relation %u/%u/%u until %X/%08X is replayed, because the relation does not exist on disk",
     738              :                      reln->smgr_rlocator.locator.spcOid,
     739              :                      reln->smgr_rlocator.locator.dbOid,
     740              :                      reln->smgr_rlocator.locator.relNumber,
     741              :                      LSN_FORMAT_ARGS(record->lsn));
     742              : #endif
     743            6 :                 XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
     744              :                                         record->lsn);
     745            6 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     746            6 :                 return LRQ_NEXT_NO_IO;
     747              :             }
     748              : 
     749              :             /*
     750              :              * If the relation isn't big enough to contain the referenced
     751              :              * block yet, suppress prefetching of this block and higher until
     752              :              * this record is replayed.
     753              :              */
     754        13607 :             if (block->blkno >= smgrnblocks(reln, block->forknum))
     755              :             {
     756              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     757              :                 elog(XLOGPREFETCHER_DEBUG_LEVEL,
     758              :                      "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, because the relation is too small",
     759              :                      reln->smgr_rlocator.locator.spcOid,
     760              :                      reln->smgr_rlocator.locator.dbOid,
     761              :                      reln->smgr_rlocator.locator.relNumber,
     762              :                      block->blkno,
     763              :                      LSN_FORMAT_ARGS(record->lsn));
     764              : #endif
     765         1459 :                 XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
     766              :                                         record->lsn);
     767         1459 :                 XLogPrefetchIncrement(&SharedStats->skip_new);
     768         1459 :                 return LRQ_NEXT_NO_IO;
     769              :             }
     770              : 
     771              :             /* Try to initiate prefetching. */
     772        12148 :             result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
     773        12148 :             if (BufferIsValid(result.recent_buffer))
     774              :             {
     775              :                 /* Cache hit, nothing to do. */
     776         4771 :                 XLogPrefetchIncrement(&SharedStats->hit);
     777         4771 :                 block->prefetch_buffer = result.recent_buffer;
     778         4771 :                 return LRQ_NEXT_NO_IO;
     779              :             }
     780         7377 :             else if (result.initiated_io)
     781              :             {
     782              :                 /* Cache miss, I/O (presumably) started. */
     783         7124 :                 XLogPrefetchIncrement(&SharedStats->prefetch);
     784         7124 :                 block->prefetch_buffer = InvalidBuffer;
     785         7124 :                 return LRQ_NEXT_IO;
     786              :             }
     787          253 :             else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
     788              :             {
     789              :                 /*
     790              :                  * This shouldn't be possible, because we already determined
     791              :                  * that the relation exists on disk and is big enough.
     792              :                  * Something is wrong with the cache invalidation for
     793              :                  * smgrexists(), smgrnblocks(), or the file was unlinked or
     794              :                  * truncated beneath our feet?
     795              :                  */
     796            0 :                 elog(ERROR,
     797              :                      "could not prefetch relation %u/%u/%u block %u",
     798              :                      reln->smgr_rlocator.locator.spcOid,
     799              :                      reln->smgr_rlocator.locator.dbOid,
     800              :                      reln->smgr_rlocator.locator.relNumber,
     801              :                      block->blkno);
     802              :             }
     803              :         }
     804              : 
     805              :         /*
     806              :          * Several callsites need to be able to read exactly one record
     807              :          * without any internal readahead.  Examples: xlog.c reading
     808              :          * checkpoint records with emode set to PANIC, which might otherwise
     809              :          * cause XLogPageRead() to panic on some future page, and xlog.c
     810              :          * determining where to start writing WAL next, which depends on the
     811              :          * contents of the reader's internal buffer after reading one record.
     812              :          * Therefore, don't even think about prefetching until the first
     813              :          * record after XLogPrefetcherBeginRead() has been consumed.
     814              :          */
     815      2940994 :         if (prefetcher->reader->decode_queue_tail &&
     816      2940994 :             prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
     817         2325 :             return LRQ_NEXT_AGAIN;
     818              : 
     819              :         /* Advance to the next record. */
     820      2938669 :         prefetcher->record = NULL;
     821              :     }
     822              :     pg_unreachable();
     823              : }
     824              : 
     825              : /*
     826              :  * Expose statistics about recovery prefetching.
     827              :  */
     828              : Datum
     829            8 : pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
     830              : {
     831              : #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
     832            8 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     833              :     Datum       values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
     834              :     bool        nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
     835              : 
     836            8 :     InitMaterializedSRF(fcinfo, 0);
     837              : 
     838           88 :     for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
     839           80 :         nulls[i] = false;
     840              : 
     841            8 :     values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
     842            8 :     values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
     843            8 :     values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
     844            8 :     values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
     845            8 :     values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
     846            8 :     values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
     847            8 :     values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
     848            8 :     values[7] = Int32GetDatum(SharedStats->wal_distance);
     849            8 :     values[8] = Int32GetDatum(SharedStats->block_distance);
     850            8 :     values[9] = Int32GetDatum(SharedStats->io_depth);
     851            8 :     tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
     852              : 
     853            8 :     return (Datum) 0;
     854              : }
     855              : 
     856              : /*
     857              :  * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
     858              :  * has been replayed.
     859              :  */
     860              : static inline void
     861        17845 : XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
     862              :                         BlockNumber blockno, XLogRecPtr lsn)
     863              : {
     864              :     XLogPrefetcherFilter *filter;
     865              :     bool        found;
     866              : 
     867        17845 :     filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
     868        17845 :     if (!found)
     869              :     {
     870              :         /*
     871              :          * Don't allow any prefetching of this block or higher until replayed.
     872              :          */
     873        17838 :         filter->filter_until_replayed = lsn;
     874        17838 :         filter->filter_from_block = blockno;
     875        17838 :         dlist_push_head(&prefetcher->filter_queue, &filter->link);
     876              :     }
     877              :     else
     878              :     {
     879              :         /*
     880              :          * We were already filtering this rlocator.  Extend the filter's
     881              :          * lifetime to cover this WAL record, but leave the lower of the block
     882              :          * numbers there because we don't want to have to track individual
     883              :          * blocks.
     884              :          */
     885            7 :         filter->filter_until_replayed = lsn;
     886            7 :         dlist_delete(&filter->link);
     887            7 :         dlist_push_head(&prefetcher->filter_queue, &filter->link);
     888            7 :         filter->filter_from_block = Min(filter->filter_from_block, blockno);
     889              :     }
     890        17845 : }
     891              : 
     892              : /*
     893              :  * Have we replayed any records that caused us to begin filtering a block
     894              :  * range?  That means that relations should have been created, extended or
     895              :  * dropped as required, so we can stop filtering out accesses to a given
     896              :  * relfilenumber.
     897              :  */
     898              : static inline void
     899      2941283 : XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
     900              : {
     901      2959120 :     while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
     902              :     {
     903       440963 :         XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
     904              :                                                           link,
     905              :                                                           &prefetcher->filter_queue);
     906              : 
     907       440963 :         if (filter->filter_until_replayed >= replaying_lsn)
     908       423126 :             break;
     909              : 
     910        17837 :         dlist_delete(&filter->link);
     911        17837 :         hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
     912              :     }
     913      2941283 : }
     914              : 
     915              : /*
     916              :  * Check if a given block should be skipped due to a filter.
     917              :  */
     918              : static inline bool
     919       531179 : XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
     920              :                          BlockNumber blockno)
     921              : {
     922              :     /*
     923              :      * Test for empty queue first, because we expect it to be empty most of
     924              :      * the time and we can avoid the hash table lookup in that case.
     925              :      */
     926       531179 :     if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
     927              :     {
     928              :         XLogPrefetcherFilter *filter;
     929              : 
     930              :         /* See if the block range is filtered. */
     931        95028 :         filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
     932        95028 :         if (filter && filter->filter_from_block <= blockno)
     933              :         {
     934              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     935              :             elog(XLOGPREFETCHER_DEBUG_LEVEL,
     936              :                  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (blocks >= %u filtered)",
     937              :                  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
     938              :                  LSN_FORMAT_ARGS(filter->filter_until_replayed),
     939              :                  filter->filter_from_block);
     940              : #endif
     941        74359 :             return true;
     942              :         }
     943              : 
     944              :         /* See if the whole database is filtered. */
     945        20669 :         rlocator.relNumber = InvalidRelFileNumber;
     946        20669 :         rlocator.spcOid = InvalidOid;
     947        20669 :         filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
     948        20669 :         if (filter)
     949              :         {
     950              : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
     951              :             elog(XLOGPREFETCHER_DEBUG_LEVEL,
     952              :                  "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (whole database)",
     953              :                  rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
     954              :                  LSN_FORMAT_ARGS(filter->filter_until_replayed));
     955              : #endif
     956            0 :             return true;
     957              :         }
     958              :     }
     959              : 
     960       456820 :     return false;
     961              : }
     962              : 
     963              : /*
     964              :  * A wrapper for XLogBeginRead() that also resets the prefetcher.
     965              :  */
     966              : void
     967         2327 : XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
     968              : {
     969              :     /* This will forget about any in-flight IO. */
     970         2327 :     prefetcher->reconfigure_count--;
     971              : 
     972              :     /* Book-keeping to avoid readahead on first read. */
     973         2327 :     prefetcher->begin_ptr = recPtr;
     974              : 
     975         2327 :     prefetcher->no_readahead_until = InvalidXLogRecPtr;
     976              : 
     977              :     /* This will forget about any queued up records in the decoder. */
     978         2327 :     XLogBeginRead(prefetcher->reader, recPtr);
     979         2327 : }
     980              : 
     981              : /*
     982              :  * A wrapper for XLogReadRecord() that provides the same interface, but also
     983              :  * tries to initiate I/O for blocks referenced in future WAL records.
     984              :  */
     985              : XLogRecord *
     986      2941283 : XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
     987              : {
     988              :     DecodedXLogRecord *record;
     989              :     XLogRecPtr  replayed_up_to;
     990              : 
     991              :     /*
     992              :      * See if it's time to reset the prefetching machinery, because a relevant
     993              :      * GUC was changed.
     994              :      */
     995      2941283 :     if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
     996              :     {
     997              :         uint32      max_distance;
     998              :         uint32      max_inflight;
     999              : 
    1000         2338 :         if (prefetcher->streaming_read)
    1001         1266 :             lrq_free(prefetcher->streaming_read);
    1002              : 
    1003         2338 :         if (RecoveryPrefetchEnabled())
    1004              :         {
    1005              :             Assert(maintenance_io_concurrency > 0);
    1006         2338 :             max_inflight = maintenance_io_concurrency;
    1007         2338 :             max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
    1008              :         }
    1009              :         else
    1010              :         {
    1011            0 :             max_inflight = 1;
    1012            0 :             max_distance = 1;
    1013              :         }
    1014              : 
    1015         2338 :         prefetcher->streaming_read = lrq_alloc(max_distance,
    1016              :                                                max_inflight,
    1017              :                                                (uintptr_t) prefetcher,
    1018              :                                                XLogPrefetcherNextBlock);
    1019              : 
    1020         2338 :         prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
    1021              :     }
    1022              : 
    1023              :     /*
    1024              :      * Release last returned record, if there is one, as it's now been
    1025              :      * replayed.
    1026              :      */
    1027      2941283 :     replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
    1028              : 
    1029              :     /*
    1030              :      * Can we drop any filters yet?  If we were waiting for a relation to be
    1031              :      * created or extended, it is now OK to access blocks in the covered
    1032              :      * range.
    1033              :      */
    1034      2941283 :     XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
    1035              : 
    1036              :     /*
    1037              :      * All IO initiated by earlier WAL is now completed.  This might trigger
    1038              :      * further prefetching.
    1039              :      */
    1040      2941283 :     lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
    1041              : 
    1042              :     /*
    1043              :      * If there's nothing queued yet, then start prefetching to cause at least
    1044              :      * one record to be queued.
    1045              :      */
    1046      2941223 :     if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
    1047              :     {
    1048              :         Assert(lrq_inflight(prefetcher->streaming_read) == 0);
    1049              :         Assert(lrq_completed(prefetcher->streaming_read) == 0);
    1050           25 :         lrq_prefetch(prefetcher->streaming_read);
    1051              :     }
    1052              : 
    1053              :     /* Read the next record. */
    1054      2941223 :     record = XLogNextRecord(prefetcher->reader, errmsg);
    1055      2941223 :     if (!record)
    1056          301 :         return NULL;
    1057              : 
    1058              :     /*
    1059              :      * The record we just got is the "current" one, for the benefit of the
    1060              :      * XLogRecXXX() macros.
    1061              :      */
    1062              :     Assert(record == prefetcher->reader->record);
    1063              : 
    1064              :     /*
    1065              :      * If maintenance_io_concurrency is set very low, we might have started
    1066              :      * prefetching some but not all of the blocks referenced in the record
    1067              :      * we're about to return.  Forget about the rest of the blocks in this
    1068              :      * record by dropping the prefetcher's reference to it.
    1069              :      */
    1070      2940922 :     if (record == prefetcher->record)
    1071         2325 :         prefetcher->record = NULL;
    1072              : 
    1073              :     /*
    1074              :      * See if it's time to compute some statistics, because enough WAL has
    1075              :      * been processed.
    1076              :      */
    1077      2940922 :     if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
    1078      1085897 :         XLogPrefetcherComputeStats(prefetcher);
    1079              : 
    1080              :     Assert(record == prefetcher->reader->record);
    1081              : 
    1082      2940922 :     return &record->header;
    1083              : }
    1084              : 
    1085              : bool
    1086         1273 : check_recovery_prefetch(int *new_value, void **extra, GucSource source)
    1087              : {
    1088              : #ifndef USE_PREFETCH
    1089              :     if (*new_value == RECOVERY_PREFETCH_ON)
    1090              :     {
    1091              :         GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
    1092              :         return false;
    1093              :     }
    1094              : #endif
    1095              : 
    1096         1273 :     return true;
    1097              : }
    1098              : 
    1099              : void
    1100         1273 : assign_recovery_prefetch(int new_value, void *extra)
    1101              : {
    1102              :     /* Reconfigure prefetching, because a setting it depends on changed. */
    1103         1273 :     recovery_prefetch = new_value;
    1104         1273 :     if (AmStartupProcess())
    1105            0 :         XLogPrefetchReconfigure();
    1106         1273 : }
        

Generated by: LCOV version 2.0-1