Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * xlogprefetcher.c
4 : * Prefetching support for recovery.
5 : *
6 : * Portions Copyright (c) 2022-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/transam/xlogprefetcher.c
12 : *
13 : * This module provides a drop-in replacement for an XLogReader that tries to
14 : * minimize I/O stalls by looking ahead in the WAL. If blocks that will be
15 : * accessed in the near future are not already in the buffer pool, it initiates
16 : * I/Os that might complete before the caller eventually needs the data. When
17 : * referenced blocks are found in the buffer pool already, the buffer is
18 : * recorded in the decoded record so that XLogReadBufferForRedo() can try to
19 : * avoid a second buffer mapping table lookup.
20 : *
21 : * Currently, only the main fork is considered for prefetching. Currently,
22 : * prefetching is only effective on systems where PrefetchBuffer() does
23 : * something useful (mainly Linux).
24 : *
25 : *-------------------------------------------------------------------------
26 : */
27 :
28 : #include "postgres.h"
29 :
30 : #include "access/xlogprefetcher.h"
31 : #include "access/xlogreader.h"
32 : #include "catalog/pg_control.h"
33 : #include "catalog/storage_xlog.h"
34 : #include "commands/dbcommands_xlog.h"
35 : #include "funcapi.h"
36 : #include "miscadmin.h"
37 : #include "port/atomics.h"
38 : #include "storage/bufmgr.h"
39 : #include "storage/fd.h"
40 : #include "storage/shmem.h"
41 : #include "storage/smgr.h"
42 : #include "storage/subsystems.h"
43 : #include "utils/fmgrprotos.h"
44 : #include "utils/guc_hooks.h"
45 : #include "utils/hsearch.h"
46 : #include "utils/timestamp.h"
47 : #include "utils/tuplestore.h"
48 :
49 : /*
50 : * Every time we process this much WAL, we'll update the values in
51 : * pg_stat_recovery_prefetch.
52 : */
53 : #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
54 :
55 : /*
56 : * To detect repeated access to the same block and skip useless extra system
57 : * calls, we remember a small window of recently prefetched blocks.
58 : */
59 : #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
60 :
61 : /*
62 : * When maintenance_io_concurrency is not saturated, we're prepared to look
63 : * ahead up to N times that number of block references.
64 : */
65 : #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
66 :
67 : /* Define to log internal debugging messages. */
68 : /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
69 :
70 : /* GUCs */
71 : int recovery_prefetch = RECOVERY_PREFETCH_TRY;
72 :
73 : #ifdef USE_PREFETCH
74 : #define RecoveryPrefetchEnabled() \
75 : (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
76 : maintenance_io_concurrency > 0)
77 : #else
78 : #define RecoveryPrefetchEnabled() false
79 : #endif
80 :
81 : static int XLogPrefetchReconfigureCount = 0;
82 :
83 : /*
84 : * Enum used to report whether an IO should be started.
85 : */
typedef enum
{
	LRQ_NEXT_NO_IO,				/* block examined; no IO was necessary */
	LRQ_NEXT_IO,				/* an IO was initiated for the block */
	LRQ_NEXT_AGAIN,				/* no more data available at this time */
} LsnReadQueueNextStatus;
92 :
93 : /*
94 : * Type of callback that can decide which block to prefetch next. For now
95 : * there is only one.
96 : */
97 : typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
98 : XLogRecPtr *lsn);
99 :
100 : /*
 101 : * A simple circular queue of LSNs, used to control the number of
102 : * (potentially) inflight IOs. This stands in for a later more general IO
103 : * control mechanism, which is why it has the apparently unnecessary
104 : * indirection through a function pointer.
105 : */
typedef struct LsnReadQueue
{
	LsnReadQueueNextFun next;	/* callback that examines the next block */
	uintptr_t	lrq_private;	/* opaque argument passed through to 'next' */
	uint32		max_inflight;	/* cap on concurrently inflight IOs */
	uint32		inflight;		/* number of IOs currently in progress */
	uint32		completed;		/* queued entries that needed no IO */
	uint32		head;			/* ring buffer insertion point */
	uint32		tail;			/* ring buffer retirement point */
	uint32		size;			/* number of slots in queue[]; one is
								 * always left unused as the ring gap */
	struct
	{
		bool		io;			/* was an IO started for this entry? */
		XLogRecPtr	lsn;		/* replay of this LSN retires the entry */
	}			queue[FLEXIBLE_ARRAY_MEMBER];
} LsnReadQueue;
122 :
123 : /*
124 : * A prefetcher. This is a mechanism that wraps an XLogReader, prefetching
 125 : * blocks that will soon be referenced, to try to avoid IO stalls.
126 : */
struct XLogPrefetcher
{
	/* WAL reader and current reading state. */
	XLogReaderState *reader;
	DecodedXLogRecord *record;	/* record being scanned for block refs */
	int			next_block_id;	/* next block reference within 'record' */

	/* When to publish stats. */
	XLogRecPtr	next_stats_shm_lsn;

	/*
	 * Book-keeping to avoid accessing blocks that don't exist yet.  Filters
	 * are keyed by RelFileLocator in filter_table; filter_queue holds the
	 * same entries with the most recently added at the head.
	 */
	HTAB	   *filter_table;
	dlist_head	filter_queue;

	/* Book-keeping to avoid repeat prefetches (small circular window). */
	RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
	BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
	int			recent_idx;		/* next window slot to overwrite */

	/* Book-keeping to disable prefetching temporarily. */
	XLogRecPtr	no_readahead_until;

	/* IO depth manager. */
	LsnReadQueue *streaming_read;

	/*
	 * LSN given to XLogPrefetcherBeginRead(); readahead is suppressed until
	 * the first record after that point has been consumed.
	 */
	XLogRecPtr	begin_ptr;

	/* Snapshot of XLogPrefetchReconfigureCount, to detect GUC changes. */
	int			reconfigure_count;
};
156 :
157 : /*
158 : * A temporary filter used to track block ranges that haven't been created
159 : * yet, whole relations that haven't been created yet, and whole relations
160 : * that (we assume) have already been dropped, or will be created by bulk WAL
161 : * operators.
162 : */
typedef struct XLogPrefetcherFilter
{
	RelFileLocator rlocator;	/* hash key: relation being filtered */
	XLogRecPtr	filter_until_replayed;	/* drop filter once this is replayed */
	BlockNumber filter_from_block;	/* filter blocks >= this block number */
	dlist_node	link;			/* position in XLogPrefetcher->filter_queue */
} XLogPrefetcherFilter;
170 :
171 : /*
172 : * Counters exposed in shared memory for pg_stat_recovery_prefetch.
173 : */
typedef struct XLogPrefetchStats
{
	pg_atomic_uint64 reset_time;	/* Time of last reset. */
	pg_atomic_uint64 prefetch;	/* Prefetches initiated. */
	pg_atomic_uint64 hit;		/* Blocks already in cache. */
	pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
	pg_atomic_uint64 skip_new;	/* New/missing blocks filtered. */
	pg_atomic_uint64 skip_fpw;	/* FPWs skipped. */
	pg_atomic_uint64 skip_rep;	/* Repeat accesses skipped. */

	/*
	 * Dynamic values, written without atomics or locking; readers may see
	 * slightly stale values, which is acceptable for monitoring purposes.
	 */
	int			wal_distance;	/* Number of WAL bytes ahead. */
	int			block_distance; /* Number of block references ahead. */
	int			io_depth;		/* Number of I/Os in progress. */
} XLogPrefetchStats;
189 :
190 : static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
191 : RelFileLocator rlocator,
192 : BlockNumber blockno,
193 : XLogRecPtr lsn);
194 : static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
195 : RelFileLocator rlocator,
196 : BlockNumber blockno);
197 : static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
198 : XLogRecPtr replaying_lsn);
199 : static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
200 : XLogRecPtr *lsn);
201 :
202 : static XLogPrefetchStats *SharedStats;
203 :
204 : static void XLogPrefetchShmemRequest(void *arg);
205 : static void XLogPrefetchShmemInit(void *arg);
206 :
/* Callbacks used to reserve and initialize this module's shared memory. */
const ShmemCallbacks XLogPrefetchShmemCallbacks = {
	.request_fn = XLogPrefetchShmemRequest,
	.init_fn = XLogPrefetchShmemInit,
};
211 :
212 : static inline LsnReadQueue *
213 2338 : lrq_alloc(uint32 max_distance,
214 : uint32 max_inflight,
215 : uintptr_t lrq_private,
216 : LsnReadQueueNextFun next)
217 : {
218 : LsnReadQueue *lrq;
219 : uint32 size;
220 :
221 : Assert(max_distance >= max_inflight);
222 :
223 2338 : size = max_distance + 1; /* full ring buffer has a gap */
224 2338 : lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
225 2338 : lrq->lrq_private = lrq_private;
226 2338 : lrq->max_inflight = max_inflight;
227 2338 : lrq->size = size;
228 2338 : lrq->next = next;
229 2338 : lrq->head = 0;
230 2338 : lrq->tail = 0;
231 2338 : lrq->inflight = 0;
232 2338 : lrq->completed = 0;
233 :
234 2338 : return lrq;
235 : }
236 :
/* Release the memory used by an LsnReadQueue. */
static inline void
lrq_free(LsnReadQueue *lrq)
{
	pfree(lrq);
}
242 :
/* Return the number of IOs currently believed to be in progress. */
static inline uint32
lrq_inflight(LsnReadQueue *lrq)
{
	return lrq->inflight;
}
248 :
/* Return the number of queued entries that did not require an IO. */
static inline uint32
lrq_completed(LsnReadQueue *lrq)
{
	return lrq->completed;
}
254 :
/*
 * Examine upcoming block references via the 'next' callback, starting IOs
 * for as many of them as our inflight and look-ahead limits allow.
 */
static inline void
lrq_prefetch(LsnReadQueue *lrq)
{
	/* Try to start as many IOs as we can within our limits. */
	while (lrq->inflight < lrq->max_inflight &&
		   lrq->inflight + lrq->completed < lrq->size - 1)
	{
		Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
		switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
		{
			case LRQ_NEXT_AGAIN:
				/* No more data available right now; stop looking ahead. */
				return;
			case LRQ_NEXT_IO:
				/* An IO was initiated; count it against max_inflight. */
				lrq->queue[lrq->head].io = true;
				lrq->inflight++;
				break;
			case LRQ_NEXT_NO_IO:
				/* Examined without IO; counts against look-ahead only. */
				lrq->queue[lrq->head].io = false;
				lrq->completed++;
				break;
		}
		/* Advance the ring buffer head, wrapping around if necessary. */
		lrq->head++;
		if (lrq->head == lrq->size)
			lrq->head = 0;
	}
}
281 :
/*
 * Note that WAL up to (but not including) 'lsn' has been replayed: retire
 * queue entries whose IOs can now be assumed complete, and then use the
 * freed-up capacity to look further ahead.
 */
static inline void
lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
{
	/*
	 * We know that LSNs before 'lsn' have been replayed, so we can now assume
	 * that any IOs that were started before then have finished.
	 */
	while (lrq->tail != lrq->head &&
		   lrq->queue[lrq->tail].lsn < lsn)
	{
		if (lrq->queue[lrq->tail].io)
			lrq->inflight--;
		else
			lrq->completed--;
		/* Advance the ring buffer tail, wrapping around if necessary. */
		lrq->tail++;
		if (lrq->tail == lrq->size)
			lrq->tail = 0;
	}
	if (RecoveryPrefetchEnabled())
		lrq_prefetch(lrq);
}
303 :
/*
 * Shared memory request callback: reserve space for the counters shown in
 * pg_stat_recovery_prefetch, attached to SharedStats.
 */
static void
XLogPrefetchShmemRequest(void *arg)
{
	ShmemRequestStruct(.name = "XLogPrefetchStats",
					   .size = sizeof(XLogPrefetchStats),
					   .ptr = (void **) &SharedStats,
	);
}
312 :
/*
 * Shared memory init callback: zero all counters and record the current
 * time as the initial "reset" time.
 */
static void
XLogPrefetchShmemInit(void *arg)
{
	pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
	pg_atomic_init_u64(&SharedStats->prefetch, 0);
	pg_atomic_init_u64(&SharedStats->hit, 0);
	pg_atomic_init_u64(&SharedStats->skip_init, 0);
	pg_atomic_init_u64(&SharedStats->skip_new, 0);
	pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
	pg_atomic_init_u64(&SharedStats->skip_rep, 0);
}
324 :
/*
 * Reset all counters to zero and remember the time of the reset.  The
 * dynamic values (wal_distance etc.) are not touched here.
 */
void
XLogPrefetchResetStats(void)
{
	pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
	pg_atomic_write_u64(&SharedStats->prefetch, 0);
	pg_atomic_write_u64(&SharedStats->hit, 0);
	pg_atomic_write_u64(&SharedStats->skip_init, 0);
	pg_atomic_write_u64(&SharedStats->skip_new, 0);
	pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
	pg_atomic_write_u64(&SharedStats->skip_rep, 0);
}
339 :
340 :
/*
 * Called when any GUC is changed that affects prefetching.  Bumping the
 * counter causes the prefetcher's streaming_read to be rebuilt lazily.
 */
void
XLogPrefetchReconfigure(void)
{
	XLogPrefetchReconfigureCount++;
}
349 :
/*
 * Increment a counter in shared memory.  This is equivalent to *counter++ on
 * a plain uint64 without any memory barrier or locking, except on platforms
 * where readers can't read uint64 without possibly observing a torn value.
 * Only the startup process (or single-user mode) writes these counters.
 */
static inline void
XLogPrefetchIncrement(pg_atomic_uint64 *counter)
{
	Assert(AmStartupProcess() || !IsUnderPostmaster);
	pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
}
361 :
362 : /*
363 : * Create a prefetcher that is ready to begin prefetching blocks referenced by
364 : * WAL records.
365 : */
366 : XLogPrefetcher *
367 1072 : XLogPrefetcherAllocate(XLogReaderState *reader)
368 : {
369 : XLogPrefetcher *prefetcher;
370 : HASHCTL ctl;
371 :
372 1072 : prefetcher = palloc0_object(XLogPrefetcher);
373 1072 : prefetcher->reader = reader;
374 :
375 1072 : ctl.keysize = sizeof(RelFileLocator);
376 1072 : ctl.entrysize = sizeof(XLogPrefetcherFilter);
377 1072 : prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
378 : &ctl, HASH_ELEM | HASH_BLOBS);
379 1072 : dlist_init(&prefetcher->filter_queue);
380 :
381 1072 : SharedStats->wal_distance = 0;
382 1072 : SharedStats->block_distance = 0;
383 1072 : SharedStats->io_depth = 0;
384 :
385 : /* First usage will cause streaming_read to be allocated. */
386 1072 : prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
387 :
388 1072 : return prefetcher;
389 : }
390 :
/*
 * Destroy a prefetcher and release all resources.  The filters in
 * filter_queue live inside filter_table, so destroying the hash table
 * frees them too.
 */
void
XLogPrefetcherFree(XLogPrefetcher *prefetcher)
{
	lrq_free(prefetcher->streaming_read);
	hash_destroy(prefetcher->filter_table);
	pfree(prefetcher);
}
401 :
/*
 * Provide access to the underlying WAL reader.
 */
XLogReaderState *
XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
{
	return prefetcher->reader;
}
410 :
411 : /*
412 : * Update the statistics visible in the pg_stat_recovery_prefetch view.
413 : */
414 : void
415 1102177 : XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
416 : {
417 : uint32 io_depth;
418 : uint32 completed;
419 : int64 wal_distance;
420 :
421 :
422 : /* How far ahead of replay are we now? */
423 1102177 : if (prefetcher->reader->decode_queue_tail)
424 : {
425 1086902 : wal_distance =
426 1086902 : prefetcher->reader->decode_queue_tail->lsn -
427 1086902 : prefetcher->reader->decode_queue_head->lsn;
428 : }
429 : else
430 : {
431 15275 : wal_distance = 0;
432 : }
433 :
434 : /* How many IOs are currently in flight and completed? */
435 1102177 : io_depth = lrq_inflight(prefetcher->streaming_read);
436 1102177 : completed = lrq_completed(prefetcher->streaming_read);
437 :
438 : /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
439 1102177 : SharedStats->io_depth = io_depth;
440 1102177 : SharedStats->block_distance = io_depth + completed;
441 1102177 : SharedStats->wal_distance = wal_distance;
442 :
443 1102177 : prefetcher->next_stats_shm_lsn =
444 1102177 : prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
445 1102177 : }
446 :
/*
 * A callback that examines the next block reference in the WAL, and possibly
 * starts an IO so that a later read will be fast.
 *
 * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
 *
 * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
 * that isn't in the buffer pool, and the kernel has been asked to start
 * reading it to make a future read system call faster. An LSN is written to
 * *lsn, and the I/O will be considered to have completed once that LSN is
 * replayed.
 *
 * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
 * that it was already in the buffer pool, or we decided for various reasons
 * not to prefetch.
 */
static LsnReadQueueNextStatus
XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
{
	XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
	XLogReaderState *reader = prefetcher->reader;
	XLogRecPtr	replaying_lsn = reader->ReadRecPtr;

	/*
	 * We keep track of the record and block we're up to between calls with
	 * prefetcher->record and prefetcher->next_block_id.
	 */
	for (;;)
	{
		DecodedXLogRecord *record;

		/* Try to read a new future record, if we don't already have one. */
		if (prefetcher->record == NULL)
		{
			bool		nonblocking;

			/*
			 * If there are already records or an error queued up that could
			 * be replayed, we don't want to block here.  Otherwise, it's OK
			 * to block waiting for more data: presumably the caller has
			 * nothing else to do.
			 */
			nonblocking = XLogReaderHasQueuedRecordOrError(reader);

			/* Readahead is disabled until we replay past a certain point. */
			if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
				return LRQ_NEXT_AGAIN;

			record = XLogReadAhead(prefetcher->reader, nonblocking);
			if (record == NULL)
			{
				/*
				 * We can't read any more, due to an error or lack of data in
				 * nonblocking mode.  Don't try to read ahead again until
				 * we've replayed everything already decoded.
				 */
				if (nonblocking && prefetcher->reader->decode_queue_tail)
					prefetcher->no_readahead_until =
						prefetcher->reader->decode_queue_tail->lsn;

				return LRQ_NEXT_AGAIN;
			}

			/*
			 * If prefetching is disabled, we don't need to analyze the record
			 * or issue any prefetches.  We just need to cause one record to
			 * be decoded.
			 */
			if (!RecoveryPrefetchEnabled())
			{
				*lsn = InvalidXLogRecPtr;
				return LRQ_NEXT_NO_IO;
			}

			/* We have a new record to process; start at its first block. */
			prefetcher->record = record;
			prefetcher->next_block_id = 0;
		}
		else
		{
			/* Continue to process from last call, or last loop. */
			record = prefetcher->record;
		}

		/*
		 * Check for operations that require us to filter out block ranges, or
		 * pause readahead completely.  Only records we haven't replayed yet
		 * can require this.
		 */
		if (replaying_lsn < record->lsn)
		{
			uint8		rmid = record->header.xl_rmid;
			uint8		record_type = record->header.xl_info & ~XLR_INFO_MASK;

			if (rmid == RM_XLOG_ID)
			{
				if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
					record_type == XLOG_END_OF_RECOVERY)
				{
					/*
					 * These records might change the TLI.  Avoid potential
					 * bugs if we were to allow "read TLI" and "replay TLI" to
					 * differ without more analysis.
					 */
					prefetcher->no_readahead_until = record->lsn;

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
					elog(XLOGPREFETCHER_DEBUG_LEVEL,
						 "suppressing all readahead until %X/%08X is replayed due to possible TLI change",
						 LSN_FORMAT_ARGS(record->lsn));
#endif

					/* Fall through so we move past this record. */
				}
			}
			else if (rmid == RM_DBASE_ID)
			{
				/*
				 * When databases are created with the file-copy strategy,
				 * there are no WAL records to tell us about the creation of
				 * individual relations.
				 */
				if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
				{
					xl_dbase_create_file_copy_rec *xlrec =
						(xl_dbase_create_file_copy_rec *) record->main_data;
					RelFileLocator rlocator =
					{InvalidOid, xlrec->db_id, InvalidRelFileNumber};

					/*
					 * Don't try to prefetch anything in this database until
					 * it has been created, or we might confuse the blocks of
					 * different generations, if a database OID or
					 * relfilenumber is reused.  It's also more efficient than
					 * discovering that relations don't exist on disk yet with
					 * ENOENT errors.
					 */
					XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
					elog(XLOGPREFETCHER_DEBUG_LEVEL,
						 "suppressing prefetch in database %u until %X/%08X is replayed due to raw file copy",
						 rlocator.dbOid,
						 LSN_FORMAT_ARGS(record->lsn));
#endif
				}
			}
			else if (rmid == RM_SMGR_ID)
			{
				if (record_type == XLOG_SMGR_CREATE)
				{
					xl_smgr_create *xlrec = (xl_smgr_create *)
						record->main_data;

					if (xlrec->forkNum == MAIN_FORKNUM)
					{
						/*
						 * Don't prefetch anything for this whole relation
						 * until it has been created.  Otherwise we might
						 * confuse the blocks of different generations, if a
						 * relfilenumber is reused.  This also avoids the need
						 * to discover the problem via extra syscalls that
						 * report ENOENT.
						 */
						XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
												record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
						elog(XLOGPREFETCHER_DEBUG_LEVEL,
							 "suppressing prefetch in relation %u/%u/%u until %X/%08X is replayed, which creates the relation",
							 xlrec->rlocator.spcOid,
							 xlrec->rlocator.dbOid,
							 xlrec->rlocator.relNumber,
							 LSN_FORMAT_ARGS(record->lsn));
#endif
					}
				}
				else if (record_type == XLOG_SMGR_TRUNCATE)
				{
					xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
						record->main_data;

					/*
					 * Don't consider prefetching anything in the truncated
					 * range until the truncation has been performed.
					 */
					XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
											xlrec->blkno,
											record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
					elog(XLOGPREFETCHER_DEBUG_LEVEL,
						 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, which truncates the relation",
						 xlrec->rlocator.spcOid,
						 xlrec->rlocator.dbOid,
						 xlrec->rlocator.relNumber,
						 xlrec->blkno,
						 LSN_FORMAT_ARGS(record->lsn));
#endif
				}
			}
		}

		/* Scan the block references, starting where we left off last time. */
		while (prefetcher->next_block_id <= record->max_block_id)
		{
			int			block_id = prefetcher->next_block_id++;
			DecodedBkpBlock *block = &record->blocks[block_id];
			SMgrRelation reln;
			PrefetchBufferResult result;

			if (!block->in_use)
				continue;

			Assert(!BufferIsValid(block->prefetch_buffer));

			/*
			 * Record the LSN of this record.  When it's replayed,
			 * LsnReadQueue will consider any IOs submitted for earlier LSNs
			 * to be finished.
			 */
			*lsn = record->lsn;

			/* We don't try to prefetch anything but the main fork for now. */
			if (block->forknum != MAIN_FORKNUM)
			{
				return LRQ_NEXT_NO_IO;
			}

			/*
			 * If there is a full page image attached, we won't be reading the
			 * page, so don't bother trying to prefetch.
			 */
			if (block->has_image)
			{
				XLogPrefetchIncrement(&SharedStats->skip_fpw);
				return LRQ_NEXT_NO_IO;
			}

			/* There is no point in reading a page that will be zeroed. */
			if (block->flags & BKPBLOCK_WILL_INIT)
			{
				XLogPrefetchIncrement(&SharedStats->skip_init);
				return LRQ_NEXT_NO_IO;
			}

			/* Should we skip prefetching this block due to a filter? */
			if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
			{
				XLogPrefetchIncrement(&SharedStats->skip_new);
				return LRQ_NEXT_NO_IO;
			}

			/*
			 * There is no point in repeatedly prefetching the same block:
			 * check the small window of recently prefetched blocks first.
			 */
			for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
			{
				if (block->blkno == prefetcher->recent_block[i] &&
					RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
				{
					/*
					 * XXX If we also remembered where it was, we could set
					 * recent_buffer so that recovery could skip smgropen()
					 * and a buffer table lookup.
					 */
					XLogPrefetchIncrement(&SharedStats->skip_rep);
					return LRQ_NEXT_NO_IO;
				}
			}
			/* Remember this block in the circular window. */
			prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
			prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
			prefetcher->recent_idx =
				(prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;

			/*
			 * We could try to have a fast path for repeated references to the
			 * same relation (with some scheme to handle invalidations
			 * safely), but for now we'll call smgropen() every time.
			 */
			reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);

			/*
			 * If the relation file doesn't exist on disk, for example because
			 * we're replaying after a crash and the file will be created and
			 * then unlinked by WAL that hasn't been replayed yet, suppress
			 * further prefetching in the relation until this record is
			 * replayed.
			 */
			if (!smgrexists(reln, MAIN_FORKNUM))
			{
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
				elog(XLOGPREFETCHER_DEBUG_LEVEL,
					 "suppressing all prefetch in relation %u/%u/%u until %X/%08X is replayed, because the relation does not exist on disk",
					 reln->smgr_rlocator.locator.spcOid,
					 reln->smgr_rlocator.locator.dbOid,
					 reln->smgr_rlocator.locator.relNumber,
					 LSN_FORMAT_ARGS(record->lsn));
#endif
				XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
										record->lsn);
				XLogPrefetchIncrement(&SharedStats->skip_new);
				return LRQ_NEXT_NO_IO;
			}

			/*
			 * If the relation isn't big enough to contain the referenced
			 * block yet, suppress prefetching of this block and higher until
			 * this record is replayed.
			 */
			if (block->blkno >= smgrnblocks(reln, block->forknum))
			{
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
				elog(XLOGPREFETCHER_DEBUG_LEVEL,
					 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, because the relation is too small",
					 reln->smgr_rlocator.locator.spcOid,
					 reln->smgr_rlocator.locator.dbOid,
					 reln->smgr_rlocator.locator.relNumber,
					 block->blkno,
					 LSN_FORMAT_ARGS(record->lsn));
#endif
				XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
										record->lsn);
				XLogPrefetchIncrement(&SharedStats->skip_new);
				return LRQ_NEXT_NO_IO;
			}

			/* Try to initiate prefetching. */
			result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
			if (BufferIsValid(result.recent_buffer))
			{
				/* Cache hit, nothing to do. */
				XLogPrefetchIncrement(&SharedStats->hit);
				block->prefetch_buffer = result.recent_buffer;
				return LRQ_NEXT_NO_IO;
			}
			else if (result.initiated_io)
			{
				/* Cache miss, I/O (presumably) started. */
				XLogPrefetchIncrement(&SharedStats->prefetch);
				block->prefetch_buffer = InvalidBuffer;
				return LRQ_NEXT_IO;
			}
			else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
			{
				/*
				 * This shouldn't be possible, because we already determined
				 * that the relation exists on disk and is big enough.
				 * Something is wrong with the cache invalidation for
				 * smgrexists(), smgrnblocks(), or the file was unlinked or
				 * truncated beneath our feet?
				 */
				elog(ERROR,
					 "could not prefetch relation %u/%u/%u block %u",
					 reln->smgr_rlocator.locator.spcOid,
					 reln->smgr_rlocator.locator.dbOid,
					 reln->smgr_rlocator.locator.relNumber,
					 block->blkno);
			}
		}

		/*
		 * Several callsites need to be able to read exactly one record
		 * without any internal readahead.  Examples: xlog.c reading
		 * checkpoint records with emode set to PANIC, which might otherwise
		 * cause XLogPageRead() to panic on some future page, and xlog.c
		 * determining where to start writing WAL next, which depends on the
		 * contents of the reader's internal buffer after reading one record.
		 * Therefore, don't even think about prefetching until the first
		 * record after XLogPrefetcherBeginRead() has been consumed.
		 */
		if (prefetcher->reader->decode_queue_tail &&
			prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
			return LRQ_NEXT_AGAIN;

		/* Advance to the next record. */
		prefetcher->record = NULL;
	}
	pg_unreachable();
}
824 :
825 : /*
826 : * Expose statistics about recovery prefetching.
827 : */
828 : Datum
829 8 : pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
830 : {
831 : #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
832 8 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
833 : Datum values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
834 : bool nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
835 :
836 8 : InitMaterializedSRF(fcinfo, 0);
837 :
838 88 : for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
839 80 : nulls[i] = false;
840 :
841 8 : values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
842 8 : values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
843 8 : values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
844 8 : values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
845 8 : values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
846 8 : values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
847 8 : values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
848 8 : values[7] = Int32GetDatum(SharedStats->wal_distance);
849 8 : values[8] = Int32GetDatum(SharedStats->block_distance);
850 8 : values[9] = Int32GetDatum(SharedStats->io_depth);
851 8 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
852 :
853 8 : return (Datum) 0;
854 : }
855 :
856 : /*
857 : * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
858 : * has been replayed.
859 : */
860 : static inline void
861 17845 : XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
862 : BlockNumber blockno, XLogRecPtr lsn)
863 : {
864 : XLogPrefetcherFilter *filter;
865 : bool found;
866 :
867 17845 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
868 17845 : if (!found)
869 : {
870 : /*
871 : * Don't allow any prefetching of this block or higher until replayed.
872 : */
873 17838 : filter->filter_until_replayed = lsn;
874 17838 : filter->filter_from_block = blockno;
875 17838 : dlist_push_head(&prefetcher->filter_queue, &filter->link);
876 : }
877 : else
878 : {
879 : /*
880 : * We were already filtering this rlocator. Extend the filter's
881 : * lifetime to cover this WAL record, but leave the lower of the block
882 : * numbers there because we don't want to have to track individual
883 : * blocks.
884 : */
885 7 : filter->filter_until_replayed = lsn;
886 7 : dlist_delete(&filter->link);
887 7 : dlist_push_head(&prefetcher->filter_queue, &filter->link);
888 7 : filter->filter_from_block = Min(filter->filter_from_block, blockno);
889 : }
890 17845 : }
891 :
892 : /*
893 : * Have we replayed any records that caused us to begin filtering a block
894 : * range? That means that relations should have been created, extended or
895 : * dropped as required, so we can stop filtering out accesses to a given
896 : * relfilenumber.
897 : */
898 : static inline void
899 2941283 : XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
900 : {
    : 	/*
    : 	 * The queue is maintained with the newest filters at the head (see
    : 	 * XLogPrefetcherAddFilter()), so the tail holds the filter with the
    : 	 * oldest filter_until_replayed LSN; we can stop scanning at the first
    : 	 * entry that is still live.
    : 	 */
901 2959120 : 	while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
902 : 	{
903 440963 : 		XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
904 : 														  link,
905 : 														  &prefetcher->filter_queue);
906 :
907 440963 : 		if (filter->filter_until_replayed >= replaying_lsn)
908 423126 : 			break;
909 :
910 17837 : 		dlist_delete(&filter->link);
    : 		/*
    : 		 * NOTE(review): the entry itself is passed as the lookup key, which
    : 		 * presumably works because the rlocator key is the first member of
    : 		 * XLogPrefetcherFilter -- confirm against the struct definition.
    : 		 */
911 17837 : 		hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
912 : 	}
913 2941283 : }
914 :
915 : /*
916 : * Check if a given block should be skipped due to a filter.
917 : */
918 : static inline bool
919 531179 : XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
920 : 						 BlockNumber blockno)
921 : {
922 : 	/*
923 : 	 * Test for empty queue first, because we expect it to be empty most of
924 : 	 * the time and we can avoid the hash table lookup in that case.
925 : 	 */
926 531179 : 	if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
927 : 	{
928 : 		XLogPrefetcherFilter *filter;
929 :
930 : 		/* See if the block range is filtered. */
    : 		/* Blocks >= filter_from_block are suppressed until the filter expires. */
931 95028 : 		filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
932 95028 : 		if (filter && filter->filter_from_block <= blockno)
933 : 		{
934 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
935 : 			elog(XLOGPREFETCHER_DEBUG_LEVEL,
936 : 				 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (blocks >= %u filtered)",
937 : 				 rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
938 : 				 LSN_FORMAT_ARGS(filter->filter_until_replayed),
939 : 				 filter->filter_from_block);
940 : #endif
941 74359 : 			return true;
942 : 		}
943 :
944 : 		/* See if the whole database is filtered. */
    : 		/*
    : 		 * rlocator is passed by value, so clobbering these fields to form the
    : 		 * database-wide key (invalid relNumber/spcOid, dbOid kept) is local to
    : 		 * this function.
    : 		 */
945 20669 : 		rlocator.relNumber = InvalidRelFileNumber;
946 20669 : 		rlocator.spcOid = InvalidOid;
947 20669 : 		filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
948 20669 : 		if (filter)
949 : 		{
950 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
951 : 			elog(XLOGPREFETCHER_DEBUG_LEVEL,
952 : 				 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (whole database)",
953 : 				 rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
954 : 				 LSN_FORMAT_ARGS(filter->filter_until_replayed));
955 : #endif
956 0 : 			return true;
957 : 		}
958 : 	}
959 :
960 456820 : 	return false;
961 : }
962 :
963 : /*
964 : * A wrapper for XLogBeginRead() that also resets the prefetcher.
965 : */
966 : void
967 2327 : XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
968 : {
969 : 	/* This will forget about any in-flight IO. */
    : 	/*
    : 	 * Decrementing makes reconfigure_count differ from the global
    : 	 * XLogPrefetchReconfigureCount, so the next XLogPrefetcherReadRecord()
    : 	 * call frees and rebuilds the streaming-read machinery.
    : 	 */
970 2327 : 	prefetcher->reconfigure_count--;
971 :
972 : 	/* Book-keeping to avoid readahead on first read. */
973 2327 : 	prefetcher->begin_ptr = recPtr;
974 :
975 2327 : 	prefetcher->no_readahead_until = InvalidXLogRecPtr;
976 :
977 : 	/* This will forget about any queued up records in the decoder. */
978 2327 : 	XLogBeginRead(prefetcher->reader, recPtr);
979 2327 : }
980 :
981 : /*
982 : * A wrapper for XLogReadRecord() that provides the same interface, but also
983 : * tries to initiate I/O for blocks referenced in future WAL records.
984 : */
985 : XLogRecord *
986 2941283 : XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
987 : {
988 : 	DecodedXLogRecord *record;
989 : 	XLogRecPtr	replayed_up_to;
990 :
991 : 	/*
992 : 	 * See if it's time to reset the prefetching machinery, because a relevant
993 : 	 * GUC was changed.
994 : 	 */
995 2941283 : 	if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
996 : 	{
997 : 		uint32		max_distance;
998 : 		uint32		max_inflight;
999 :
1000 2338 : 		if (prefetcher->streaming_read)
1001 1266 : 			lrq_free(prefetcher->streaming_read);
1002 :
1003 2338 : 		if (RecoveryPrefetchEnabled())
1004 : 		{
1005 : 			Assert(maintenance_io_concurrency > 0);
1006 2338 : 			max_inflight = maintenance_io_concurrency;
1007 2338 : 			max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
1008 : 		}
1009 : 		else
1010 : 		{
     : 			/* Prefetching disabled: degenerate queue of depth 1. */
1011 0 : 			max_inflight = 1;
1012 0 : 			max_distance = 1;
1013 : 		}
1014 :
     : 		/*
     : 		 * The prefetcher itself is passed through as the callback argument
     : 		 * for XLogPrefetcherNextBlock().
     : 		 */
1015 2338 : 		prefetcher->streaming_read = lrq_alloc(max_distance,
1016 : 											   max_inflight,
1017 : 											   (uintptr_t) prefetcher,
1018 : 											   XLogPrefetcherNextBlock);
1019 :
1020 2338 : 		prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
1021 : 	}
1022 :
1023 : 	/*
1024 : 	 * Release last returned record, if there is one, as it's now been
1025 : 	 * replayed.
1026 : 	 */
1027 2941283 : 	replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
1028 :
1029 : 	/*
1030 : 	 * Can we drop any filters yet?  If we were waiting for a relation to be
1031 : 	 * created or extended, it is now OK to access blocks in the covered
1032 : 	 * range.
1033 : 	 */
1034 2941283 : 	XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1035 :
1036 : 	/*
1037 : 	 * All IO initiated by earlier WAL is now completed.  This might trigger
1038 : 	 * further prefetching.
1039 : 	 */
1040 2941283 : 	lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1041 :
1042 : 	/*
1043 : 	 * If there's nothing queued yet, then start prefetching to cause at least
1044 : 	 * one record to be queued.
1045 : 	 */
1046 2941223 : 	if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1047 : 	{
1048 : 		Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1049 : 		Assert(lrq_completed(prefetcher->streaming_read) == 0);
1050 25 : 		lrq_prefetch(prefetcher->streaming_read);
1051 : 	}
1052 :
1053 : 	/* Read the next record. */
1054 2941223 : 	record = XLogNextRecord(prefetcher->reader, errmsg);
1055 2941223 : 	if (!record)
     : 		/* End of readable WAL or decoding error; *errmsg set on error. */
1056 301 : 		return NULL;
1057 :
1058 : 	/*
1059 : 	 * The record we just got is the "current" one, for the benefit of the
1060 : 	 * XLogRecXXX() macros.
1061 : 	 */
1062 : 	Assert(record == prefetcher->reader->record);
1063 :
1064 : 	/*
1065 : 	 * If maintenance_io_concurrency is set very low, we might have started
1066 : 	 * prefetching some but not all of the blocks referenced in the record
1067 : 	 * we're about to return.  Forget about the rest of the blocks in this
1068 : 	 * record by dropping the prefetcher's reference to it.
1069 : 	 */
1070 2940922 : 	if (record == prefetcher->record)
1071 2325 : 		prefetcher->record = NULL;
1072 :
1073 : 	/*
1074 : 	 * See if it's time to compute some statistics, because enough WAL has
1075 : 	 * been processed.
1076 : 	 */
1077 2940922 : 	if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1078 1085897 : 		XLogPrefetcherComputeStats(prefetcher);
1079 :
1080 : 	Assert(record == prefetcher->reader->record);
1081 :
1082 2940922 : 	return &record->header;
1083 : }
1084 :
     : /*
     : * GUC check hook for recovery_prefetch.  Rejects "on" at set time on
     : * platforms built without USE_PREFETCH (no read-ahead advice support);
     : * all values are accepted otherwise.
     : */
1085 : bool
1086 1273 : check_recovery_prefetch(int *new_value, void **extra, GucSource source)
1087 : {
1088 : #ifndef USE_PREFETCH
1089 : 	if (*new_value == RECOVERY_PREFETCH_ON)
1090 : 	{
1091 : 		GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
1092 : 		return false;
1093 : 	}
1094 : #endif
1095 :
1096 1273 : 	return true;
1097 : }
1098 :
     : /*
     : * GUC assign hook for recovery_prefetch.  Stores the new value and, in the
     : * startup process only (the process performing WAL replay), schedules a
     : * reconfiguration of the prefetching machinery.
     : */
1099 : void
1100 1273 : assign_recovery_prefetch(int new_value, void *extra)
1101 : {
1102 : 	/* Reconfigure prefetching, because a setting it depends on changed. */
1103 1273 : 	recovery_prefetch = new_value;
1104 1273 : 	if (AmStartupProcess())
1105 0 : 		XLogPrefetchReconfigure();
1106 1273 : }
|