Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * xlogprefetcher.c
4 : * Prefetching support for recovery.
5 : *
6 : * Portions Copyright (c) 2022-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/transam/xlogprefetcher.c
12 : *
13 : * This module provides a drop-in replacement for an XLogReader that tries to
14 : * minimize I/O stalls by looking ahead in the WAL. If blocks that will be
15 : * accessed in the near future are not already in the buffer pool, it initiates
16 : * I/Os that might complete before the caller eventually needs the data. When
17 : * referenced blocks are found in the buffer pool already, the buffer is
18 : * recorded in the decoded record so that XLogReadBufferForRedo() can try to
19 : * avoid a second buffer mapping table lookup.
20 : *
21 : * Currently, only the main fork is considered for prefetching, and
22 : * prefetching is only effective on systems where PrefetchBuffer() does
23 : * something useful (mainly Linux).
24 : *
25 : *-------------------------------------------------------------------------
26 : */
27 :
28 : #include "postgres.h"
29 :
30 : #include "access/xlogprefetcher.h"
31 : #include "access/xlogreader.h"
32 : #include "catalog/pg_control.h"
33 : #include "catalog/storage_xlog.h"
34 : #include "commands/dbcommands_xlog.h"
35 : #include "funcapi.h"
36 : #include "miscadmin.h"
37 : #include "port/atomics.h"
38 : #include "storage/bufmgr.h"
39 : #include "storage/shmem.h"
40 : #include "storage/smgr.h"
41 : #include "utils/fmgrprotos.h"
42 : #include "utils/guc_hooks.h"
43 : #include "utils/hsearch.h"
44 : #include "utils/timestamp.h"
45 :
/*
 * Amount of WAL that has to be processed between successive refreshes of the
 * values shown in pg_stat_recovery_prefetch.
 */
#define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ

/*
 * Number of recently prefetched blocks that we remember, so that repeated
 * access to the same block can be detected and the useless extra system
 * calls skipped.
 */
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4

/*
 * Look-ahead limit, expressed as a multiple of maintenance_io_concurrency:
 * while that setting is not saturated, we're prepared to look ahead up to
 * this many times that number of block references.
 */
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4

/* Uncomment to log internal debugging messages at the given level. */
/* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
66 :
67 : /* GUCs */
68 : int recovery_prefetch = RECOVERY_PREFETCH_TRY;
69 :
70 : #ifdef USE_PREFETCH
71 : #define RecoveryPrefetchEnabled() \
72 : (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
73 : maintenance_io_concurrency > 0)
74 : #else
75 : #define RecoveryPrefetchEnabled() false
76 : #endif
77 :
78 : static int XLogPrefetchReconfigureCount = 0;
79 :
/*
 * Answer given by the "next block" callback for one block reference; it
 * reports whether an IO should be considered started.
 */
typedef enum
{
	LRQ_NEXT_NO_IO,				/* reference examined, no IO was started */
	LRQ_NEXT_IO,				/* an IO was started for the reference */
	LRQ_NEXT_AGAIN,				/* nothing to queue right now, retry later */
} LsnReadQueueNextStatus;
89 :
90 : /*
91 : * Type of callback that can decide which block to prefetch next. For now
92 : * there is only one.
93 : */
94 : typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
95 : XLogRecPtr *lsn);
96 :
97 : /*
98 : * A simple circular queue of LSNs, using to control the number of
99 : * (potentially) inflight IOs. This stands in for a later more general IO
100 : * control mechanism, which is why it has the apparently unnecessary
101 : * indirection through a function pointer.
102 : */
103 : typedef struct LsnReadQueue
104 : {
105 : LsnReadQueueNextFun next;
106 : uintptr_t lrq_private;
107 : uint32 max_inflight;
108 : uint32 inflight;
109 : uint32 completed;
110 : uint32 head;
111 : uint32 tail;
112 : uint32 size;
113 : struct
114 : {
115 : bool io;
116 : XLogRecPtr lsn;
117 : } queue[FLEXIBLE_ARRAY_MEMBER];
118 : } LsnReadQueue;
119 :
120 : /*
121 : * A prefetcher. This is a mechanism that wraps an XLogReader, prefetching
122 : * blocks that will be soon be referenced, to try to avoid IO stalls.
123 : */
124 : struct XLogPrefetcher
125 : {
126 : /* WAL reader and current reading state. */
127 : XLogReaderState *reader;
128 : DecodedXLogRecord *record;
129 : int next_block_id;
130 :
131 : /* When to publish stats. */
132 : XLogRecPtr next_stats_shm_lsn;
133 :
134 : /* Book-keeping to avoid accessing blocks that don't exist yet. */
135 : HTAB *filter_table;
136 : dlist_head filter_queue;
137 :
138 : /* Book-keeping to avoid repeat prefetches. */
139 : RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
140 : BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
141 : int recent_idx;
142 :
143 : /* Book-keeping to disable prefetching temporarily. */
144 : XLogRecPtr no_readahead_until;
145 :
146 : /* IO depth manager. */
147 : LsnReadQueue *streaming_read;
148 :
149 : XLogRecPtr begin_ptr;
150 :
151 : int reconfigure_count;
152 : };
153 :
154 : /*
155 : * A temporary filter used to track block ranges that haven't been created
156 : * yet, whole relations that haven't been created yet, and whole relations
157 : * that (we assume) have already been dropped, or will be created by bulk WAL
158 : * operators.
159 : */
160 : typedef struct XLogPrefetcherFilter
161 : {
162 : RelFileLocator rlocator;
163 : XLogRecPtr filter_until_replayed;
164 : BlockNumber filter_from_block;
165 : dlist_node link;
166 : } XLogPrefetcherFilter;
167 :
168 : /*
169 : * Counters exposed in shared memory for pg_stat_recovery_prefetch.
170 : */
171 : typedef struct XLogPrefetchStats
172 : {
173 : pg_atomic_uint64 reset_time; /* Time of last reset. */
174 : pg_atomic_uint64 prefetch; /* Prefetches initiated. */
175 : pg_atomic_uint64 hit; /* Blocks already in cache. */
176 : pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
177 : pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */
178 : pg_atomic_uint64 skip_fpw; /* FPWs skipped. */
179 : pg_atomic_uint64 skip_rep; /* Repeat accesses skipped. */
180 :
181 : /* Dynamic values */
182 : int wal_distance; /* Number of WAL bytes ahead. */
183 : int block_distance; /* Number of block references ahead. */
184 : int io_depth; /* Number of I/Os in progress. */
185 : } XLogPrefetchStats;
186 :
187 : static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
188 : RelFileLocator rlocator,
189 : BlockNumber blockno,
190 : XLogRecPtr lsn);
191 : static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
192 : RelFileLocator rlocator,
193 : BlockNumber blockno);
194 : static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
195 : XLogRecPtr replaying_lsn);
196 : static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
197 : XLogRecPtr *lsn);
198 :
199 : static XLogPrefetchStats *SharedStats;
200 :
201 : static inline LsnReadQueue *
202 3564 : lrq_alloc(uint32 max_distance,
203 : uint32 max_inflight,
204 : uintptr_t lrq_private,
205 : LsnReadQueueNextFun next)
206 : {
207 : LsnReadQueue *lrq;
208 : uint32 size;
209 :
210 : Assert(max_distance >= max_inflight);
211 :
212 3564 : size = max_distance + 1; /* full ring buffer has a gap */
213 3564 : lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
214 3564 : lrq->lrq_private = lrq_private;
215 3564 : lrq->max_inflight = max_inflight;
216 3564 : lrq->size = size;
217 3564 : lrq->next = next;
218 3564 : lrq->head = 0;
219 3564 : lrq->tail = 0;
220 3564 : lrq->inflight = 0;
221 3564 : lrq->completed = 0;
222 :
223 3564 : return lrq;
224 : }
225 :
226 : static inline void
227 3458 : lrq_free(LsnReadQueue *lrq)
228 : {
229 3458 : pfree(lrq);
230 3458 : }
231 :
232 : static inline uint32
233 1993372 : lrq_inflight(LsnReadQueue *lrq)
234 : {
235 1993372 : return lrq->inflight;
236 : }
237 :
238 : static inline uint32
239 1993372 : lrq_completed(LsnReadQueue *lrq)
240 : {
241 1993372 : return lrq->completed;
242 : }
243 :
244 : static inline void
245 5323440 : lrq_prefetch(LsnReadQueue *lrq)
246 : {
247 : /* Try to start as many IOs as we can within our limits. */
248 10943236 : while (lrq->inflight < lrq->max_inflight &&
249 10938856 : lrq->inflight + lrq->completed < lrq->size - 1)
250 : {
251 : Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
252 5821992 : switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
253 : {
254 202096 : case LRQ_NEXT_AGAIN:
255 202096 : return;
256 12160 : case LRQ_NEXT_IO:
257 12160 : lrq->queue[lrq->head].io = true;
258 12160 : lrq->inflight++;
259 12160 : break;
260 5607636 : case LRQ_NEXT_NO_IO:
261 5607636 : lrq->queue[lrq->head].io = false;
262 5607636 : lrq->completed++;
263 5607636 : break;
264 : }
265 5619796 : lrq->head++;
266 5619796 : if (lrq->head == lrq->size)
267 136962 : lrq->head = 0;
268 : }
269 : }
270 :
271 : static inline void
272 5323406 : lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
273 : {
274 : /*
275 : * We know that LSNs before 'lsn' have been replayed, so we can now assume
276 : * that any IOs that were started before then have finished.
277 : */
278 10943088 : while (lrq->tail != lrq->head &&
279 10921298 : lrq->queue[lrq->tail].lsn < lsn)
280 : {
281 5619682 : if (lrq->queue[lrq->tail].io)
282 12160 : lrq->inflight--;
283 : else
284 5607522 : lrq->completed--;
285 5619682 : lrq->tail++;
286 5619682 : if (lrq->tail == lrq->size)
287 136960 : lrq->tail = 0;
288 : }
289 5323406 : if (RecoveryPrefetchEnabled())
290 5323406 : lrq_prefetch(lrq);
291 5323306 : }
292 :
293 : size_t
294 3566 : XLogPrefetchShmemSize(void)
295 : {
296 3566 : return sizeof(XLogPrefetchStats);
297 : }
298 :
299 : /*
300 : * Reset all counters to zero.
301 : */
302 : void
303 6 : XLogPrefetchResetStats(void)
304 : {
305 6 : pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
306 6 : pg_atomic_write_u64(&SharedStats->prefetch, 0);
307 6 : pg_atomic_write_u64(&SharedStats->hit, 0);
308 6 : pg_atomic_write_u64(&SharedStats->skip_init, 0);
309 6 : pg_atomic_write_u64(&SharedStats->skip_new, 0);
310 6 : pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
311 6 : pg_atomic_write_u64(&SharedStats->skip_rep, 0);
312 6 : }
313 :
314 : void
315 1918 : XLogPrefetchShmemInit(void)
316 : {
317 : bool found;
318 :
319 1918 : SharedStats = (XLogPrefetchStats *)
320 1918 : ShmemInitStruct("XLogPrefetchStats",
321 : sizeof(XLogPrefetchStats),
322 : &found);
323 :
324 1918 : if (!found)
325 : {
326 1918 : pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
327 1918 : pg_atomic_init_u64(&SharedStats->prefetch, 0);
328 1918 : pg_atomic_init_u64(&SharedStats->hit, 0);
329 1918 : pg_atomic_init_u64(&SharedStats->skip_init, 0);
330 1918 : pg_atomic_init_u64(&SharedStats->skip_new, 0);
331 1918 : pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
332 1918 : pg_atomic_init_u64(&SharedStats->skip_rep, 0);
333 : }
334 1918 : }
335 :
336 : /*
337 : * Called when any GUC is changed that affects prefetching.
338 : */
339 : void
340 22 : XLogPrefetchReconfigure(void)
341 : {
342 22 : XLogPrefetchReconfigureCount++;
343 22 : }
344 :
345 : /*
346 : * Increment a counter in shared memory. This is equivalent to *counter++ on a
347 : * plain uint64 without any memory barrier or locking, except on platforms
348 : * where readers can't read uint64 without possibly observing a torn value.
349 : */
350 : static inline void
351 5601678 : XLogPrefetchIncrement(pg_atomic_uint64 *counter)
352 : {
353 : Assert(AmStartupProcess() || !IsUnderPostmaster);
354 5601678 : pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
355 5601678 : }
356 :
357 : /*
358 : * Create a prefetcher that is ready to begin prefetching blocks referenced by
359 : * WAL records.
360 : */
361 : XLogPrefetcher *
362 1650 : XLogPrefetcherAllocate(XLogReaderState *reader)
363 : {
364 : XLogPrefetcher *prefetcher;
365 : HASHCTL ctl;
366 :
367 1650 : prefetcher = palloc0(sizeof(XLogPrefetcher));
368 1650 : prefetcher->reader = reader;
369 :
370 1650 : ctl.keysize = sizeof(RelFileLocator);
371 1650 : ctl.entrysize = sizeof(XLogPrefetcherFilter);
372 1650 : prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
373 : &ctl, HASH_ELEM | HASH_BLOBS);
374 1650 : dlist_init(&prefetcher->filter_queue);
375 :
376 1650 : SharedStats->wal_distance = 0;
377 1650 : SharedStats->block_distance = 0;
378 1650 : SharedStats->io_depth = 0;
379 :
380 : /* First usage will cause streaming_read to be allocated. */
381 1650 : prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
382 :
383 1650 : return prefetcher;
384 : }
385 :
386 : /*
387 : * Destroy a prefetcher and release all resources.
388 : */
389 : void
390 1544 : XLogPrefetcherFree(XLogPrefetcher *prefetcher)
391 : {
392 1544 : lrq_free(prefetcher->streaming_read);
393 1544 : hash_destroy(prefetcher->filter_table);
394 1544 : pfree(prefetcher);
395 1544 : }
396 :
397 : /*
398 : * Provide access to the reader.
399 : */
400 : XLogReaderState *
401 5323226 : XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
402 : {
403 5323226 : return prefetcher->reader;
404 : }
405 :
406 : /*
407 : * Update the statistics visible in the pg_stat_recovery_prefetch view.
408 : */
409 : void
410 1993372 : XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
411 : {
412 : uint32 io_depth;
413 : uint32 completed;
414 : int64 wal_distance;
415 :
416 :
417 : /* How far ahead of replay are we now? */
418 1993372 : if (prefetcher->reader->decode_queue_tail)
419 : {
420 1986796 : wal_distance =
421 1986796 : prefetcher->reader->decode_queue_tail->lsn -
422 1986796 : prefetcher->reader->decode_queue_head->lsn;
423 : }
424 : else
425 : {
426 6576 : wal_distance = 0;
427 : }
428 :
429 : /* How many IOs are currently in flight and completed? */
430 1993372 : io_depth = lrq_inflight(prefetcher->streaming_read);
431 1993372 : completed = lrq_completed(prefetcher->streaming_read);
432 :
433 : /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
434 1993372 : SharedStats->io_depth = io_depth;
435 1993372 : SharedStats->block_distance = io_depth + completed;
436 1993372 : SharedStats->wal_distance = wal_distance;
437 :
438 1993372 : prefetcher->next_stats_shm_lsn =
439 1993372 : prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
440 1993372 : }
441 :
442 : /*
443 : * A callback that examines the next block reference in the WAL, and possibly
444 : * starts an IO so that a later read will be fast.
445 : *
446 : * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
447 : *
448 : * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
449 : * that isn't in the buffer pool, and the kernel has been asked to start
450 : * reading it to make a future read system call faster. An LSN is written to
451 : * *lsn, and the I/O will be considered to have completed once that LSN is
452 : * replayed.
453 : *
454 : * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
455 : * that it was already in the buffer pool, or we decided for various reasons
456 : * not to prefetch.
457 : */
458 : static LsnReadQueueNextStatus
459 5821992 : XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
460 : {
461 5821992 : XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
462 5821992 : XLogReaderState *reader = prefetcher->reader;
463 5821992 : XLogRecPtr replaying_lsn = reader->ReadRecPtr;
464 :
465 : /*
466 : * We keep track of the record and block we're up to between calls with
467 : * prefetcher->record and prefetcher->next_block_id.
468 : */
469 : for (;;)
470 5319382 : {
471 : DecodedXLogRecord *record;
472 :
473 : /* Try to read a new future record, if we don't already have one. */
474 11141374 : if (prefetcher->record == NULL)
475 : {
476 : bool nonblocking;
477 :
478 : /*
479 : * If there are already records or an error queued up that could
480 : * be replayed, we don't want to block here. Otherwise, it's OK
481 : * to block waiting for more data: presumably the caller has
482 : * nothing else to do.
483 : */
484 5521578 : nonblocking = XLogReaderHasQueuedRecordOrError(reader);
485 :
486 : /* Readahead is disabled until we replay past a certain point. */
487 5521578 : if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
488 191466 : return LRQ_NEXT_AGAIN;
489 :
490 5330112 : record = XLogReadAhead(prefetcher->reader, nonblocking);
491 5330012 : if (record == NULL)
492 : {
493 : /*
494 : * We can't read any more, due to an error or lack of data in
495 : * nonblocking mode. Don't try to read ahead again until
496 : * we've replayed everything already decoded.
497 : */
498 7088 : if (nonblocking && prefetcher->reader->decode_queue_tail)
499 6786 : prefetcher->no_readahead_until =
500 6786 : prefetcher->reader->decode_queue_tail->lsn;
501 :
502 7088 : return LRQ_NEXT_AGAIN;
503 : }
504 :
505 : /*
506 : * If prefetching is disabled, we don't need to analyze the record
507 : * or issue any prefetches. We just need to cause one record to
508 : * be decoded.
509 : */
510 5322924 : if (!RecoveryPrefetchEnabled())
511 : {
512 0 : *lsn = InvalidXLogRecPtr;
513 0 : return LRQ_NEXT_NO_IO;
514 : }
515 :
516 : /* We have a new record to process. */
517 5322924 : prefetcher->record = record;
518 5322924 : prefetcher->next_block_id = 0;
519 : }
520 : else
521 : {
522 : /* Continue to process from last call, or last loop. */
523 5619796 : record = prefetcher->record;
524 : }
525 :
526 : /*
527 : * Check for operations that require us to filter out block ranges, or
528 : * pause readahead completely.
529 : */
530 10942720 : if (replaying_lsn < record->lsn)
531 : {
532 10942720 : uint8 rmid = record->header.xl_rmid;
533 10942720 : uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;
534 :
535 10942720 : if (rmid == RM_XLOG_ID)
536 : {
537 168990 : if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
538 : record_type == XLOG_END_OF_RECOVERY)
539 : {
540 : /*
541 : * These records might change the TLI. Avoid potential
542 : * bugs if we were to allow "read TLI" and "replay TLI" to
543 : * differ without more analysis.
544 : */
545 2802 : prefetcher->no_readahead_until = record->lsn;
546 :
547 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
548 : elog(XLOGPREFETCHER_DEBUG_LEVEL,
549 : "suppressing all readahead until %X/%X is replayed due to possible TLI change",
550 : LSN_FORMAT_ARGS(record->lsn));
551 : #endif
552 :
553 : /* Fall through so we move past this record. */
554 : }
555 : }
556 10773730 : else if (rmid == RM_DBASE_ID)
557 : {
558 : /*
559 : * When databases are created with the file-copy strategy,
560 : * there are no WAL records to tell us about the creation of
561 : * individual relations.
562 : */
563 78 : if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
564 : {
565 8 : xl_dbase_create_file_copy_rec *xlrec =
566 : (xl_dbase_create_file_copy_rec *) record->main_data;
567 8 : RelFileLocator rlocator =
568 8 : {InvalidOid, xlrec->db_id, InvalidRelFileNumber};
569 :
570 : /*
571 : * Don't try to prefetch anything in this database until
572 : * it has been created, or we might confuse the blocks of
573 : * different generations, if a database OID or
574 : * relfilenumber is reused. It's also more efficient than
575 : * discovering that relations don't exist on disk yet with
576 : * ENOENT errors.
577 : */
578 8 : XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
579 :
580 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
581 : elog(XLOGPREFETCHER_DEBUG_LEVEL,
582 : "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
583 : rlocator.dbOid,
584 : LSN_FORMAT_ARGS(record->lsn));
585 : #endif
586 : }
587 : }
588 10773652 : else if (rmid == RM_SMGR_ID)
589 : {
590 31026 : if (record_type == XLOG_SMGR_CREATE)
591 : {
592 30932 : xl_smgr_create *xlrec = (xl_smgr_create *)
593 : record->main_data;
594 :
595 30932 : if (xlrec->forkNum == MAIN_FORKNUM)
596 : {
597 : /*
598 : * Don't prefetch anything for this whole relation
599 : * until it has been created. Otherwise we might
600 : * confuse the blocks of different generations, if a
601 : * relfilenumber is reused. This also avoids the need
602 : * to discover the problem via extra syscalls that
603 : * report ENOENT.
604 : */
605 27500 : XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
606 : record->lsn);
607 :
608 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
609 : elog(XLOGPREFETCHER_DEBUG_LEVEL,
610 : "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
611 : xlrec->rlocator.spcOid,
612 : xlrec->rlocator.dbOid,
613 : xlrec->rlocator.relNumber,
614 : LSN_FORMAT_ARGS(record->lsn));
615 : #endif
616 : }
617 : }
618 94 : else if (record_type == XLOG_SMGR_TRUNCATE)
619 : {
620 94 : xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
621 : record->main_data;
622 :
623 : /*
624 : * Don't consider prefetching anything in the truncated
625 : * range until the truncation has been performed.
626 : */
627 94 : XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
628 : xlrec->blkno,
629 : record->lsn);
630 :
631 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
632 : elog(XLOGPREFETCHER_DEBUG_LEVEL,
633 : "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
634 : xlrec->rlocator.spcOid,
635 : xlrec->rlocator.dbOid,
636 : xlrec->rlocator.relNumber,
637 : xlrec->blkno,
638 : LSN_FORMAT_ARGS(record->lsn));
639 : #endif
640 : }
641 : }
642 : }
643 :
644 : /* Scan the block references, starting where we left off last time. */
645 10947342 : while (prefetcher->next_block_id <= record->max_block_id)
646 : {
647 5624418 : int block_id = prefetcher->next_block_id++;
648 5624418 : DecodedBkpBlock *block = &record->blocks[block_id];
649 : SMgrRelation reln;
650 : PrefetchBufferResult result;
651 :
652 5624418 : if (!block->in_use)
653 4180 : continue;
654 :
655 : Assert(!BufferIsValid(block->prefetch_buffer));
656 :
657 : /*
658 : * Record the LSN of this record. When it's replayed,
659 : * LsnReadQueue will consider any IOs submitted for earlier LSNs
660 : * to be finished.
661 : */
662 5620238 : *lsn = record->lsn;
663 :
664 : /* We don't try to prefetch anything but the main fork for now. */
665 5620238 : if (block->forknum != MAIN_FORKNUM)
666 : {
667 5619796 : return LRQ_NEXT_NO_IO;
668 : }
669 :
670 : /*
671 : * If there is a full page image attached, we won't be reading the
672 : * page, so don't bother trying to prefetch.
673 : */
674 5602120 : if (block->has_image)
675 : {
676 4575420 : XLogPrefetchIncrement(&SharedStats->skip_fpw);
677 4575420 : return LRQ_NEXT_NO_IO;
678 : }
679 :
680 : /* There is no point in reading a page that will be zeroed. */
681 1026700 : if (block->flags & BKPBLOCK_WILL_INIT)
682 : {
683 12720 : XLogPrefetchIncrement(&SharedStats->skip_init);
684 12720 : return LRQ_NEXT_NO_IO;
685 : }
686 :
687 : /* Should we skip prefetching this block due to a filter? */
688 1013980 : if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
689 : {
690 90664 : XLogPrefetchIncrement(&SharedStats->skip_new);
691 90664 : return LRQ_NEXT_NO_IO;
692 : }
693 :
694 : /* There is no point in repeatedly prefetching the same block. */
695 2359512 : for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
696 : {
697 2334564 : if (block->blkno == prefetcher->recent_block[i] &&
698 930540 : RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
699 : {
700 : /*
701 : * XXX If we also remembered where it was, we could set
702 : * recent_buffer so that recovery could skip smgropen()
703 : * and a buffer table lookup.
704 : */
705 898368 : XLogPrefetchIncrement(&SharedStats->skip_rep);
706 898368 : return LRQ_NEXT_NO_IO;
707 : }
708 : }
709 24948 : prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
710 24948 : prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
711 24948 : prefetcher->recent_idx =
712 24948 : (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
713 :
714 : /*
715 : * We could try to have a fast path for repeated references to the
716 : * same relation (with some scheme to handle invalidations
717 : * safely), but for now we'll call smgropen() every time.
718 : */
719 24948 : reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);
720 :
721 : /*
722 : * If the relation file doesn't exist on disk, for example because
723 : * we're replaying after a crash and the file will be created and
724 : * then unlinked by WAL that hasn't been replayed yet, suppress
725 : * further prefetching in the relation until this record is
726 : * replayed.
727 : */
728 24948 : if (!smgrexists(reln, MAIN_FORKNUM))
729 : {
730 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
731 : elog(XLOGPREFETCHER_DEBUG_LEVEL,
732 : "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
733 : reln->smgr_rlocator.locator.spcOid,
734 : reln->smgr_rlocator.locator.dbOid,
735 : reln->smgr_rlocator.locator.relNumber,
736 : LSN_FORMAT_ARGS(record->lsn));
737 : #endif
738 12 : XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
739 : record->lsn);
740 12 : XLogPrefetchIncrement(&SharedStats->skip_new);
741 12 : return LRQ_NEXT_NO_IO;
742 : }
743 :
744 : /*
745 : * If the relation isn't big enough to contain the referenced
746 : * block yet, suppress prefetching of this block and higher until
747 : * this record is replayed.
748 : */
749 24936 : if (block->blkno >= smgrnblocks(reln, block->forknum))
750 : {
751 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
752 : elog(XLOGPREFETCHER_DEBUG_LEVEL,
753 : "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
754 : reln->smgr_rlocator.locator.spcOid,
755 : reln->smgr_rlocator.locator.dbOid,
756 : reln->smgr_rlocator.locator.relNumber,
757 : block->blkno,
758 : LSN_FORMAT_ARGS(record->lsn));
759 : #endif
760 2934 : XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
761 : record->lsn);
762 2934 : XLogPrefetchIncrement(&SharedStats->skip_new);
763 2934 : return LRQ_NEXT_NO_IO;
764 : }
765 :
766 : /* Try to initiate prefetching. */
767 22002 : result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
768 22002 : if (BufferIsValid(result.recent_buffer))
769 : {
770 : /* Cache hit, nothing to do. */
771 9400 : XLogPrefetchIncrement(&SharedStats->hit);
772 9400 : block->prefetch_buffer = result.recent_buffer;
773 9400 : return LRQ_NEXT_NO_IO;
774 : }
775 12602 : else if (result.initiated_io)
776 : {
777 : /* Cache miss, I/O (presumably) started. */
778 12160 : XLogPrefetchIncrement(&SharedStats->prefetch);
779 12160 : block->prefetch_buffer = InvalidBuffer;
780 12160 : return LRQ_NEXT_IO;
781 : }
782 442 : else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
783 : {
784 : /*
785 : * This shouldn't be possible, because we already determined
786 : * that the relation exists on disk and is big enough.
787 : * Something is wrong with the cache invalidation for
788 : * smgrexists(), smgrnblocks(), or the file was unlinked or
789 : * truncated beneath our feet?
790 : */
791 0 : elog(ERROR,
792 : "could not prefetch relation %u/%u/%u block %u",
793 : reln->smgr_rlocator.locator.spcOid,
794 : reln->smgr_rlocator.locator.dbOid,
795 : reln->smgr_rlocator.locator.relNumber,
796 : block->blkno);
797 : }
798 : }
799 :
800 : /*
801 : * Several callsites need to be able to read exactly one record
802 : * without any internal readahead. Examples: xlog.c reading
803 : * checkpoint records with emode set to PANIC, which might otherwise
804 : * cause XLogPageRead() to panic on some future page, and xlog.c
805 : * determining where to start writing WAL next, which depends on the
806 : * contents of the reader's internal buffer after reading one record.
807 : * Therefore, don't even think about prefetching until the first
808 : * record after XLogPrefetcherBeginRead() has been consumed.
809 : */
810 5322924 : if (prefetcher->reader->decode_queue_tail &&
811 5322922 : prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
812 3542 : return LRQ_NEXT_AGAIN;
813 :
814 : /* Advance to the next record. */
815 5319382 : prefetcher->record = NULL;
816 : }
817 : pg_unreachable();
818 : }
819 :
820 : /*
821 : * Expose statistics about recovery prefetching.
822 : */
823 : Datum
824 12 : pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
825 : {
826 : #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
827 12 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
828 : Datum values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
829 : bool nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
830 :
831 12 : InitMaterializedSRF(fcinfo, 0);
832 :
833 132 : for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
834 120 : nulls[i] = false;
835 :
836 12 : values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
837 12 : values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
838 12 : values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
839 12 : values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
840 12 : values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
841 12 : values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
842 12 : values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
843 12 : values[7] = Int32GetDatum(SharedStats->wal_distance);
844 12 : values[8] = Int32GetDatum(SharedStats->block_distance);
845 12 : values[9] = Int32GetDatum(SharedStats->io_depth);
846 12 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
847 :
848 12 : return (Datum) 0;
849 : }
850 :
851 : /*
852 : * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
853 : * has been replayed.
854 : */
855 : static inline void
856 30548 : XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
857 : BlockNumber blockno, XLogRecPtr lsn)
858 : {
859 : XLogPrefetcherFilter *filter;
860 : bool found;
861 :
862 30548 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
863 30548 : if (!found)
864 : {
865 : /*
866 : * Don't allow any prefetching of this block or higher until replayed.
867 : */
868 30540 : filter->filter_until_replayed = lsn;
869 30540 : filter->filter_from_block = blockno;
870 30540 : dlist_push_head(&prefetcher->filter_queue, &filter->link);
871 : }
872 : else
873 : {
874 : /*
875 : * We were already filtering this rlocator. Extend the filter's
876 : * lifetime to cover this WAL record, but leave the lower of the block
877 : * numbers there because we don't want to have to track individual
878 : * blocks.
879 : */
880 8 : filter->filter_until_replayed = lsn;
881 8 : dlist_delete(&filter->link);
882 8 : dlist_push_head(&prefetcher->filter_queue, &filter->link);
883 8 : filter->filter_from_block = Min(filter->filter_from_block, blockno);
884 : }
885 30548 : }
886 :
887 : /*
888 : * Have we replayed any records that caused us to begin filtering a block
889 : * range? That means that relations should have been created, extended or
890 : * dropped as required, so we can stop filtering out accesses to a given
891 : * relfilenumber.
892 : */
893 : static inline void
894 5323406 : XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
895 : {
896 5353938 : while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
897 : {
898 734958 : XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
899 : link,
900 : &prefetcher->filter_queue);
901 :
902 734958 : if (filter->filter_until_replayed >= replaying_lsn)
903 704426 : break;
904 :
905 30532 : dlist_delete(&filter->link);
906 30532 : hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
907 : }
908 5323406 : }
909 :
910 : /*
911 : * Check if a given block should be skipped due to a filter.
912 : */
913 : static inline bool
914 1013980 : XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
915 : BlockNumber blockno)
916 : {
917 : /*
918 : * Test for empty queue first, because we expect it to be empty most of
919 : * the time and we can avoid the hash table lookup in that case.
920 : */
921 1013980 : if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
922 : {
923 : XLogPrefetcherFilter *filter;
924 :
925 : /* See if the block range is filtered. */
926 120724 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
927 120724 : if (filter && filter->filter_from_block <= blockno)
928 : {
929 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
930 : elog(XLOGPREFETCHER_DEBUG_LEVEL,
931 : "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
932 : rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
933 : LSN_FORMAT_ARGS(filter->filter_until_replayed),
934 : filter->filter_from_block);
935 : #endif
936 90664 : return true;
937 : }
938 :
939 : /* See if the whole database is filtered. */
940 30060 : rlocator.relNumber = InvalidRelFileNumber;
941 30060 : rlocator.spcOid = InvalidOid;
942 30060 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
943 30060 : if (filter)
944 : {
945 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
946 : elog(XLOGPREFETCHER_DEBUG_LEVEL,
947 : "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
948 : rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
949 : LSN_FORMAT_ARGS(filter->filter_until_replayed));
950 : #endif
951 0 : return true;
952 : }
953 : }
954 :
955 923316 : return false;
956 : }
957 :
958 : /*
959 : * A wrapper for XLogBeginRead() that also resets the prefetcher.
960 : */
961 : void
962 3542 : XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
963 : {
964 : /* This will forget about any in-flight IO. */
965 3542 : prefetcher->reconfigure_count--;
966 :
967 : /* Book-keeping to avoid readahead on first read. */
968 3542 : prefetcher->begin_ptr = recPtr;
969 :
970 3542 : prefetcher->no_readahead_until = 0;
971 :
972 : /* This will forget about any queued up records in the decoder. */
973 3542 : XLogBeginRead(prefetcher->reader, recPtr);
974 3542 : }
975 :
976 : /*
977 : * A wrapper for XLogReadRecord() that provides the same interface, but also
978 : * tries to initiate I/O for blocks referenced in future WAL records.
979 : */
980 : XLogRecord *
981 5323406 : XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
982 : {
983 : DecodedXLogRecord *record;
984 : XLogRecPtr replayed_up_to;
985 :
986 : /*
987 : * See if it's time to reset the prefetching machinery, because a relevant
988 : * GUC was changed.
989 : */
990 5323406 : if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
991 : {
992 : uint32 max_distance;
993 : uint32 max_inflight;
994 :
995 3564 : if (prefetcher->streaming_read)
996 1914 : lrq_free(prefetcher->streaming_read);
997 :
998 3564 : if (RecoveryPrefetchEnabled())
999 : {
1000 : Assert(maintenance_io_concurrency > 0);
1001 3564 : max_inflight = maintenance_io_concurrency;
1002 3564 : max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
1003 : }
1004 : else
1005 : {
1006 0 : max_inflight = 1;
1007 0 : max_distance = 1;
1008 : }
1009 :
1010 3564 : prefetcher->streaming_read = lrq_alloc(max_distance,
1011 : max_inflight,
1012 : (uintptr_t) prefetcher,
1013 : XLogPrefetcherNextBlock);
1014 :
1015 3564 : prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
1016 : }
1017 :
1018 : /*
1019 : * Release last returned record, if there is one, as it's now been
1020 : * replayed.
1021 : */
1022 5323406 : replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
1023 :
1024 : /*
1025 : * Can we drop any filters yet? If we were waiting for a relation to be
1026 : * created or extended, it is now OK to access blocks in the covered
1027 : * range.
1028 : */
1029 5323406 : XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1030 :
1031 : /*
1032 : * All IO initiated by earlier WAL is now completed. This might trigger
1033 : * further prefetching.
1034 : */
1035 5323406 : lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1036 :
1037 : /*
1038 : * If there's nothing queued yet, then start prefetching to cause at least
1039 : * one record to be queued.
1040 : */
1041 5323306 : if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1042 : {
1043 : Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1044 : Assert(lrq_completed(prefetcher->streaming_read) == 0);
1045 34 : lrq_prefetch(prefetcher->streaming_read);
1046 : }
1047 :
1048 : /* Read the next record. */
1049 5323306 : record = XLogNextRecord(prefetcher->reader, errmsg);
1050 5323306 : if (!record)
1051 472 : return NULL;
1052 :
1053 : /*
1054 : * The record we just got is the "current" one, for the benefit of the
1055 : * XLogRecXXX() macros.
1056 : */
1057 : Assert(record == prefetcher->reader->record);
1058 :
1059 : /*
1060 : * If maintenance_io_concurrency is set very low, we might have started
1061 : * prefetching some but not all of the blocks referenced in the record
1062 : * we're about to return. Forget about the rest of the blocks in this
1063 : * record by dropping the prefetcher's reference to it.
1064 : */
1065 5322834 : if (record == prefetcher->record)
1066 3542 : prefetcher->record = NULL;
1067 :
1068 : /*
1069 : * See if it's time to compute some statistics, because enough WAL has
1070 : * been processed.
1071 : */
1072 5322834 : if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1073 1985252 : XLogPrefetcherComputeStats(prefetcher);
1074 :
1075 : Assert(record == prefetcher->reader->record);
1076 :
1077 5322834 : return &record->header;
1078 : }
1079 :
1080 : bool
1081 1982 : check_recovery_prefetch(int *new_value, void **extra, GucSource source)
1082 : {
1083 : #ifndef USE_PREFETCH
1084 : if (*new_value == RECOVERY_PREFETCH_ON)
1085 : {
1086 : GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
1087 : return false;
1088 : }
1089 : #endif
1090 :
1091 1982 : return true;
1092 : }
1093 :
1094 : void
1095 1982 : assign_recovery_prefetch(int new_value, void *extra)
1096 : {
1097 : /* Reconfigure prefetching, because a setting it depends on changed. */
1098 1982 : recovery_prefetch = new_value;
1099 1982 : if (AmStartupProcess())
1100 0 : XLogPrefetchReconfigure();
1101 1982 : }
|