Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * read_stream.c
4 : * Mechanism for accessing buffered relation data with look-ahead
5 : *
6 : * Code that needs to access relation data typically pins blocks one at a
7 : * time, often in a predictable order that might be sequential or data-driven.
8 : * Calling the simple ReadBuffer() function for each block is inefficient,
9 : * because blocks that are not yet in the buffer pool require I/O operations
10 : * that are small and might stall waiting for storage. This mechanism looks
11 : * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
12 : * neighboring blocks together and ahead of time, with an adaptive look-ahead
13 : * distance.
14 : *
15 : * A user-provided callback generates a stream of block numbers that is used
16 : * to form reads of up to io_combine_limit, by attempting to merge them with a
17 : * pending read. When that isn't possible, the existing pending read is sent
18 : * to StartReadBuffers() so that a new one can begin to form.
19 : *
20 : * The algorithm for controlling the look-ahead distance is based on recent
21 : * cache hit / miss history, as well as whether we need to wait for I/O completion
22 : * after a miss. When no I/O is necessary, there is no benefit in looking
23 : * ahead more than one block. This is the default initial assumption. When
24 : * blocks needing I/O are streamed, the combine distance is increased to
25 : * benefit from I/O combining and the read-ahead distance is increased
26 : * whenever we need to wait for I/O to try to benefit from increased I/O
27 : * concurrency. Both are reduced gradually when cached blocks are streamed.
28 : *
29 : * The main data structure is a circular queue of buffers of size
30 : * max_pinned_buffers plus some extra space for technical reasons, ready to be
31 : * returned by read_stream_next_buffer(). Each buffer also has an optional
32 : * variable sized object that is passed from the callback to the consumer of
33 : * buffers.
34 : *
35 : * Parallel to the queue of buffers, there is a circular queue of in-progress
36 : * I/Os that have been started with StartReadBuffers(), and for which
37 : * WaitReadBuffers() must be called before returning the buffer.
38 : *
39 : * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
40 : * successive calls, then these data structures might appear as follows:
41 : *
42 : * buffers buf/data ios
43 : *
44 : * +----+ +-----+ +--------+
45 : * | | | | +----+ 42..44 | <- oldest_io_index
46 : * +----+ +-----+ | +--------+
47 : * oldest_buffer_index -> | 10 | | ? | | +--+ 60..60 |
48 : * +----+ +-----+ | | +--------+
49 : * | 42 | | ? |<-+ | | | <- next_io_index
50 : * +----+ +-----+ | +--------+
51 : * | 43 | | ? | | | |
52 : * +----+ +-----+ | +--------+
53 : * | 44 | | ? | | | |
54 : * +----+ +-----+ | +--------+
55 : * | 60 | | ? |<---+
56 : * +----+ +-----+
57 : * next_buffer_index -> | | | |
58 : * +----+ +-----+
59 : *
60 : * In the example, 5 buffers are pinned, and the next buffer to be streamed to
61 : * the client is block 10. Block 10 was a hit and has no associated I/O, but
62 : * the range 42..44 requires an I/O wait before its buffers are returned, as
63 : * does block 60.
64 : *
65 : *
66 : * Portions Copyright (c) 2024-2026, PostgreSQL Global Development Group
67 : * Portions Copyright (c) 1994, Regents of the University of California
68 : *
69 : * IDENTIFICATION
70 : * src/backend/storage/aio/read_stream.c
71 : *
72 : *-------------------------------------------------------------------------
73 : */
74 : #include "postgres.h"
75 :
76 : #include "miscadmin.h"
77 : #include "executor/instrument_node.h"
78 : #include "storage/aio.h"
79 : #include "storage/fd.h"
80 : #include "storage/smgr.h"
81 : #include "storage/read_stream.h"
82 : #include "utils/memdebug.h"
83 : #include "utils/rel.h"
84 : #include "utils/spccache.h"
85 :
86 : typedef struct InProgressIO /* One read that has been started but not waited for yet. */
87 : {
88 : int16 buffer_index; /* queue index of the first buffer covered by this read */
89 : ReadBuffersOperation op; /* operation state handed to StartReadBuffers() */
90 : } InProgressIO;
91 :
92 : /*
93 : * State for managing a stream of reads.
94 : */
95 : struct ReadStream
96 : {
97 : int16 max_ios; /* cap on ios_in_progress; ios[] indexes wrap here */
98 : int16 io_combine_limit; /* upper bound on pending_read_nblocks */
99 : int16 ios_in_progress; /* reads started but not yet waited for */
100 : int16 queue_size; /* wrap-around point of the circular buffer queue */
101 : int16 max_pinned_buffers; /* per-stream limit on pinned buffers */
102 : int16 forwarded_buffers; /* pins already held for the pending read, not counted in pinned_buffers */
103 : int16 pinned_buffers; /* pins counted against max_pinned_buffers */
104 :
105 : /*
106 : * Limit of how far, in blocks, to look-ahead for IO combining and for
107 : * read-ahead.
108 : *
109 : * The limits for read-ahead and combining are handled separately to allow
110 : * for IO combining even in cases where the I/O subsystem can keep up at a
111 : * low read-ahead distance, as doing larger IOs is more efficient.
112 : *
113 : * Set to 0 when the end of the stream is reached.
114 : */
115 : int16 combine_distance;
116 : int16 readahead_distance;
117 : uint16 distance_decay_holdoff; /* cache hits to tolerate before the distances start to decay */
118 : int16 initialized_buffers; /* number of leading buffers[] entries initialized so far */
119 : int16 resume_readahead_distance; /* NOTE(review): not used in the visible part of this file; presumably restored when a stream is resumed - confirm */
120 : int16 resume_combine_distance; /* NOTE(review): see resume_readahead_distance - confirm */
121 : int read_buffers_flags; /* base flags passed to every StartReadBuffers() call */
122 : bool sync_mode; /* using io_method=sync */
123 : bool batch_mode; /* READ_STREAM_USE_BATCHING */
124 : bool advice_enabled; /* READ_BUFFERS_ISSUE_ADVICE may be added to the flags */
125 : bool temporary; /* check the local pin limit rather than the shared one */
126 :
127 : /* scan stats counters; NULL means stats are not collected */
128 : IOStats *stats;
129 :
130 : /*
131 : * One-block buffer to support 'ungetting' a block number, to resolve flow
132 : * control problems when I/Os are split.
133 : */
134 : BlockNumber buffered_blocknum;
135 :
136 : /*
137 : * The callback that will tell us which block numbers to read, and an
138 : * opaque pointer that will be passed to it for its own purposes.
139 : */
140 : ReadStreamBlockNumberCB callback;
141 : void *callback_private_data;
142 :
143 : /* Next expected block, for detecting sequential access. */
144 : BlockNumber seq_blocknum;
145 : BlockNumber seq_until_processed;
146 :
147 : /* The read operation we are currently preparing. */
148 : BlockNumber pending_read_blocknum;
149 : int16 pending_read_nblocks;
150 :
151 : /* Space for buffers and optional per-buffer private data. */
152 : size_t per_buffer_data_size;
153 : void *per_buffer_data;
154 :
155 : /* Read operations that have been started but not waited for yet. */
156 : InProgressIO *ios;
157 : int16 oldest_io_index;
158 : int16 next_io_index;
159 :
160 : bool fast_path;
161 :
162 : /* Circular queue of buffers. */
163 : int16 oldest_buffer_index; /* Next pinned buffer to return */
164 : int16 next_buffer_index; /* Index of next buffer to pin */
165 : Buffer buffers[FLEXIBLE_ARRAY_MEMBER];
166 : };
167 :
168 : /*
169 : * Return a pointer to the per-buffer data by index: the slot for queue entry buffer_index, with slots per_buffer_data_size bytes apart starting at per_buffer_data.
170 : */
171 : static inline void *
172 4396206 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
173 : {
174 8792412 : return (char *) stream->per_buffer_data +
175 4396206 : stream->per_buffer_data_size * buffer_index;
176 : }
177 :
178 : /*
179 : * General-use ReadStreamBlockNumberCB for block range scans. Loops over the
180 : * blocks [current_blocknum, last_exclusive), then returns InvalidBlockNumber. Each returned block advances current_blocknum in the private state.
181 : */
182 : BlockNumber
183 416521 : block_range_read_stream_cb(ReadStream *stream,
184 : void *callback_private_data, /* a BlockRangeReadStreamPrivate */
185 : void *per_buffer_data) /* not used by this callback */
186 : {
187 416521 : BlockRangeReadStreamPrivate *p = callback_private_data;
188 :
189 416521 : if (p->current_blocknum < p->last_exclusive)
190 340466 : return p->current_blocknum++;
191 :
192 76055 : return InvalidBlockNumber; /* range exhausted */
193 : }
194 :
195 : /*
196 : * Update stream stats with current pinned buffer depth.
197 : *
198 : * Called once per buffer returned to the consumer in read_stream_next_buffer().
199 : * Records the number of pinned buffers at that moment, so we can compute the
200 : * average look-ahead depth. Also tracks the largest depth seen. Does nothing when stats are not enabled.
201 : */
202 : static inline void
203 5063580 : read_stream_count_prefetch(ReadStream *stream)
204 : {
205 5063580 : IOStats *stats = stream->stats;
206 :
207 5063580 : if (stats == NULL)
208 5063572 : return; /* stats collection not enabled */
209 :
210 8 : stats->prefetch_count++;
211 8 : stats->distance_sum += stream->pinned_buffers;
212 8 : if (stream->pinned_buffers > stats->distance_max)
213 8 : stats->distance_max = stream->pinned_buffers;
214 : }
215 :
216 : /*
217 : * Update stream stats about size of I/O requests.
218 : *
219 : * We count the number of I/O requests, size of requests (counted in blocks)
220 : * and number of in-progress I/Os (each accumulated as a running sum). Does nothing when stats are not enabled.
221 : */
222 : static inline void
223 704135 : read_stream_count_io(ReadStream *stream, int nblocks, int in_progress)
224 : {
225 704135 : IOStats *stats = stream->stats;
226 :
227 704135 : if (stats == NULL)
228 704134 : return; /* stats collection not enabled */
229 :
230 1 : stats->io_count++;
231 1 : stats->io_nblocks += nblocks;
232 1 : stats->io_in_progress += in_progress;
233 : }
234 :
235 : /*
236 : * Update stream stats about waits for I/O when consuming buffers.
237 : *
238 : * We count the number of I/O waits while pulling buffers out of a stream. Does nothing when stats are not enabled.
239 : */
240 : static inline void
241 348358 : read_stream_count_wait(ReadStream *stream)
242 : {
243 348358 : IOStats *stats = stream->stats;
244 :
245 348358 : if (stats == NULL)
246 348358 : return; /* stats collection not enabled */
247 :
248 0 : stats->wait_count++;
249 : }
250 :
251 : /*
252 : * Enable collection of stats into the provided IOStats, and record max_pinned_buffers in it as distance_capacity. The counting helpers do nothing while the stats pointer is NULL, so passing NULL turns collection off.
253 : */
254 : void
255 8 : read_stream_enable_stats(ReadStream *stream, IOStats *stats)
256 : {
257 8 : stream->stats = stats;
258 8 : if (stream->stats)
259 8 : stream->stats->distance_capacity = stream->max_pinned_buffers;
260 8 : }
261 :
262 : /*
263 : * Ask the callback which block it would like us to read next, with a one block
264 : * buffer in front to allow read_stream_unget_block() to work. Returns whatever the callback returns, including InvalidBlockNumber.
265 : */
266 : static inline BlockNumber
267 6607498 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
268 : {
269 : BlockNumber blocknum;
270 :
271 6607498 : blocknum = stream->buffered_blocknum;
272 6607498 : if (blocknum != InvalidBlockNumber)
273 0 : stream->buffered_blocknum = InvalidBlockNumber; /* consume the ungotten block; the callback is not called this time */
274 : else
275 : {
276 : /*
277 : * Tell Valgrind that the per-buffer data is undefined. That replaces
278 : * the "noaccess" state that was set when the consumer moved past this
279 : * entry last time around the queue, and should also catch callbacks
280 : * that fail to initialize data that the buffer consumer later
281 : * accesses. On the first go around, it is undefined already.
282 : */
283 : VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
284 : stream->per_buffer_data_size);
285 6607498 : blocknum = stream->callback(stream,
286 : stream->callback_private_data,
287 : per_buffer_data);
288 : }
289 :
290 6607498 : return blocknum;
291 : }
292 :
293 : /*
294 : * In order to deal with buffer shortages and I/O limits after short reads, we
295 : * sometimes need to defer handling of a block we've already consumed from the
296 : * registered callback until later. The block is stored in the one-block buffer and handed back by the next read_stream_get_block() call.
297 : */
298 : static inline void
299 0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
300 : {
301 : /* We shouldn't ever unget more than one block. */
302 : Assert(stream->buffered_blocknum == InvalidBlockNumber);
303 : Assert(blocknum != InvalidBlockNumber);
304 0 : stream->buffered_blocknum = blocknum;
305 0 : }
306 :
307 : /*
308 : * Start as much of the current pending read as we can. If we have to split it
309 : * because of the per-backend buffer limit, or the buffer manager decides to
310 : * split it, then the pending read is adjusted to hold the remaining portion.
311 : *
312 : * We can always start a read of at least size one if we have no progress yet.
313 : * Otherwise it's possible that we can't start a read at all because of a lack
314 : * of buffers, and then false is returned. Buffer shortages also reduce the
315 : * distance to a level that prevents look-ahead until buffers are released. Returns true whenever StartReadBuffers() was called, even if the read was shortened.
316 : */
317 : static bool
318 2164867 : read_stream_start_pending_read(ReadStream *stream)
319 : {
320 : bool need_wait;
321 : int requested_nblocks; /* nblocks as passed in to StartReadBuffers() */
322 : int nblocks; /* in: blocks requested; out: blocks actually started */
323 : int flags;
324 : int forwarded;
325 : int16 io_index;
326 : int16 overflow; /* number of entries written beyond queue_size */
327 : int16 buffer_index;
328 : int buffer_limit; /* pins this read may use, including forwarded ones */
329 :
330 : /* This should only be called with a pending read. */
331 : Assert(stream->pending_read_nblocks > 0);
332 : Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
333 :
334 : /* We had better not exceed the per-stream buffer limit with this read. */
335 : Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
336 : stream->max_pinned_buffers);
337 :
338 : #ifdef USE_ASSERT_CHECKING
339 : /* We had better not be overwriting an existing pinned buffer. */
340 : if (stream->pinned_buffers > 0)
341 : Assert(stream->next_buffer_index != stream->oldest_buffer_index);
342 : else
343 : Assert(stream->next_buffer_index == stream->oldest_buffer_index);
344 :
345 : /*
346 : * Pinned buffers forwarded by a preceding StartReadBuffers() call that
347 : * had to split the operation should match the leading blocks of this
348 : * following StartReadBuffers() call.
349 : */
350 : Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
351 : for (int i = 0; i < stream->forwarded_buffers; ++i)
352 : Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
353 : stream->pending_read_blocknum + i);
354 :
355 : /*
356 : * Check that we've cleared the queue/overflow entries corresponding to
357 : * the rest of the blocks covered by this read, unless it's the first go
358 : * around and we haven't even initialized them yet.
359 : */
360 : for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
361 : Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
362 : stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
363 : #endif
364 :
365 : /* Do we need to issue read-ahead advice? */
366 2164867 : flags = stream->read_buffers_flags;
367 2164867 : if (stream->advice_enabled)
368 : {
369 1708 : if (stream->pending_read_blocknum == stream->seq_blocknum)
370 : {
371 : /*
372 : * Sequential: Issue advice until the preadv() calls have caught
373 : * up with the first advice issued for this sequential region, and
374 : * then stay out of the way of the kernel's own read-ahead.
375 : */
376 29 : if (stream->seq_until_processed != InvalidBlockNumber)
377 1 : flags |= READ_BUFFERS_ISSUE_ADVICE;
378 : }
379 : else
380 : {
381 : /*
382 : * Random jump: Note the starting location of a new potential
383 : * sequential region and start issuing advice. Skip it this time
384 : * if the preadv() follows immediately, eg first block in stream.
385 : */
386 1679 : stream->seq_until_processed = stream->pending_read_blocknum;
387 1679 : if (stream->pinned_buffers > 0)
388 44 : flags |= READ_BUFFERS_ISSUE_ADVICE;
389 : }
390 : }
391 :
392 : /*
393 : * How many more buffers is this backend allowed?
394 : *
395 : * Forwarded buffers are already pinned and map to the leading blocks of
396 : * the pending read (the remaining portion of an earlier short read that
397 : * we're about to continue). They are not counted in pinned_buffers, but
398 : * they are counted as pins already held by this backend according to the
399 : * buffer manager, so they must be added to the limit it grants us.
400 : */
401 2164867 : if (stream->temporary)
402 16396 : buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
403 : else
404 2148471 : buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
405 : Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
406 :
407 2164867 : buffer_limit += stream->forwarded_buffers;
408 2164867 : buffer_limit = Min(buffer_limit, PG_INT16_MAX);
409 :
410 2164867 : if (buffer_limit == 0 && stream->pinned_buffers == 0)
411 744636 : buffer_limit = 1; /* guarantee progress */
412 :
413 : /* Does the per-backend limit affect this read? */
414 2164867 : nblocks = stream->pending_read_nblocks;
415 2164867 : if (buffer_limit < nblocks)
416 : {
417 : int16 new_distance;
418 :
419 : /* Shrink distance: no more look-ahead until buffers are released. */
420 1980 : new_distance = stream->pinned_buffers + buffer_limit;
421 1980 : if (stream->readahead_distance > new_distance)
422 536 : stream->readahead_distance = new_distance;
423 :
424 : /* Unless we have nothing to give the consumer, stop here. */
425 1980 : if (stream->pinned_buffers > 0)
426 114 : return false;
427 :
428 : /* A short read is required to make progress. */
429 1866 : nblocks = buffer_limit;
430 : }
431 :
432 : /*
433 : * We say how many blocks we want to read, but it may be smaller on return
434 : * if the buffer manager decides to shorten the read. Initialize buffers
435 : * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
436 : * and keep the original nblocks number so we can check for forwarded
437 : * buffers as output, below.
438 : */
439 2164753 : buffer_index = stream->next_buffer_index;
440 2164753 : io_index = stream->next_io_index;
441 3518569 : while (stream->initialized_buffers < buffer_index + nblocks)
442 1353816 : stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
443 2164753 : requested_nblocks = nblocks;
444 2164753 : need_wait = StartReadBuffers(&stream->ios[io_index].op,
445 2164753 : &stream->buffers[buffer_index],
446 : stream->pending_read_blocknum,
447 : &nblocks,
448 : flags);
449 2164745 : stream->pinned_buffers += nblocks; /* nblocks now holds the count actually started */
450 :
451 : /* Remember whether we need to wait before returning this buffer. */
452 2164745 : if (!need_wait)
453 : {
454 : /*
455 : * If there currently is no IO in progress, and we have not needed to
456 : * issue IO recently, decay the look-ahead distance. We detect if we
457 : * had to issue IO recently by having a decay holdoff that's set to
458 : * the max look-ahead distance whenever we need to do IO. This is
459 : * important to ensure we eventually reach a high enough distance to
460 : * perform IO asynchronously when starting out with a small look-ahead
461 : * distance.
462 : */
463 1476391 : if (stream->ios_in_progress == 0)
464 : {
465 1475827 : if (stream->distance_decay_holdoff > 0)
466 26218 : stream->distance_decay_holdoff--;
467 : else
468 : {
469 1449609 : if (stream->readahead_distance > 1)
470 17300 : stream->readahead_distance--;
471 :
472 : /*
473 : * For now we reduce the IO combine distance after
474 : * sufficiently many buffer hits. There is no clear
475 : * performance argument for doing so, but at the moment we
476 : * need to do so to make the entrance into fast_path work
477 : * correctly: We require combine_distance == 1 to enter
478 : * fast-path, as without that condition we would wrongly
479 : * re-enter fast-path when readahead_distance == 1 and
480 : * pinned_buffers == 1, as we would not yet have prepared
481 : * another IO in that situation.
482 : */
483 1449609 : if (stream->combine_distance > 1)
484 17417 : stream->combine_distance--;
485 : }
486 : }
487 : }
488 : else
489 : {
490 : /*
491 : * Remember to call WaitReadBuffers() before returning head buffer.
492 : * Look-ahead distance will be adjusted after waiting.
493 : */
494 688354 : stream->ios[io_index].buffer_index = buffer_index;
495 688354 : if (++stream->next_io_index == stream->max_ios)
496 29373 : stream->next_io_index = 0;
497 : Assert(stream->ios_in_progress < stream->max_ios);
498 688354 : stream->ios_in_progress++;
499 688354 : stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
500 :
501 : /* update I/O stats */
502 688354 : read_stream_count_io(stream, nblocks, stream->ios_in_progress);
503 : }
504 :
505 : /*
506 : * How many pins were acquired but forwarded to the next call? These need
507 : * to be passed to the next StartReadBuffers() call by leaving them
508 : * exactly where they are in the queue, or released if the stream ends
509 : * early. We need the number for accounting purposes, since they are not
510 : * counted in stream->pinned_buffers but we already hold them.
511 : */
512 2164745 : forwarded = 0;
513 2166966 : while (nblocks + forwarded < requested_nblocks &&
514 66488 : stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
515 2221 : forwarded++;
516 2164745 : stream->forwarded_buffers = forwarded;
517 :
518 : /*
519 : * We gave a contiguous range of buffer space to StartReadBuffers(), but
520 : * we want it to wrap around at queue_size. Copy overflowing buffers to
521 : * the front of the array where they'll be consumed, but also leave a copy
522 : * in the overflow zone which the I/O operation has a pointer to (it needs
523 : * a contiguous array). Both copies will be cleared when the buffers are
524 : * handed to the consumer.
525 : */
526 2164745 : overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
527 2164745 : if (overflow > 0)
528 : {
529 : Assert(overflow < stream->queue_size); /* can't overlap */
530 454 : memcpy(&stream->buffers[0],
531 454 : &stream->buffers[stream->queue_size],
532 : sizeof(stream->buffers[0]) * overflow);
533 : }
534 :
535 : /* Compute location of start of next read, without using % operator. */
536 2164745 : buffer_index += nblocks;
537 2164745 : if (buffer_index >= stream->queue_size)
538 348821 : buffer_index -= stream->queue_size;
539 : Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
540 2164745 : stream->next_buffer_index = buffer_index;
541 :
542 : /* Adjust the pending read to cover the remaining portion, if any. */
543 2164745 : stream->pending_read_blocknum += nblocks;
544 2164745 : stream->pending_read_nblocks -= nblocks;
545 :
546 2164745 : return true;
547 : }
548 :
549 : /*
550 : * Should we continue to perform look ahead? Looking ahead may allow us to
551 : * make the pending IO larger via IO combining or to issue more read-ahead. Returns false at end of stream, when max_ios reads are in progress, or once the read-ahead distance has been reached.
552 : */
553 : static inline bool
554 6381562 : read_stream_should_look_ahead(ReadStream *stream)
555 : {
556 : /* If the callback has signaled end-of-stream, we're done */
557 6381562 : if (stream->readahead_distance == 0)
558 336145 : return false;
559 :
560 : /* never start more IOs than our cap */
561 6045417 : if (stream->ios_in_progress >= stream->max_ios)
562 0 : return false;
563 :
564 : /*
565 : * Allow looking further ahead if we are in the process of building a
566 : * larger IO, the IO is not yet big enough, and we don't yet have IO in
567 : * flight.
568 : *
569 : * We do so to allow building larger reads when readahead_distance is
570 : * small (e.g. because the I/O subsystem is keeping up or
571 : * effective_io_concurrency is small). That's a useful goal because larger
572 : * reads are more CPU efficient than smaller reads, even if the system is
573 : * not IO bound.
574 : *
575 : * The reason we do *not* do so when we already have a read prepared (i.e.
576 : * why we check for pinned_buffers == 0) is once we are actually reading
577 : * ahead, we don't need it:
578 : *
579 : * - We won't issue unnecessarily small reads as
580 : * read_stream_should_issue_now() will return false until the IO is
581 : * suitably sized. The issuance of the pending read will be delayed until
582 : * enough buffers have been consumed.
583 : *
584 : * - If we are not reading ahead aggressively enough, future
585 : * WaitReadBuffers() calls will return true, leading to readahead_distance
586 : * being increased. After that more full-sized IOs can be issued.
587 : *
588 : * Furthermore, if we did not have the pinned_buffers == 0 condition, we
589 : * might end up issuing I/O more aggressively than we need.
590 : *
591 : * Note that a return of true here can lead to exceeding the read-ahead
592 : * limit, but we won't exceed the buffer pin limit (because pinned_buffers
593 : * == 0 and combine_distance is capped by max_pinned_buffers).
594 : */
595 6045417 : if (stream->pending_read_nblocks > 0 &&
596 2662916 : stream->pinned_buffers == 0 &&
597 2526416 : stream->pending_read_nblocks < stream->combine_distance)
598 470094 : return true;
599 :
600 : /*
601 : * Don't start more read-ahead if that'd put us over the distance limit
602 : * for doing read-ahead. As stream->readahead_distance is capped by
603 : * max_pinned_buffers, this prevents us from looking ahead so far that it
604 : * would put us over the pin limit.
605 : */
606 5575323 : if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
607 2132260 : return false;
608 :
609 3443063 : return true;
610 : }
611 :
612 : /*
613 : * We don't start the pending read just because we've hit the distance limit,
614 : * preferring to give it another chance to grow to full io_combine_limit size
615 : * once more buffers have been consumed. But this is not desirable in all
616 : * situations - see below. Returns true when the pending read should be started now; always false when there is no pending read or max_ios reads are already in progress.
617 : */
618 : static inline bool
619 7473172 : read_stream_should_issue_now(ReadStream *stream)
620 : {
621 7473172 : int16 pending_read_nblocks = stream->pending_read_nblocks;
622 :
623 : /* there is no pending IO that could be issued */
624 7473172 : if (pending_read_nblocks == 0)
625 4941883 : return false;
626 :
627 : /* never start more IOs than our cap */
628 2531289 : if (stream->ios_in_progress >= stream->max_ios)
629 0 : return false;
630 :
631 : /*
632 : * If the callback has signaled end-of-stream, start the pending read
633 : * immediately. There is no further potential for IO combining.
634 : */
635 2531289 : if (stream->readahead_distance == 0)
636 103420 : return true;
637 :
638 : /*
639 : * If we've already reached combine_distance, there's no chance of growing
640 : * the read further.
641 : */
642 2427869 : if (pending_read_nblocks >= stream->combine_distance)
643 2061412 : return true;
644 :
645 : /*
646 : * If we currently have no reads in flight or prepared, issue the IO once
647 : * we are not looking ahead further. This ensures there's always at least
648 : * one IO prepared.
649 : */
650 366457 : if (stream->pinned_buffers == 0 &&
651 235047 : !read_stream_should_look_ahead(stream))
652 0 : return true;
653 :
654 366457 : return false;
655 : }
656 :
657 : static void
658 3795062 : read_stream_look_ahead(ReadStream *stream) /* Pull block numbers from the callback for as long as read_stream_should_look_ahead() allows, merging consecutive blocks into the pending read and starting reads as they become due. */
659 : {
660 : /*
661 : * Allow amortizing the cost of submitting IO over multiple IOs. This
662 : * requires that we don't do any operations that could lead to a deadlock
663 : * with staged-but-unsubmitted IO. The callback needs to opt-in to being
664 : * careful.
665 : */
666 3795062 : if (stream->batch_mode)
667 3210888 : pgaio_enter_batchmode();
668 :
669 6146515 : while (read_stream_should_look_ahead(stream))
670 : {
671 : BlockNumber blocknum;
672 : int16 buffer_index;
673 : void *per_buffer_data;
674 :
675 3678110 : if (read_stream_should_issue_now(stream))
676 : {
677 2077 : read_stream_start_pending_read(stream);
678 2077 : continue; /* re-evaluate the look-ahead condition with the updated state */
679 : }
680 :
681 : /*
682 : * See which block the callback wants next in the stream. We need to
683 : * compute the index of the Nth block of the pending read including
684 : * wrap-around, but we don't want to use the expensive % operator.
685 : */
686 3676033 : buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
687 3676033 : if (buffer_index >= stream->queue_size)
688 3648 : buffer_index -= stream->queue_size;
689 : Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
690 3676033 : per_buffer_data = get_per_buffer_data(stream, buffer_index);
691 3676033 : blocknum = read_stream_get_block(stream, per_buffer_data);
692 3676033 : if (blocknum == InvalidBlockNumber)
693 : {
694 : /* End of stream. Zero distances stop all further look-ahead. */
695 1326657 : stream->readahead_distance = 0;
696 1326657 : stream->combine_distance = 0;
697 1326657 : break;
698 : }
699 :
700 : /* Can we merge it with the pending read? */
701 2349376 : if (stream->pending_read_nblocks > 0 &&
702 251207 : stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
703 : {
704 251178 : stream->pending_read_nblocks++;
705 251178 : continue;
706 : }
707 :
708 : /* We have to start the pending read before we can build another. */
709 2098233 : while (stream->pending_read_nblocks > 0)
710 : {
711 35 : if (!read_stream_start_pending_read(stream) ||
712 35 : stream->ios_in_progress == stream->max_ios)
713 : {
714 : /* We've hit the buffer or I/O limit. Rewind and stop here. */
715 0 : read_stream_unget_block(stream, blocknum);
716 0 : if (stream->batch_mode)
717 0 : pgaio_exit_batchmode();
718 0 : return;
719 : }
720 : }
721 :
722 : /* This is the start of a new pending read. */
723 2098198 : stream->pending_read_blocknum = blocknum;
724 2098198 : stream->pending_read_nblocks = 1;
725 : }
726 :
727 : /*
728 : * Check if the pending read should be issued now, or if we should give it
729 : * another chance to grow to the full size.
730 : *
731 : * Note that the pending read can exceed the distance goal, if the latter
732 : * was reduced after hitting the per-backend buffer limit.
733 : */
734 3795062 : if (read_stream_should_issue_now(stream))
735 2162755 : read_stream_start_pending_read(stream);
736 :
737 : /*
738 : * There should always be something pinned when we leave this function,
739 : * whether started by this call or not, unless we've hit the end of the
740 : * stream. In the worst case we can always make progress one buffer at a
741 : * time.
742 : */
743 : Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);
744 :
745 3795054 : if (stream->batch_mode)
746 3210880 : pgaio_exit_batchmode();
747 : }
748 :
749 : /*
750 : * Create a new read stream object that can be used to perform the equivalent
751 : * of a series of ReadBuffer() calls for one fork of one relation.
752 : * Internally, it generates larger vectored reads where possible by looking
753 : * ahead. The callback should return block numbers or InvalidBlockNumber to
754 : * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
755 : * write extra data for each block into the space provided to it. It will
756 : * also receive callback_private_data for its own purposes.
757 : */
758 : static ReadStream *
759 694869 : read_stream_begin_impl(int flags,
760 : BufferAccessStrategy strategy,
761 : Relation rel,
762 : SMgrRelation smgr,
763 : char persistence,
764 : ForkNumber forknum,
765 : ReadStreamBlockNumberCB callback,
766 : void *callback_private_data,
767 : size_t per_buffer_data_size)
768 : {
769 : ReadStream *stream;
770 : size_t size;
771 : int16 queue_size;
772 : int16 queue_overflow;
773 : int max_ios;
774 : int strategy_pin_limit;
775 : uint32 max_pinned_buffers;
776 : uint32 max_possible_buffer_limit;
777 : Oid tablespace_id;
778 :
779 : /*
780 : * Reject attempts to read non-local temporary relations; we would be
781 : * likely to get wrong data since we have no visibility into the owning
782 : * session's local buffers.
783 : */
784 694869 : if (rel && RELATION_IS_OTHER_TEMP(rel))
785 5 : ereport(ERROR,
786 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
787 : errmsg("cannot access temporary tables of other sessions")));
788 :
789 : /*
790 : * Decide how many I/Os we will allow to run at the same time. This
791 : * number also affects how far we look ahead for opportunities to start
792 : * more I/Os.
793 : */
794 694864 : tablespace_id = smgr->smgr_rlocator.locator.spcOid;
795 694864 : if (!OidIsValid(MyDatabaseId) ||
796 811014 : (rel && IsCatalogRelation(rel)) ||
797 197294 : IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
798 : {
799 : /*
800 : * Avoid circularity while trying to look up tablespace settings or
801 : * before spccache.c is ready.
802 : */
803 563287 : max_ios = effective_io_concurrency;
804 : }
805 131577 : else if (flags & READ_STREAM_MAINTENANCE)
806 18184 : max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
807 : else
808 113393 : max_ios = get_tablespace_io_concurrency(tablespace_id);
809 :
810 : /* Cap to INT16_MAX to avoid overflowing below */
811 694864 : max_ios = Min(max_ios, PG_INT16_MAX);
812 :
813 : /*
814 : * If starting a multi-block I/O near the end of the queue, we might
815 : * temporarily need extra space for overflowing buffers before they are
816 : * moved to regular circular position. This is the maximum extra space we
817 : * could need.
818 : */
819 694864 : queue_overflow = io_combine_limit - 1;
820 :
821 : /*
822 : * Choose the maximum number of buffers we're prepared to pin. We try to
823 : * pin fewer if we can, though. We add one so that we can make progress
824 : * even if max_ios is set to 0 (see also further down). For max_ios > 0,
825 : * this also allows an extra full I/O's worth of buffers: after an I/O
826 : * finishes we don't want to have to wait for its buffers to be consumed
827 : * before starting a new one.
828 : *
829 : * Be careful not to allow int16 to overflow. That is possible with the
830 : * current GUC range limits, so this is an artificial limit of ~32k
831 : * buffers and we'd need to adjust the types to exceed that. We also have
832 : * to allow for the spare entry and the overflow space.
833 : */
834 694864 : max_pinned_buffers = (max_ios + 1) * io_combine_limit;
835 694864 : max_pinned_buffers = Min(max_pinned_buffers,
836 : PG_INT16_MAX - queue_overflow - 1);
837 :
838 : /* Give the strategy a chance to limit the number of buffers we pin. */
839 694864 : strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
840 694864 : max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
841 :
842 : /*
843 : * Also limit our queue to the maximum number of pins we could ever be
844 : * allowed to acquire according to the buffer manager. We may not really
845 : * be able to use them all due to other pins held by this backend, but
846 : * we'll check that later in read_stream_start_pending_read().
847 : */
848 694864 : if (SmgrIsTemp(smgr))
849 9304 : max_possible_buffer_limit = GetLocalPinLimit();
850 : else
851 685560 : max_possible_buffer_limit = GetPinLimit();
852 694864 : max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
853 :
854 : /*
855 : * The limit might be zero on a system configured with too few buffers for
856 : * the number of connections. We need at least one to make progress.
857 : */
858 694864 : max_pinned_buffers = Max(1, max_pinned_buffers);
859 :
860 : /*
861 : * We need one extra entry for buffers and per-buffer data, because users
862 : * of per-buffer data have access to the object until the next call to
863 : * read_stream_next_buffer(), so we need a gap between the head and tail
864 : * of the queue so that we don't clobber it.
865 : */
866 694864 : queue_size = max_pinned_buffers + 1;
867 :
868 : /*
869 : * Allocate the object, the buffers, the ios and per_buffer_data space in
870 : * one big chunk. Though we have queue_size buffers, we want to be able
871 : * to assume that all the buffers for a single read are contiguous (i.e.
872 : * don't wrap around halfway through), so we allow temporary overflows of
873 : * up to the maximum possible overflow size.
874 : */
875 694864 : size = offsetof(ReadStream, buffers);
876 694864 : size += sizeof(Buffer) * (queue_size + queue_overflow);
877 694864 : size += sizeof(InProgressIO) * Max(1, max_ios);
878 694864 : size += per_buffer_data_size * queue_size;
879 694864 : size += MAXIMUM_ALIGNOF * 2;
880 694864 : stream = (ReadStream *) palloc(size);
881 694864 : memset(stream, 0, offsetof(ReadStream, buffers));
882 694864 : stream->ios = (InProgressIO *)
883 694864 : MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
884 694864 : if (per_buffer_data_size > 0)
885 140675 : stream->per_buffer_data = (void *)
886 140675 : MAXALIGN(&stream->ios[Max(1, max_ios)]);
887 :
888 694864 : stream->sync_mode = io_method == IOMETHOD_SYNC;
889 694864 : stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
890 :
891 : #ifdef USE_PREFETCH
892 :
893 : /*
894 : * Read-ahead advice simulating asynchronous I/O with synchronous calls.
895 : * Issue advice only if AIO is not used, direct I/O isn't enabled, the
896 : * caller hasn't promised sequential access (overriding our detection
897 : * heuristics), and max_ios hasn't been set to zero.
898 : */
899 694864 : if (stream->sync_mode &&
900 3161 : (io_direct_flags & IO_DIRECT_DATA) == 0 &&
901 3161 : (flags & READ_STREAM_SEQUENTIAL) == 0 &&
902 : max_ios > 0)
903 738 : stream->advice_enabled = true;
904 : #endif
905 :
906 : /*
907 : * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
908 : * we still need to allocate space to combine and run one I/O. Bump it up
909 : * to one, and remember to ask for synchronous I/O only.
910 : */
911 694864 : if (max_ios == 0)
912 : {
913 7 : max_ios = 1;
914 7 : stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
915 : }
916 :
917 : /*
918 : * Capture stable values for these two GUC-derived numbers for the
919 : * lifetime of this stream, so we don't have to worry about the GUCs
920 : * changing underneath us beyond this point.
921 : */
922 694864 : stream->max_ios = max_ios;
923 694864 : stream->io_combine_limit = io_combine_limit;
924 :
925 694864 : stream->per_buffer_data_size = per_buffer_data_size;
926 694864 : stream->max_pinned_buffers = max_pinned_buffers;
927 694864 : stream->queue_size = queue_size;
928 694864 : stream->callback = callback;
929 694864 : stream->callback_private_data = callback_private_data;
930 694864 : stream->buffered_blocknum = InvalidBlockNumber;
931 694864 : stream->seq_blocknum = InvalidBlockNumber;
932 694864 : stream->seq_until_processed = InvalidBlockNumber;
933 694864 : stream->temporary = SmgrIsTemp(smgr);
934 694864 : stream->distance_decay_holdoff = 0;
935 :
936 : /*
937 : * Skip the initial ramp-up phase if the caller says we're going to be
938 : * reading the whole relation. This way we start out assuming we'll be
939 : * doing full io_combine_limit sized reads.
940 : */
941 694864 : if (flags & READ_STREAM_FULL)
942 : {
943 77007 : stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
944 77007 : stream->combine_distance = Min(max_pinned_buffers, stream->io_combine_limit);
945 : }
946 : else
947 : {
948 617857 : stream->readahead_distance = 1;
949 617857 : stream->combine_distance = 1;
950 : }
951 694864 : stream->resume_readahead_distance = stream->readahead_distance;
952 694864 : stream->resume_combine_distance = stream->combine_distance;
953 :
954 : /*
955 : * Since we always access the same relation, we can initialize parts of
956 : * the ReadBuffersOperation objects and leave them that way, to avoid
957 : * wasting CPU cycles writing to them for each read.
958 : */
959 11841313 : for (int i = 0; i < max_ios; ++i)
960 : {
961 11146449 : stream->ios[i].op.rel = rel;
962 11146449 : stream->ios[i].op.smgr = smgr;
963 11146449 : stream->ios[i].op.persistence = persistence;
964 11146449 : stream->ios[i].op.forknum = forknum;
965 11146449 : stream->ios[i].op.strategy = strategy;
966 : }
967 :
968 694864 : return stream;
969 : }
970 :
971 : /*
972 : * Create a new read stream for reading a relation.
973 : * See read_stream_begin_impl() for the detailed explanation.
974 : */
975 : ReadStream *
976 624651 : read_stream_begin_relation(int flags,
977 : BufferAccessStrategy strategy,
978 : Relation rel,
979 : ForkNumber forknum,
980 : ReadStreamBlockNumberCB callback,
981 : void *callback_private_data,
982 : size_t per_buffer_data_size)
983 : {
984 624651 : return read_stream_begin_impl(flags,
985 : strategy,
986 : rel,
987 : RelationGetSmgr(rel),
988 624651 : rel->rd_rel->relpersistence,
989 : forknum,
990 : callback,
991 : callback_private_data,
992 : per_buffer_data_size);
993 : }
994 :
995 : /*
996 : * Create a new read stream for reading a SMgr relation.
997 : * See read_stream_begin_impl() for the detailed explanation.
998 : */
999 : ReadStream *
1000 70218 : read_stream_begin_smgr_relation(int flags,
1001 : BufferAccessStrategy strategy,
1002 : SMgrRelation smgr,
1003 : char smgr_persistence,
1004 : ForkNumber forknum,
1005 : ReadStreamBlockNumberCB callback,
1006 : void *callback_private_data,
1007 : size_t per_buffer_data_size)
1008 : {
1009 70218 : return read_stream_begin_impl(flags,
1010 : strategy,
1011 : NULL,
1012 : smgr,
1013 : smgr_persistence,
1014 : forknum,
1015 : callback,
1016 : callback_private_data,
1017 : per_buffer_data_size);
1018 : }
1019 :
1020 : /*
1021 : * Pull one pinned buffer out of a stream. Each call returns successive
1022 : * blocks in the order specified by the callback. If per_buffer_data_size was
1023 : * set to a non-zero size, *per_buffer_data receives a pointer to the extra
1024 : * per-buffer data that the callback had a chance to populate, which remains
1025 : * valid until the next call to read_stream_next_buffer(). When the stream
1026 : * runs out of data, InvalidBuffer is returned. The caller may decide to end
1027 : * the stream early at any time by calling read_stream_end().
1028 : */
1029 : Buffer
1030 8011835 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
1031 : {
1032 : Buffer buffer;
1033 : int16 oldest_buffer_index;
1034 :
1035 : #ifndef READ_STREAM_DISABLE_FAST_PATH
1036 :
1037 : /*
1038 : * A fast path for all-cached scans. This is the same as the usual
1039 : * algorithm, but it is specialized for no I/O and no per-buffer data, so
1040 : * we can skip the queue management code, stay in the same buffer slot and
1041 : * use singular StartReadBuffer().
1042 : */
1043 8011835 : if (likely(stream->fast_path))
1044 : {
1045 : BlockNumber next_blocknum;
1046 :
1047 : /* Fast path assumptions. */
1048 : Assert(stream->ios_in_progress == 0);
1049 : Assert(stream->forwarded_buffers == 0);
1050 : Assert(stream->pinned_buffers == 1);
1051 : Assert(stream->readahead_distance == 1);
1052 : Assert(stream->combine_distance == 1);
1053 : Assert(stream->pending_read_nblocks == 0);
1054 : Assert(stream->per_buffer_data_size == 0);
1055 : Assert(stream->initialized_buffers > stream->oldest_buffer_index);
1056 :
1057 : /* We're going to return the buffer we pinned last time. */
1058 2931465 : oldest_buffer_index = stream->oldest_buffer_index;
1059 : Assert((oldest_buffer_index + 1) % stream->queue_size ==
1060 : stream->next_buffer_index);
1061 2931465 : buffer = stream->buffers[oldest_buffer_index];
1062 : Assert(buffer != InvalidBuffer);
1063 :
1064 : /* Choose the next block to pin. */
1065 2931465 : next_blocknum = read_stream_get_block(stream, NULL);
1066 :
1067 2931465 : if (likely(next_blocknum != InvalidBlockNumber))
1068 : {
1069 2823439 : int flags = stream->read_buffers_flags;
1070 :
1071 2823439 : if (stream->advice_enabled)
1072 548 : flags |= READ_BUFFERS_ISSUE_ADVICE;
1073 :
1074 : /*
1075 : * While in fast-path, execute any IO that we might encounter
1076 : * synchronously. Because we are, right now, only looking one
1077 : * block ahead, dispatching any occasional IO to workers would
1078 : * have the overhead of dispatching to workers, without any
1079 : * realistic chance of the IO completing before we need it. We
1080 : * will switch to non-synchronous IO after this.
1081 : *
1082 : * Arguably we should do so only for worker, as there's far less
1083 : * dispatch overhead with io_uring. However, tests so far have not
1084 : * shown a clear downside and additional io_method awareness here
1085 : * seems not great from an abstraction POV.
1086 : */
1087 2823439 : flags |= READ_BUFFERS_SYNCHRONOUSLY;
1088 :
1089 : /*
1090 : * Pin a buffer for the next call. Same buffer entry, and
1091 : * arbitrary I/O entry (they're all free). We don't have to
1092 : * adjust pinned_buffers because we're transferring one to caller
1093 : * but pinning one more.
1094 : *
1095 : * In the fast path we don't need to check the pin limit. We're
1096 : * always allowed at least one pin so that progress can be made,
1097 : * and that's all we need here. Although two pins are momentarily
1098 : * held at the same time, the model used here is that the stream
1099 : * holds only one, and the other now belongs to the caller.
1100 : */
1101 2823439 : if (likely(!StartReadBuffer(&stream->ios[0].op,
1102 : &stream->buffers[oldest_buffer_index],
1103 : next_blocknum,
1104 : flags)))
1105 : {
1106 : /* Fast return. */
1107 2807658 : read_stream_count_prefetch(stream);
1108 2807658 : return buffer;
1109 : }
1110 :
1111 : /* Next call must wait for I/O for the newly pinned buffer. */
1112 15781 : stream->oldest_io_index = 0;
1113 15781 : stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
1114 15781 : stream->ios_in_progress = 1;
1115 15781 : stream->ios[0].buffer_index = oldest_buffer_index;
1116 15781 : stream->seq_blocknum = next_blocknum + 1;
1117 :
1118 : /*
1119 : * XXX: It might be worth triggering additional read-ahead here,
1120 : * to avoid having to effectively do another synchronous IO for
1121 : * the next block (if it were also a miss).
1122 : */
1123 :
1124 : /* update I/O stats */
1125 15781 : read_stream_count_io(stream, 1, stream->ios_in_progress);
1126 :
1127 : /* update prefetch distance */
1128 15781 : read_stream_count_prefetch(stream);
1129 : }
1130 : else
1131 : {
1132 : /* No more blocks, end of stream. */
1133 108026 : stream->readahead_distance = 0;
1134 108026 : stream->combine_distance = 0;
1135 108026 : stream->oldest_buffer_index = stream->next_buffer_index;
1136 108026 : stream->pinned_buffers = 0;
1137 108026 : stream->buffers[oldest_buffer_index] = InvalidBuffer;
1138 : }
1139 :
1140 123807 : stream->fast_path = false;
1141 123807 : return buffer;
1142 : }
1143 : #endif
1144 :
1145 5080370 : if (unlikely(stream->pinned_buffers == 0))
1146 : {
1147 : Assert(stream->oldest_buffer_index == stream->next_buffer_index);
1148 :
1149 : /* End of stream reached? */
1150 3547693 : if (stream->readahead_distance == 0)
1151 1992772 : return InvalidBuffer;
1152 :
1153 : /*
1154 : * The usual order of operations is that we look ahead at the bottom
1155 : * of this function after potentially finishing an I/O and making
1156 : * space for more, but if we're just starting up we'll need to crank
1157 : * the handle to get started.
1158 : */
1159 1554921 : read_stream_look_ahead(stream);
1160 :
1161 : /* End of stream reached? */
1162 1554921 : if (stream->pinned_buffers == 0)
1163 : {
1164 : Assert(stream->readahead_distance == 0);
1165 847436 : return InvalidBuffer;
1166 : }
1167 : }
1168 :
1169 : /* Grab the oldest pinned buffer and associated per-buffer data. */
1170 : Assert(stream->pinned_buffers > 0);
1171 2240162 : oldest_buffer_index = stream->oldest_buffer_index;
1172 : Assert(oldest_buffer_index >= 0 &&
1173 : oldest_buffer_index < stream->queue_size);
1174 2240162 : buffer = stream->buffers[oldest_buffer_index];
1175 2240162 : if (per_buffer_data)
1176 720173 : *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
1177 :
1178 : Assert(BufferIsValid(buffer));
1179 :
1180 : /* Do we have to wait for an associated I/O first? */
1181 2240162 : if (stream->ios_in_progress > 0 &&
1182 764388 : stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
1183 : {
1184 703738 : int16 io_index = stream->oldest_io_index;
1185 : bool needed_wait;
1186 :
1187 : /* Sanity check that we still agree on the buffers. */
1188 : Assert(stream->ios[io_index].op.buffers ==
1189 : &stream->buffers[oldest_buffer_index]);
1190 :
1191 703738 : needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
1192 :
1193 : Assert(stream->ios_in_progress > 0);
1194 703717 : stream->ios_in_progress--;
1195 703717 : if (++stream->oldest_io_index == stream->max_ios)
1196 29373 : stream->oldest_io_index = 0;
1197 :
1198 : /*
1199 : * If the IO was executed synchronously, we will never see
1200 : * WaitReadBuffers() block. Treat it as if it did block. This is
1201 : * particularly crucial when effective_io_concurrency=0 is used, as
1202 : * all IO will be synchronous. Without treating synchronous IO as
1203 : * having waited, we'd never allow the distance to get large enough to
1204 : * allow for IO combining, resulting in bad performance.
1205 : */
1206 703717 : if (stream->ios[io_index].op.flags & READ_BUFFERS_SYNCHRONOUSLY)
1207 16259 : needed_wait = true;
1208 :
1209 : /* Count it as a wait if we need to wait for IO */
1210 703717 : if (needed_wait)
1211 348358 : read_stream_count_wait(stream);
1212 :
1213 : /*
1214 : * Have the read-ahead distance ramp up rapidly after we needed to
1215 : * wait for IO. We only increase the read-ahead-distance when we
1216 : * needed to wait, to avoid increasing the distance further than
1217 : * necessary, as looking ahead too far can be costly, both due to the
1218 : * cost of unnecessarily pinning many buffers and due to doing IOs
1219 : * that may never be consumed if the stream is ended/reset before
1220 : * completion.
1221 : *
1222 : * If we did not need to wait, the current distance was evidently
1223 : * sufficient.
1224 : *
1225 : * NB: Must not increase the distance if we already reached the end of
1226 : * the stream, as stream->readahead_distance == 0 is used to keep
1227 : * track of having reached the end.
1228 : */
1229 703717 : if (stream->readahead_distance > 0 && needed_wait)
1230 : {
1231 : /* wider temporary value, due to overflow risk */
1232 : int32 readahead_distance;
1233 :
1234 320350 : readahead_distance = stream->readahead_distance * 2;
1235 320350 : readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
1236 320350 : stream->readahead_distance = readahead_distance;
1237 : }
1238 :
1239 : /*
1240 : * As we needed IO, prevent distances from being reduced within our
1241 : * maximum look-ahead window. This avoids collapsing distances too
1242 : * quickly in workloads where most of the required blocks are cached,
1243 : * but where the remaining IOs are a sufficient enough factor to cause
1244 : * a substantial slowdown if executed synchronously.
1245 : *
1246 : * There are valid arguments for preventing decay for max_ios or for
1247 : * max_pinned_buffers. But the argument for max_pinned_buffers seems
1248 : * clearer - if we can't see any misses within the maximum look-ahead
1249 : * distance, we can't do any useful read-ahead.
1250 : */
1251 703717 : stream->distance_decay_holdoff = stream->max_pinned_buffers;
1252 :
1253 : /*
1254 : * Whether we needed to wait or not, allow for more IO combining if we
1255 : * needed to do IO. The reason to do so independent of needing to wait
1256 : * is that when the data is resident in the kernel page cache, IO
1257 : * combining reduces the syscall / dispatch overhead, making it
1258 : * worthwhile regardless of needing to wait.
1259 : *
1260 : * It is also important with io_uring as it will never signal the need
1261 : * to wait for reads if all the data is in the page cache. There are
1262 : * heuristics to deal with that in method_io_uring.c, but they only
1263 : * work when the IO gets large enough.
1264 : */
1265 703717 : if (stream->combine_distance > 0 &&
1266 653001 : stream->combine_distance < stream->io_combine_limit)
1267 : {
1268 : /* wider temporary value, due to overflow risk */
1269 : int32 combine_distance;
1270 :
1271 644632 : combine_distance = stream->combine_distance * 2;
1272 644632 : combine_distance = Min(combine_distance, stream->io_combine_limit);
1273 644632 : combine_distance = Min(combine_distance, stream->max_pinned_buffers);
1274 644632 : stream->combine_distance = combine_distance;
1275 : }
1276 :
1277 : /*
1278 : * If we've reached the first block of a sequential region we're
1279 : * issuing advice for, cancel that until the next jump. The kernel
1280 : * will see the sequential preadv() pattern starting here.
1281 : */
1282 703717 : if (stream->advice_enabled &&
1283 318 : stream->ios[io_index].op.blocknum == stream->seq_until_processed)
1284 275 : stream->seq_until_processed = InvalidBlockNumber;
1285 : }
1286 :
1287 : /*
1288 : * We must zap this queue entry, or else it would appear as a forwarded
1289 : * buffer. If it's potentially in the overflow zone (ie from a
1290 : * multi-block I/O that wrapped around the queue), also zap the copy.
1291 : */
1292 2240141 : stream->buffers[oldest_buffer_index] = InvalidBuffer;
1293 2240141 : if (oldest_buffer_index < stream->io_combine_limit - 1)
1294 1683565 : stream->buffers[stream->queue_size + oldest_buffer_index] =
1295 : InvalidBuffer;
1296 :
1297 : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
1298 :
1299 : /*
1300 : * The caller will get access to the per-buffer data, until the next call.
1301 : * We wipe the one before, which is never occupied because queue_size
1302 : * allowed one extra element. This will hopefully trip up client code
1303 : * that is holding a dangling pointer to it.
1304 : */
1305 : if (stream->per_buffer_data)
1306 : {
1307 : void *per_buffer_data;
1308 :
1309 : per_buffer_data = get_per_buffer_data(stream,
1310 : oldest_buffer_index == 0 ?
1311 : stream->queue_size - 1 :
1312 : oldest_buffer_index - 1);
1313 :
1314 : #if defined(CLOBBER_FREED_MEMORY)
1315 : /* This also tells Valgrind the memory is "noaccess". */
1316 : wipe_mem(per_buffer_data, stream->per_buffer_data_size);
1317 : #elif defined(USE_VALGRIND)
1318 : /* Tell it ourselves. */
1319 : VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
1320 : stream->per_buffer_data_size);
1321 : #endif
1322 : }
1323 : #endif
1324 :
1325 2240141 : read_stream_count_prefetch(stream);
1326 :
1327 : /* Pin transferred to caller. */
1328 : Assert(stream->pinned_buffers > 0);
1329 2240141 : stream->pinned_buffers--;
1330 :
1331 : /* Advance oldest buffer, with wrap-around. */
1332 2240141 : stream->oldest_buffer_index++;
1333 2240141 : if (stream->oldest_buffer_index == stream->queue_size)
1334 341188 : stream->oldest_buffer_index = 0;
1335 :
1336 : /* Prepare for the next call. */
1337 2240141 : read_stream_look_ahead(stream);
1338 :
1339 : #ifndef READ_STREAM_DISABLE_FAST_PATH
1340 : /* See if we can take the fast path for all-cached scans next time. */
1341 2240133 : if (stream->ios_in_progress == 0 &&
1342 1573273 : stream->forwarded_buffers == 0 &&
1343 1570318 : stream->pinned_buffers == 1 &&
1344 877406 : stream->readahead_distance == 1 &&
1345 793761 : stream->combine_distance == 1 &&
1346 790537 : stream->pending_read_nblocks == 0 &&
1347 789386 : stream->per_buffer_data_size == 0)
1348 : {
1349 : /*
1350 : * The fast path spins on one buffer entry repeatedly instead of
1351 : * rotating through the whole queue and clearing the entries behind
1352 : * it. If the buffer it starts with happened to be forwarded between
1353 : * StartReadBuffers() calls and also wrapped around the circular queue
1354 : * partway through, then a copy also exists in the overflow zone, and
1355 : * it won't clear it out as the regular path would. Do that now, so
1356 : * it doesn't need code for that.
1357 : */
1358 236124 : if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
1359 234435 : stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
1360 : InvalidBuffer;
1361 :
1362 236124 : stream->fast_path = true;
1363 : }
1364 : #endif
1365 :
1366 2240133 : return buffer;
1367 : }
1368 :
1369 : /*
1370 : * Transitional support for code that would like to perform or skip reads
1371 : * itself, without using the stream. Returns, and consumes, the next block
1372 : * number that would be read by the stream's look-ahead algorithm, or
1373 : * InvalidBlockNumber if the end of the stream is reached. Also reports the
1374 : * strategy that would be used to read it.
1375 : */
1376 : BlockNumber
1377 0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
1378 : {
1379 0 : *strategy = stream->ios[0].op.strategy;
1380 0 : return read_stream_get_block(stream, NULL);
1381 : }
1382 :
1383 : /*
1384 : * Temporarily stop consuming block numbers from the block number callback.
1385 : * If called inside the block number callback, its return value should be
1386 : * returned by the callback.
1387 : */
1388 : BlockNumber
1389 0 : read_stream_pause(ReadStream *stream)
1390 : {
1391 0 : stream->resume_readahead_distance = stream->readahead_distance;
1392 0 : stream->resume_combine_distance = stream->combine_distance;
1393 0 : stream->readahead_distance = 0;
1394 0 : stream->combine_distance = 0;
1395 0 : return InvalidBlockNumber;
1396 : }
1397 :
1398 : /*
1399 : * Resume looking ahead after the block number callback reported
1400 : * end-of-stream. This is useful for streams of self-referential blocks, after
1401 : * a buffer needed to be consumed and examined to find more block numbers.
1402 : */
1403 : void
1404 0 : read_stream_resume(ReadStream *stream)
1405 : {
1406 0 : stream->readahead_distance = stream->resume_readahead_distance;
1407 0 : stream->combine_distance = stream->resume_combine_distance;
1408 0 : }
1409 :
1410 : /*
1411 : * Reset a read stream by releasing any queued up buffers, allowing the stream
1412 : * to be used again for different blocks. This can be used to clear an
1413 : * end-of-stream condition and start again, or to throw away blocks that were
1414 : * speculatively read and read some different blocks instead.
1415 : */
1416 : void
1417 1556410 : read_stream_reset(ReadStream *stream)
1418 : {
1419 : int16 index;
1420 : Buffer buffer;
1421 :
1422 : /* Stop looking ahead. */
1423 1556410 : stream->readahead_distance = 0;
1424 1556410 : stream->combine_distance = 0;
1425 :
1426 : /* Forget buffered block number and fast path state. */
1427 1556410 : stream->buffered_blocknum = InvalidBlockNumber;
1428 1556410 : stream->fast_path = false;
1429 :
1430 : /* Unpin anything that wasn't consumed. */
1431 1695942 : while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
1432 139532 : ReleaseBuffer(buffer);
1433 :
1434 : /* Unpin any unused forwarded buffers. */
1435 1556410 : index = stream->next_buffer_index;
1436 1556410 : while (index < stream->initialized_buffers &&
1437 216639 : (buffer = stream->buffers[index]) != InvalidBuffer)
1438 : {
1439 : Assert(stream->forwarded_buffers > 0);
1440 0 : stream->forwarded_buffers--;
1441 0 : ReleaseBuffer(buffer);
1442 :
1443 0 : stream->buffers[index] = InvalidBuffer;
1444 0 : if (index < stream->io_combine_limit - 1)
1445 0 : stream->buffers[stream->queue_size + index] = InvalidBuffer;
1446 :
1447 0 : if (++index == stream->queue_size)
1448 0 : index = 0;
1449 : }
1450 :
1451 : Assert(stream->forwarded_buffers == 0);
1452 : Assert(stream->pinned_buffers == 0);
1453 : Assert(stream->ios_in_progress == 0);
1454 :
1455 : /* Start off assuming data is cached. */
1456 1556410 : stream->readahead_distance = 1;
1457 1556410 : stream->combine_distance = 1;
1458 1556410 : stream->resume_readahead_distance = stream->readahead_distance;
1459 1556410 : stream->resume_combine_distance = stream->combine_distance;
1460 1556410 : stream->distance_decay_holdoff = 0;
1461 1556410 : }
1462 :
1463 : /*
1464 : * Release and free a read stream.
1465 : */
1466 : void
1467 691640 : read_stream_end(ReadStream *stream)
1468 : {
1469 691640 : read_stream_reset(stream);
1470 691640 : pfree(stream);
1471 691640 : }
|