Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * xlogprefetcher.c
4 : : * Prefetching support for recovery.
5 : : *
6 : : * Portions Copyright (c) 2022-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/access/transam/xlogprefetcher.c
12 : : *
13 : : * This module provides a drop-in replacement for an XLogReader that tries to
14 : : * minimize I/O stalls by looking ahead in the WAL. If blocks that will be
15 : : * accessed in the near future are not already in the buffer pool, it initiates
16 : : * I/Os that might complete before the caller eventually needs the data. When
17 : : * referenced blocks are found in the buffer pool already, the buffer is
18 : : * recorded in the decoded record so that XLogReadBufferForRedo() can try to
19 : : * avoid a second buffer mapping table lookup.
20 : : *
21 : : * Currently, only the main fork is considered for prefetching. Currently,
22 : : * prefetching is only effective on systems where PrefetchBuffer() does
23 : : * something useful (mainly Linux).
24 : : *
25 : : *-------------------------------------------------------------------------
26 : : */
27 : :
28 : : #include "postgres.h"
29 : :
30 : : #include "access/xlogprefetcher.h"
31 : : #include "access/xlogreader.h"
32 : : #include "catalog/pg_control.h"
33 : : #include "catalog/storage_xlog.h"
34 : : #include "commands/dbcommands_xlog.h"
35 : : #include "funcapi.h"
36 : : #include "miscadmin.h"
37 : : #include "port/atomics.h"
38 : : #include "storage/bufmgr.h"
39 : : #include "storage/fd.h"
40 : : #include "storage/shmem.h"
41 : : #include "storage/smgr.h"
42 : : #include "storage/subsystems.h"
43 : : #include "utils/fmgrprotos.h"
44 : : #include "utils/guc_hooks.h"
45 : : #include "utils/hsearch.h"
46 : : #include "utils/timestamp.h"
47 : : #include "utils/tuplestore.h"
48 : :
49 : : /*
50 : : * Every time we process this much WAL, we'll update the values in
51 : : * pg_stat_recovery_prefetch.
52 : : */
53 : : #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
54 : :
55 : : /*
56 : : * To detect repeated access to the same block and skip useless extra system
57 : : * calls, we remember a small window of recently prefetched blocks.
58 : : */
59 : : #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
60 : :
61 : : /*
62 : : * When maintenance_io_concurrency is not saturated, we're prepared to look
63 : : * ahead up to N times that number of block references.
64 : : */
65 : : #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
66 : :
67 : : /* Define to log internal debugging messages. */
68 : : /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
69 : :
70 : : /* GUCs */
71 : : int recovery_prefetch = RECOVERY_PREFETCH_TRY;
72 : :
73 : : #ifdef USE_PREFETCH
74 : : #define RecoveryPrefetchEnabled() \
75 : : (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
76 : : maintenance_io_concurrency > 0)
77 : : #else
78 : : #define RecoveryPrefetchEnabled() false
79 : : #endif
80 : :
81 : : static int XLogPrefetchReconfigureCount = 0;
82 : :
83 : : /*
84 : : * Enum used to report whether an IO should be started.
85 : : */
86 : : typedef enum
87 : : {
88 : : LRQ_NEXT_NO_IO,
89 : : LRQ_NEXT_IO,
90 : : LRQ_NEXT_AGAIN,
91 : : } LsnReadQueueNextStatus;
92 : :
93 : : /*
94 : : * Type of callback that can decide which block to prefetch next. For now
95 : : * there is only one.
96 : : */
97 : : typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
98 : : XLogRecPtr *lsn);
99 : :
100 : : /*
101 : : * A simple circular queue of LSNs, using to control the number of
102 : : * (potentially) inflight IOs. This stands in for a later more general IO
103 : : * control mechanism, which is why it has the apparently unnecessary
104 : : * indirection through a function pointer.
105 : : */
106 : : typedef struct LsnReadQueue
107 : : {
108 : : LsnReadQueueNextFun next;
109 : : uintptr_t lrq_private;
110 : : uint32 max_inflight;
111 : : uint32 inflight;
112 : : uint32 completed;
113 : : uint32 head;
114 : : uint32 tail;
115 : : uint32 size;
116 : : struct
117 : : {
118 : : bool io;
119 : : XLogRecPtr lsn;
120 : : } queue[FLEXIBLE_ARRAY_MEMBER];
121 : : } LsnReadQueue;
122 : :
123 : : /*
124 : : * A prefetcher. This is a mechanism that wraps an XLogReader, prefetching
125 : : * blocks that will be soon be referenced, to try to avoid IO stalls.
126 : : */
127 : : struct XLogPrefetcher
128 : : {
129 : : /* WAL reader and current reading state. */
130 : : XLogReaderState *reader;
131 : : DecodedXLogRecord *record;
132 : : int next_block_id;
133 : :
134 : : /* When to publish stats. */
135 : : XLogRecPtr next_stats_shm_lsn;
136 : :
137 : : /* Book-keeping to avoid accessing blocks that don't exist yet. */
138 : : HTAB *filter_table;
139 : : dlist_head filter_queue;
140 : :
141 : : /* Book-keeping to avoid repeat prefetches. */
142 : : RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
143 : : BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
144 : : int recent_idx;
145 : :
146 : : /* Book-keeping to disable prefetching temporarily. */
147 : : XLogRecPtr no_readahead_until;
148 : :
149 : : /* IO depth manager. */
150 : : LsnReadQueue *streaming_read;
151 : :
152 : : XLogRecPtr begin_ptr;
153 : :
154 : : int reconfigure_count;
155 : : };
156 : :
157 : : /*
158 : : * A temporary filter used to track block ranges that haven't been created
159 : : * yet, whole relations that haven't been created yet, and whole relations
160 : : * that (we assume) have already been dropped, or will be created by bulk WAL
161 : : * operators.
162 : : */
163 : : typedef struct XLogPrefetcherFilter
164 : : {
165 : : RelFileLocator rlocator;
166 : : XLogRecPtr filter_until_replayed;
167 : : BlockNumber filter_from_block;
168 : : dlist_node link;
169 : : } XLogPrefetcherFilter;
170 : :
171 : : /*
172 : : * Counters exposed in shared memory for pg_stat_recovery_prefetch.
173 : : */
174 : : typedef struct XLogPrefetchStats
175 : : {
176 : : pg_atomic_uint64 reset_time; /* Time of last reset. */
177 : : pg_atomic_uint64 prefetch; /* Prefetches initiated. */
178 : : pg_atomic_uint64 hit; /* Blocks already in cache. */
179 : : pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
180 : : pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */
181 : : pg_atomic_uint64 skip_fpw; /* FPWs skipped. */
182 : : pg_atomic_uint64 skip_rep; /* Repeat accesses skipped. */
183 : :
184 : : /* Dynamic values */
185 : : int wal_distance; /* Number of WAL bytes ahead. */
186 : : int block_distance; /* Number of block references ahead. */
187 : : int io_depth; /* Number of I/Os in progress. */
188 : : } XLogPrefetchStats;
189 : :
190 : : static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
191 : : RelFileLocator rlocator,
192 : : BlockNumber blockno,
193 : : XLogRecPtr lsn);
194 : : static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
195 : : RelFileLocator rlocator,
196 : : BlockNumber blockno);
197 : : static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
198 : : XLogRecPtr replaying_lsn);
199 : : static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
200 : : XLogRecPtr *lsn);
201 : :
202 : : static XLogPrefetchStats *SharedStats;
203 : :
204 : : static void XLogPrefetchShmemRequest(void *arg);
205 : : static void XLogPrefetchShmemInit(void *arg);
206 : :
207 : : const ShmemCallbacks XLogPrefetchShmemCallbacks = {
208 : : .request_fn = XLogPrefetchShmemRequest,
209 : : .init_fn = XLogPrefetchShmemInit,
210 : : };
211 : :
212 : : static inline LsnReadQueue *
1545 tmunro@postgresql.or 213 :CBC 2316 : lrq_alloc(uint32 max_distance,
214 : : uint32 max_inflight,
215 : : uintptr_t lrq_private,
216 : : LsnReadQueueNextFun next)
217 : : {
218 : : LsnReadQueue *lrq;
219 : : uint32 size;
220 : :
221 [ - + ]: 2316 : Assert(max_distance >= max_inflight);
222 : :
223 : 2316 : size = max_distance + 1; /* full ring buffer has a gap */
224 : 2316 : lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
225 : 2316 : lrq->lrq_private = lrq_private;
226 : 2316 : lrq->max_inflight = max_inflight;
227 : 2316 : lrq->size = size;
228 : 2316 : lrq->next = next;
229 : 2316 : lrq->head = 0;
230 : 2316 : lrq->tail = 0;
231 : 2316 : lrq->inflight = 0;
232 : 2316 : lrq->completed = 0;
233 : :
234 : 2316 : return lrq;
235 : : }
236 : :
237 : : static inline void
238 : 2246 : lrq_free(LsnReadQueue *lrq)
239 : : {
240 : 2246 : pfree(lrq);
241 : 2246 : }
242 : :
243 : : static inline uint32
244 : 1112840 : lrq_inflight(LsnReadQueue *lrq)
245 : : {
246 : 1112840 : return lrq->inflight;
247 : : }
248 : :
249 : : static inline uint32
250 : 1112840 : lrq_completed(LsnReadQueue *lrq)
251 : : {
252 : 1112840 : return lrq->completed;
253 : : }
254 : :
255 : : static inline void
256 : 2967552 : lrq_prefetch(LsnReadQueue *lrq)
257 : : {
258 : : /* Try to start as many IOs as we can within our limits. */
259 [ + + ]: 8984848 : while (lrq->inflight < lrq->max_inflight &&
260 [ + + ]: 6012834 : lrq->inflight + lrq->completed < lrq->size - 1)
261 : : {
262 [ - + ]: 3810497 : Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
263 [ + + + - ]: 3810497 : switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
264 : : {
265 : 760690 : case LRQ_NEXT_AGAIN:
266 : 760690 : return;
267 : 7216 : case LRQ_NEXT_IO:
268 : 7216 : lrq->queue[lrq->head].io = true;
269 : 7216 : lrq->inflight++;
270 : 7216 : break;
271 : 3042528 : case LRQ_NEXT_NO_IO:
272 : 3042528 : lrq->queue[lrq->head].io = false;
273 : 3042528 : lrq->completed++;
274 : 3042528 : break;
275 : : }
276 : 3049744 : lrq->head++;
277 [ + + ]: 3049744 : if (lrq->head == lrq->size)
278 : 46849 : lrq->head = 0;
279 : : }
280 : : }
281 : :
282 : : static inline void
283 : 2967527 : lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
284 : : {
285 : : /*
286 : : * We know that LSNs before 'lsn' have been replayed, so we can now assume
287 : : * that any IOs that were started before then have finished.
288 : : */
289 [ + + ]: 8984724 : while (lrq->tail != lrq->head &&
290 [ + + ]: 5917698 : lrq->queue[lrq->tail].lsn < lsn)
291 : : {
292 [ + + ]: 3049670 : if (lrq->queue[lrq->tail].io)
293 : 7216 : lrq->inflight--;
294 : : else
295 : 3042454 : lrq->completed--;
296 : 3049670 : lrq->tail++;
297 [ + + ]: 3049670 : if (lrq->tail == lrq->size)
298 : 46848 : lrq->tail = 0;
299 : : }
300 [ + - + - ]: 2967527 : if (RecoveryPrefetchEnabled())
301 : 2967527 : lrq_prefetch(lrq);
302 : 2967464 : }
303 : :
304 : : static void
85 heikki.linnakangas@i 305 :GNC 1212 : XLogPrefetchShmemRequest(void *arg)
306 : : {
307 : 1212 : ShmemRequestStruct(.name = "XLogPrefetchStats",
308 : : .size = sizeof(XLogPrefetchStats),
309 : : .ptr = (void **) &SharedStats,
310 : : );
311 : 1212 : }
312 : :
313 : : static void
314 : 1209 : XLogPrefetchShmemInit(void *arg)
315 : : {
316 : 1209 : pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
317 : 1209 : pg_atomic_init_u64(&SharedStats->prefetch, 0);
318 : 1209 : pg_atomic_init_u64(&SharedStats->hit, 0);
319 : 1209 : pg_atomic_init_u64(&SharedStats->skip_init, 0);
320 : 1209 : pg_atomic_init_u64(&SharedStats->skip_new, 0);
321 : 1209 : pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
322 : 1209 : pg_atomic_init_u64(&SharedStats->skip_rep, 0);
1545 tmunro@postgresql.or 323 :GIC 1209 : }
324 : :
325 : : /*
326 : : * Reset all counters to zero.
327 : : */
328 : : void
1545 tmunro@postgresql.or 329 :CBC 4 : XLogPrefetchResetStats(void)
330 : : {
331 : 4 : pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
332 : 4 : pg_atomic_write_u64(&SharedStats->prefetch, 0);
333 : 4 : pg_atomic_write_u64(&SharedStats->hit, 0);
334 : 4 : pg_atomic_write_u64(&SharedStats->skip_init, 0);
335 : 4 : pg_atomic_write_u64(&SharedStats->skip_new, 0);
336 : 4 : pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
337 : 4 : pg_atomic_write_u64(&SharedStats->skip_rep, 0);
338 : 4 : }
339 : :
340 : :
341 : : /*
342 : : * Called when any GUC is changed that affects prefetching.
343 : : */
344 : : void
345 : 14 : XLogPrefetchReconfigure(void)
346 : : {
347 : 14 : XLogPrefetchReconfigureCount++;
348 : 14 : }
349 : :
350 : : /*
351 : : * Increment a counter in shared memory. This is equivalent to *counter++ on a
352 : : * plain uint64 without any memory barrier or locking, except on platforms
353 : : * where readers can't read uint64 without possibly observing a torn value.
354 : : */
355 : : static inline void
356 : 3032346 : XLogPrefetchIncrement(pg_atomic_uint64 *counter)
357 : : {
358 [ + + - + ]: 3032346 : Assert(AmStartupProcess() || !IsUnderPostmaster);
359 : 3032346 : pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
360 : 3032346 : }
361 : :
362 : : /*
363 : : * Create a prefetcher that is ready to begin prefetching blocks referenced by
364 : : * WAL records.
365 : : */
366 : : XLogPrefetcher *
367 : 1054 : XLogPrefetcherAllocate(XLogReaderState *reader)
368 : : {
369 : : XLogPrefetcher *prefetcher;
370 : : HASHCTL ctl;
371 : :
202 michael@paquier.xyz 372 :GNC 1054 : prefetcher = palloc0_object(XLogPrefetcher);
1545 tmunro@postgresql.or 373 :CBC 1054 : prefetcher->reader = reader;
374 : :
688 heikki.linnakangas@i 375 : 1054 : ctl.keysize = sizeof(RelFileLocator);
376 : 1054 : ctl.entrysize = sizeof(XLogPrefetcherFilter);
1545 tmunro@postgresql.or 377 : 1054 : prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
378 : : &ctl, HASH_ELEM | HASH_BLOBS);
379 : 1054 : dlist_init(&prefetcher->filter_queue);
380 : :
381 : 1054 : SharedStats->wal_distance = 0;
382 : 1054 : SharedStats->block_distance = 0;
383 : 1054 : SharedStats->io_depth = 0;
384 : :
385 : : /* First usage will cause streaming_read to be allocated. */
386 : 1054 : prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
387 : :
388 : 1054 : return prefetcher;
389 : : }
390 : :
391 : : /*
392 : : * Destroy a prefetcher and release all resources.
393 : : */
394 : : void
395 : 984 : XLogPrefetcherFree(XLogPrefetcher *prefetcher)
396 : : {
397 : 984 : lrq_free(prefetcher->streaming_read);
398 : 984 : hash_destroy(prefetcher->filter_table);
399 : 984 : pfree(prefetcher);
400 : 984 : }
401 : :
402 : : /*
403 : : * Provide access to the reader.
404 : : */
405 : : XLogReaderState *
406 : 2967372 : XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
407 : : {
408 : 2967372 : return prefetcher->reader;
409 : : }
410 : :
411 : : /*
412 : : * Update the statistics visible in the pg_stat_recovery_prefetch view.
413 : : */
414 : : void
415 : 1112815 : XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
416 : : {
417 : : uint32 io_depth;
418 : : uint32 completed;
419 : : int64 wal_distance;
420 : :
421 : :
422 : : /* How far ahead of replay are we now? */
423 [ + + ]: 1112815 : if (prefetcher->reader->decode_queue_tail)
424 : : {
425 : 1098412 : wal_distance =
426 : 1098412 : prefetcher->reader->decode_queue_tail->lsn -
427 : 1098412 : prefetcher->reader->decode_queue_head->lsn;
428 : : }
429 : : else
430 : : {
431 : 14403 : wal_distance = 0;
432 : : }
433 : :
434 : : /* How many IOs are currently in flight and completed? */
435 : 1112815 : io_depth = lrq_inflight(prefetcher->streaming_read);
436 : 1112815 : completed = lrq_completed(prefetcher->streaming_read);
437 : :
438 : : /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
439 : 1112815 : SharedStats->io_depth = io_depth;
440 : 1112815 : SharedStats->block_distance = io_depth + completed;
441 : 1112815 : SharedStats->wal_distance = wal_distance;
442 : :
443 : 1112815 : prefetcher->next_stats_shm_lsn =
444 : 1112815 : prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
445 : 1112815 : }
446 : :
447 : : /*
448 : : * A callback that examines the next block reference in the WAL, and possibly
449 : : * starts an IO so that a later read will be fast.
450 : : *
451 : : * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
452 : : *
453 : : * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
454 : : * that isn't in the buffer pool, and the kernel has been asked to start
455 : : * reading it to make a future read system call faster. An LSN is written to
456 : : * *lsn, and the I/O will be considered to have completed once that LSN is
457 : : * replayed.
458 : : *
459 : : * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
460 : : * that it was already in the buffer pool, or we decided for various reasons
461 : : * not to prefetch.
462 : : */
463 : : static LsnReadQueueNextStatus
464 : 3810497 : XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
465 : : {
466 : 3810497 : XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
467 : 3810497 : XLogReaderState *reader = prefetcher->reader;
468 : 3810497 : XLogRecPtr replaying_lsn = reader->ReadRecPtr;
469 : :
470 : : /*
471 : : * We keep track of the record and block we're up to between calls with
472 : : * prefetcher->record and prefetcher->next_block_id.
473 : : */
474 : : for (;;)
475 : 2964918 : {
476 : : DecodedXLogRecord *record;
477 : :
478 : : /* Try to read a new future record, if we don't already have one. */
479 [ + + ]: 6775415 : if (prefetcher->record == NULL)
480 : : {
481 : : bool nonblocking;
482 : :
483 : : /*
484 : : * If there are already records or an error queued up that could
485 : : * be replayed, we don't want to block here. Otherwise, it's OK
486 : : * to block waiting for more data: presumably the caller has
487 : : * nothing else to do.
488 : : */
489 : 3725671 : nonblocking = XLogReaderHasQueuedRecordOrError(reader);
490 : :
491 : : /* Readahead is disabled until we replay past a certain point. */
1535 492 [ + + + + ]: 3725671 : if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
1545 493 : 739955 : return LRQ_NEXT_AGAIN;
494 : :
495 : 2985716 : record = XLogReadAhead(prefetcher->reader, nonblocking);
496 [ + + ]: 2985653 : if (record == NULL)
497 : : {
498 : : /*
499 : : * We can't read any more, due to an error or lack of data in
500 : : * nonblocking mode. Don't try to read ahead again until
501 : : * we've replayed everything already decoded.
502 : : */
1535 503 [ + + + - ]: 18435 : if (nonblocking && prefetcher->reader->decode_queue_tail)
504 : 18203 : prefetcher->no_readahead_until =
505 : 18203 : prefetcher->reader->decode_queue_tail->lsn;
506 : :
1545 507 : 18435 : return LRQ_NEXT_AGAIN;
508 : : }
509 : :
510 : : /*
511 : : * If prefetching is disabled, we don't need to analyze the record
512 : : * or issue any prefetches. We just need to cause one record to
513 : : * be decoded.
514 : : */
515 [ + - - + ]: 2967218 : if (!RecoveryPrefetchEnabled())
516 : : {
1545 tmunro@postgresql.or 517 :UBC 0 : *lsn = InvalidXLogRecPtr;
518 : 0 : return LRQ_NEXT_NO_IO;
519 : : }
520 : :
521 : : /* We have a new record to process. */
1545 tmunro@postgresql.or 522 :CBC 2967218 : prefetcher->record = record;
523 : 2967218 : prefetcher->next_block_id = 0;
524 : : }
525 : : else
526 : : {
527 : : /* Continue to process from last call, or last loop. */
528 : 3049744 : record = prefetcher->record;
529 : : }
530 : :
531 : : /*
532 : : * Check for operations that require us to filter out block ranges, or
533 : : * pause readahead completely.
534 : : */
535 [ + - ]: 6016962 : if (replaying_lsn < record->lsn)
536 : : {
537 : 6016962 : uint8 rmid = record->header.xl_rmid;
538 : 6016962 : uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;
539 : :
540 [ + + ]: 6016962 : if (rmid == RM_XLOG_ID)
541 : : {
542 [ + + + + ]: 177069 : if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
543 : : record_type == XLOG_END_OF_RECOVERY)
544 : : {
545 : : /*
546 : : * These records might change the TLI. Avoid potential
547 : : * bugs if we were to allow "read TLI" and "replay TLI" to
548 : : * differ without more analysis.
549 : : */
550 : 1815 : prefetcher->no_readahead_until = record->lsn;
551 : :
552 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
553 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
554 : : "suppressing all readahead until %X/%08X is replayed due to possible TLI change",
555 : : LSN_FORMAT_ARGS(record->lsn));
556 : : #endif
557 : :
558 : : /* Fall through so we move past this record. */
559 : : }
560 : : }
561 [ + + ]: 5839893 : else if (rmid == RM_DBASE_ID)
562 : : {
563 : : /*
564 : : * When databases are created with the file-copy strategy,
565 : : * there are no WAL records to tell us about the creation of
566 : : * individual relations.
567 : : */
568 [ + + ]: 44 : if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
569 : : {
570 : 5 : xl_dbase_create_file_copy_rec *xlrec =
571 : : (xl_dbase_create_file_copy_rec *) record->main_data;
1455 rhaas@postgresql.org 572 : 5 : RelFileLocator rlocator =
573 : 5 : {InvalidOid, xlrec->db_id, InvalidRelFileNumber};
574 : :
575 : : /*
576 : : * Don't try to prefetch anything in this database until
577 : : * it has been created, or we might confuse the blocks of
578 : : * different generations, if a database OID or
579 : : * relfilenumber is reused. It's also more efficient than
580 : : * discovering that relations don't exist on disk yet with
581 : : * ENOENT errors.
582 : : */
583 : 5 : XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
584 : :
585 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
586 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
587 : : "suppressing prefetch in database %u until %X/%08X is replayed due to raw file copy",
588 : : rlocator.dbOid,
589 : : LSN_FORMAT_ARGS(record->lsn));
590 : : #endif
591 : : }
592 : : }
1545 tmunro@postgresql.or 593 [ + + ]: 5839849 : else if (rmid == RM_SMGR_ID)
594 : : {
595 [ + + ]: 18262 : if (record_type == XLOG_SMGR_CREATE)
596 : : {
597 : 18204 : xl_smgr_create *xlrec = (xl_smgr_create *)
598 : : record->main_data;
599 : :
600 [ + + ]: 18204 : if (xlrec->forkNum == MAIN_FORKNUM)
601 : : {
602 : : /*
603 : : * Don't prefetch anything for this whole relation
604 : : * until it has been created. Otherwise we might
605 : : * confuse the blocks of different generations, if a
606 : : * relfilenumber is reused. This also avoids the need
607 : : * to discover the problem via extra syscalls that
608 : : * report ENOENT.
609 : : */
1455 rhaas@postgresql.org 610 : 16328 : XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
611 : : record->lsn);
612 : :
613 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
614 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
615 : : "suppressing prefetch in relation %u/%u/%u until %X/%08X is replayed, which creates the relation",
616 : : xlrec->rlocator.spcOid,
617 : : xlrec->rlocator.dbOid,
618 : : xlrec->rlocator.relNumber,
619 : : LSN_FORMAT_ARGS(record->lsn));
620 : : #endif
621 : : }
622 : : }
1545 tmunro@postgresql.or 623 [ + - ]: 58 : else if (record_type == XLOG_SMGR_TRUNCATE)
624 : : {
625 : 58 : xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
626 : : record->main_data;
627 : :
628 : : /*
629 : : * Don't consider prefetching anything in the truncated
630 : : * range until the truncation has been performed.
631 : : */
1455 rhaas@postgresql.org 632 : 58 : XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
633 : : xlrec->blkno,
634 : : record->lsn);
635 : :
636 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
637 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
638 : : "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, which truncates the relation",
639 : : xlrec->rlocator.spcOid,
640 : : xlrec->rlocator.dbOid,
641 : : xlrec->rlocator.relNumber,
642 : : xlrec->blkno,
643 : : LSN_FORMAT_ARGS(record->lsn));
644 : : #endif
645 : : }
646 : : }
647 : : }
648 : :
649 : : /* Scan the block references, starting where we left off last time. */
1545 tmunro@postgresql.or 650 [ + + ]: 6019513 : while (prefetcher->next_block_id <= record->max_block_id)
651 : : {
652 : 3052295 : int block_id = prefetcher->next_block_id++;
653 : 3052295 : DecodedBkpBlock *block = &record->blocks[block_id];
654 : : SMgrRelation reln;
655 : : PrefetchBufferResult result;
656 : :
657 [ + + ]: 3052295 : if (!block->in_use)
658 : 2296 : continue;
659 : :
1407 john.naylor@postgres 660 [ - + ]: 3049999 : Assert(!BufferIsValid(block->prefetch_buffer));
661 : :
662 : : /*
663 : : * Record the LSN of this record. When it's replayed,
664 : : * LsnReadQueue will consider any IOs submitted for earlier LSNs
665 : : * to be finished.
666 : : */
1545 tmunro@postgresql.or 667 : 3049999 : *lsn = record->lsn;
668 : :
669 : : /* We don't try to prefetch anything but the main fork for now. */
670 [ + + ]: 3049999 : if (block->forknum != MAIN_FORKNUM)
671 : : {
672 : 3049744 : return LRQ_NEXT_NO_IO;
673 : : }
674 : :
675 : : /*
676 : : * If there is a full page image attached, we won't be reading the
677 : : * page, so don't bother trying to prefetch.
678 : : */
679 [ + + ]: 3032601 : if (block->has_image)
680 : : {
681 : 2492471 : XLogPrefetchIncrement(&SharedStats->skip_fpw);
682 : 2492471 : return LRQ_NEXT_NO_IO;
683 : : }
684 : :
685 : : /* There is no point in reading a page that will be zeroed. */
686 [ + + ]: 540130 : if (block->flags & BKPBLOCK_WILL_INIT)
687 : : {
688 : 6549 : XLogPrefetchIncrement(&SharedStats->skip_init);
689 : 6549 : return LRQ_NEXT_NO_IO;
690 : : }
691 : :
692 : : /* Should we skip prefetching this block due to a filter? */
1455 rhaas@postgresql.org 693 [ + + ]: 533581 : if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
694 : : {
1545 tmunro@postgresql.or 695 : 74177 : XLogPrefetchIncrement(&SharedStats->skip_new);
696 : 74177 : return LRQ_NEXT_NO_IO;
697 : : }
698 : :
699 : : /* There is no point in repeatedly prefetching the same block. */
700 [ + + ]: 1184858 : for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
701 : : {
702 [ + + ]: 1170909 : if (block->blkno == prefetcher->recent_block[i] &&
1455 rhaas@postgresql.org 703 [ + + + + : 463724 : RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
+ - ]
704 : : {
705 : : /*
706 : : * XXX If we also remembered where it was, we could set
707 : : * recent_buffer so that recovery could skip smgropen()
708 : : * and a buffer table lookup.
709 : : */
1545 tmunro@postgresql.or 710 : 445455 : XLogPrefetchIncrement(&SharedStats->skip_rep);
711 : 445455 : return LRQ_NEXT_NO_IO;
712 : : }
713 : : }
1455 rhaas@postgresql.org 714 : 13949 : prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
1545 tmunro@postgresql.or 715 : 13949 : prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
716 : 13949 : prefetcher->recent_idx =
717 : 13949 : (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
718 : :
719 : : /*
720 : : * We could try to have a fast path for repeated references to the
721 : : * same relation (with some scheme to handle invalidations
722 : : * safely), but for now we'll call smgropen() every time.
723 : : */
849 heikki.linnakangas@i 724 : 13949 : reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);
725 : :
726 : : /*
727 : : * If the relation file doesn't exist on disk, for example because
728 : : * we're replaying after a crash and the file will be created and
729 : : * then unlinked by WAL that hasn't been replayed yet, suppress
730 : : * further prefetching in the relation until this record is
731 : : * replayed.
732 : : */
1545 tmunro@postgresql.or 733 [ + + ]: 13949 : if (!smgrexists(reln, MAIN_FORKNUM))
734 : : {
735 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
736 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
737 : : "suppressing all prefetch in relation %u/%u/%u until %X/%08X is replayed, because the relation does not exist on disk",
738 : : reln->smgr_rlocator.locator.spcOid,
739 : : reln->smgr_rlocator.locator.dbOid,
740 : : reln->smgr_rlocator.locator.relNumber,
741 : : LSN_FORMAT_ARGS(record->lsn));
742 : : #endif
1455 rhaas@postgresql.org 743 : 6 : XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
744 : : record->lsn);
1545 tmunro@postgresql.or 745 : 6 : XLogPrefetchIncrement(&SharedStats->skip_new);
746 : 6 : return LRQ_NEXT_NO_IO;
747 : : }
748 : :
749 : : /*
750 : : * If the relation isn't big enough to contain the referenced
751 : : * block yet, suppress prefetching of this block and higher until
752 : : * this record is replayed.
753 : : */
754 [ + + ]: 13943 : if (block->blkno >= smgrnblocks(reln, block->forknum))
755 : : {
756 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
757 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
758 : : "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, because the relation is too small",
759 : : reln->smgr_rlocator.locator.spcOid,
760 : : reln->smgr_rlocator.locator.dbOid,
761 : : reln->smgr_rlocator.locator.relNumber,
762 : : block->blkno,
763 : : LSN_FORMAT_ARGS(record->lsn));
764 : : #endif
1455 rhaas@postgresql.org 765 : 1481 : XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
766 : : record->lsn);
1545 tmunro@postgresql.or 767 : 1481 : XLogPrefetchIncrement(&SharedStats->skip_new);
768 : 1481 : return LRQ_NEXT_NO_IO;
769 : : }
770 : :
771 : : /* Try to initiate prefetching. */
772 : 12462 : result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
773 [ + + ]: 12462 : if (BufferIsValid(result.recent_buffer))
774 : : {
775 : : /* Cache hit, nothing to do. */
776 : 4991 : XLogPrefetchIncrement(&SharedStats->hit);
777 : 4991 : block->prefetch_buffer = result.recent_buffer;
778 : 4991 : return LRQ_NEXT_NO_IO;
779 : : }
780 [ + + ]: 7471 : else if (result.initiated_io)
781 : : {
782 : : /* Cache miss, I/O (presumably) started. */
783 : 7216 : XLogPrefetchIncrement(&SharedStats->prefetch);
784 : 7216 : block->prefetch_buffer = InvalidBuffer;
785 : 7216 : return LRQ_NEXT_IO;
786 : : }
1179 787 [ - + ]: 255 : else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
788 : : {
789 : : /*
790 : : * This shouldn't be possible, because we already determined
791 : : * that the relation exists on disk and is big enough.
792 : : * Something is wrong with the cache invalidation for
793 : : * smgrexists(), smgrnblocks(), or the file was unlinked or
794 : : * truncated beneath our feet?
795 : : */
1545 tmunro@postgresql.or 796 [ # # ]:UBC 0 : elog(ERROR,
797 : : "could not prefetch relation %u/%u/%u block %u",
798 : : reln->smgr_rlocator.locator.spcOid,
799 : : reln->smgr_rlocator.locator.dbOid,
800 : : reln->smgr_rlocator.locator.relNumber,
801 : : block->blkno);
802 : : }
803 : : }
804 : :
805 : : /*
806 : : * Several callsites need to be able to read exactly one record
807 : : * without any internal readahead. Examples: PerformWalRecovery()
808 : : * trying to read the first record that follows a checkpoint has emode
809 : : * set to PANIC, which might otherwise cause XLogPageRead() to panic
810 : : * on some future page, and FinishWalRecovery() determining where to
811 : : * start writing WAL next, which depends on the contents of the
812 : : * reader's internal buffer after reading one record. Therefore, don't
813 : : * even think about prefetching until the first record after
814 : : * XLogPrefetcherBeginRead() has been consumed.
815 : : */
1545 tmunro@postgresql.or 816 [ + - ]:CBC 2967218 : if (prefetcher->reader->decode_queue_tail &&
817 [ + + ]: 2967218 : prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
818 : 2300 : return LRQ_NEXT_AGAIN;
819 : :
820 : : /* Advance to the next record. */
821 : 2964918 : prefetcher->record = NULL;
822 : : }
823 : : pg_unreachable();
824 : : }
825 : :
826 : : /*
827 : : * Expose statistics about recovery prefetching.
828 : : */
829 : : Datum
830 : 8 : pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
831 : : {
832 : : #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
833 : 8 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
834 : : Datum values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
835 : : bool nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
836 : :
1351 michael@paquier.xyz 837 : 8 : InitMaterializedSRF(fcinfo, 0);
838 : :
1545 tmunro@postgresql.or 839 [ + + ]: 88 : for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
840 : 80 : nulls[i] = false;
841 : :
842 : 8 : values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
843 : 8 : values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
844 : 8 : values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
845 : 8 : values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
846 : 8 : values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
847 : 8 : values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
848 : 8 : values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
849 : 8 : values[7] = Int32GetDatum(SharedStats->wal_distance);
850 : 8 : values[8] = Int32GetDatum(SharedStats->block_distance);
851 : 8 : values[9] = Int32GetDatum(SharedStats->io_depth);
852 : 8 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
853 : :
854 : 8 : return (Datum) 0;
855 : : }
856 : :
857 : : /*
858 : : * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
859 : : * has been replayed.
860 : : */
861 : : static inline void
1455 rhaas@postgresql.org 862 : 17878 : XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
863 : : BlockNumber blockno, XLogRecPtr lsn)
864 : : {
865 : : XLogPrefetcherFilter *filter;
866 : : bool found;
867 : :
868 : 17878 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
1545 tmunro@postgresql.or 869 [ + + ]: 17878 : if (!found)
870 : : {
871 : : /*
872 : : * Don't allow any prefetching of this block or higher until replayed.
873 : : */
874 : 17871 : filter->filter_until_replayed = lsn;
875 : 17871 : filter->filter_from_block = blockno;
876 : 17871 : dlist_push_head(&prefetcher->filter_queue, &filter->link);
877 : : }
878 : : else
879 : : {
880 : : /*
881 : : * We were already filtering this rlocator. Extend the filter's
882 : : * lifetime to cover this WAL record, but leave the lower of the block
883 : : * numbers there because we don't want to have to track individual
884 : : * blocks.
885 : : */
886 : 7 : filter->filter_until_replayed = lsn;
887 : 7 : dlist_delete(&filter->link);
888 : 7 : dlist_push_head(&prefetcher->filter_queue, &filter->link);
889 : 7 : filter->filter_from_block = Min(filter->filter_from_block, blockno);
890 : : }
891 : 17878 : }
892 : :
893 : : /*
894 : : * Have we replayed any records that caused us to begin filtering a block
895 : : * range? That means that relations should have been created, extended or
896 : : * dropped as required, so we can stop filtering out accesses to a given
897 : : * relfilenumber.
898 : : */
899 : : static inline void
900 : 2967527 : XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
901 : : {
902 [ + + ]: 2985397 : while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
903 : : {
904 : 454594 : XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
905 : : link,
906 : : &prefetcher->filter_queue);
907 : :
908 [ + + ]: 454594 : if (filter->filter_until_replayed >= replaying_lsn)
909 : 436724 : break;
910 : :
911 : 17870 : dlist_delete(&filter->link);
912 : 17870 : hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
913 : : }
914 : 2967527 : }
915 : :
916 : : /*
917 : : * Check if a given block should be skipped due to a filter.
918 : : */
919 : : static inline bool
1455 rhaas@postgresql.org 920 : 533581 : XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
921 : : BlockNumber blockno)
922 : : {
923 : : /*
924 : : * Test for empty queue first, because we expect it to be empty most of
925 : : * the time and we can avoid the hash table lookup in that case.
926 : : */
1545 tmunro@postgresql.or 927 [ + + ]: 533581 : if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
928 : : {
929 : : XLogPrefetcherFilter *filter;
930 : :
931 : : /* See if the block range is filtered. */
1455 rhaas@postgresql.org 932 : 95348 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
1545 tmunro@postgresql.or 933 [ + + + + ]: 95348 : if (filter && filter->filter_from_block <= blockno)
934 : : {
935 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
936 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
937 : : "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (blocks >= %u filtered)",
938 : : rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
939 : : LSN_FORMAT_ARGS(filter->filter_until_replayed),
940 : : filter->filter_from_block);
941 : : #endif
942 : 74177 : return true;
943 : : }
944 : :
945 : : /* See if the whole database is filtered. */
1455 rhaas@postgresql.org 946 : 21171 : rlocator.relNumber = InvalidRelFileNumber;
947 : 21171 : rlocator.spcOid = InvalidOid;
948 : 21171 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
1545 tmunro@postgresql.or 949 [ - + ]: 21171 : if (filter)
950 : : {
951 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
952 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
953 : : "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (whole database)",
954 : : rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
955 : : LSN_FORMAT_ARGS(filter->filter_until_replayed));
956 : : #endif
1545 tmunro@postgresql.or 957 :UBC 0 : return true;
958 : : }
959 : : }
960 : :
1545 tmunro@postgresql.or 961 :CBC 459404 : return false;
962 : : }
963 : :
964 : : /*
965 : : * A wrapper for XLogBeginRead() that also resets the prefetcher.
966 : : */
967 : : void
968 : 2302 : XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
969 : : {
970 : : /* This will forget about any in-flight IO. */
971 : 2302 : prefetcher->reconfigure_count--;
972 : :
973 : : /* Book-keeping to avoid readahead on first read. */
974 : 2302 : prefetcher->begin_ptr = recPtr;
975 : :
152 alvherre@kurilemu.de 976 :GNC 2302 : prefetcher->no_readahead_until = InvalidXLogRecPtr;
977 : :
978 : : /* This will forget about any queued up records in the decoder. */
1545 tmunro@postgresql.or 979 :CBC 2302 : XLogBeginRead(prefetcher->reader, recPtr);
980 : 2302 : }
981 : :
982 : : /*
983 : : * A wrapper for XLogReadRecord() that provides the same interface, but also
984 : : * tries to initiate I/O for blocks referenced in future WAL records.
985 : : */
986 : : XLogRecord *
987 : 2967527 : XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
988 : : {
989 : : DecodedXLogRecord *record;
990 : : XLogRecPtr replayed_up_to;
991 : :
992 : : /*
993 : : * See if it's time to reset the prefetching machinery, because a relevant
994 : : * GUC was changed.
995 : : */
996 [ + + ]: 2967527 : if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
997 : : {
998 : : uint32 max_distance;
999 : : uint32 max_inflight;
1000 : :
1001 [ + + ]: 2316 : if (prefetcher->streaming_read)
1002 : 1262 : lrq_free(prefetcher->streaming_read);
1003 : :
1004 [ + - + - ]: 2316 : if (RecoveryPrefetchEnabled())
1005 : : {
1391 1006 [ - + ]: 2316 : Assert(maintenance_io_concurrency > 0);
1007 : 2316 : max_inflight = maintenance_io_concurrency;
1545 1008 : 2316 : max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
1009 : : }
1010 : : else
1011 : : {
1545 tmunro@postgresql.or 1012 :UBC 0 : max_inflight = 1;
1013 : 0 : max_distance = 1;
1014 : : }
1015 : :
1545 tmunro@postgresql.or 1016 :CBC 2316 : prefetcher->streaming_read = lrq_alloc(max_distance,
1017 : : max_inflight,
1018 : : (uintptr_t) prefetcher,
1019 : : XLogPrefetcherNextBlock);
1020 : :
1021 : 2316 : prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
1022 : : }
1023 : :
1024 : : /*
1025 : : * Release last returned record, if there is one, as it's now been
1026 : : * replayed.
1027 : : */
1391 1028 : 2967527 : replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
1029 : :
1030 : : /*
1031 : : * Can we drop any filters yet? If we were waiting for a relation to be
1032 : : * created or extended, it is now OK to access blocks in the covered
1033 : : * range.
1034 : : */
1035 : 2967527 : XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1036 : :
1037 : : /*
1038 : : * All IO initiated by earlier WAL is now completed. This might trigger
1039 : : * further prefetching.
1040 : : */
1041 : 2967527 : lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1042 : :
1043 : : /*
1044 : : * If there's nothing queued yet, then start prefetching to cause at least
1045 : : * one record to be queued.
1046 : : */
1545 1047 [ + + ]: 2967464 : if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1048 : : {
1391 1049 [ - + ]: 25 : Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1050 [ - + ]: 25 : Assert(lrq_completed(prefetcher->streaming_read) == 0);
1545 1051 : 25 : lrq_prefetch(prefetcher->streaming_read);
1052 : : }
1053 : :
1054 : : /* Read the next record. */
1055 : 2967464 : record = XLogNextRecord(prefetcher->reader, errmsg);
1056 [ + + ]: 2967464 : if (!record)
1057 : 318 : return NULL;
1058 : :
1059 : : /*
1060 : : * The record we just got is the "current" one, for the benefit of the
1061 : : * XLogRecXXX() macros.
1062 : : */
1063 [ - + ]: 2967146 : Assert(record == prefetcher->reader->record);
1064 : :
1065 : : /*
1066 : : * If maintenance_io_concurrency is set very low, we might have started
1067 : : * prefetching some but not all of the blocks referenced in the record
1068 : : * we're about to return. Forget about the rest of the blocks in this
1069 : : * record by dropping the prefetcher's reference to it.
1070 : : */
1391 1071 [ + + ]: 2967146 : if (record == prefetcher->record)
1072 : 2300 : prefetcher->record = NULL;
1073 : :
1074 : : /*
1075 : : * See if it's time to compute some statistics, because enough WAL has
1076 : : * been processed.
1077 : : */
1545 1078 [ + + ]: 2967146 : if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1079 : 1097428 : XLogPrefetcherComputeStats(prefetcher);
1080 : :
1081 [ - + ]: 2967146 : Assert(record == prefetcher->reader->record);
1082 : :
1083 : 2967146 : return &record->header;
1084 : : }
1085 : :
1086 : : bool
1087 : 1253 : check_recovery_prefetch(int *new_value, void **extra, GucSource source)
1088 : : {
1089 : : #ifndef USE_PREFETCH
1090 : : if (*new_value == RECOVERY_PREFETCH_ON)
1091 : : {
1092 : : GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
1093 : : return false;
1094 : : }
1095 : : #endif
1096 : :
1097 : 1253 : return true;
1098 : : }
1099 : :
1100 : : void
1101 : 1253 : assign_recovery_prefetch(int new_value, void *extra)
1102 : : {
1103 : : /* Reconfigure prefetching, because a setting it depends on changed. */
1104 : 1253 : recovery_prefetch = new_value;
1105 [ - + ]: 1253 : if (AmStartupProcess())
1545 tmunro@postgresql.or 1106 :UBC 0 : XLogPrefetchReconfigure();
1545 tmunro@postgresql.or 1107 :CBC 1253 : }
|