Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * read_stream.c
4 : * Mechanism for accessing buffered relation data with look-ahead
5 : *
6 : * Code that needs to access relation data typically pins blocks one at a
7 : * time, often in a predictable order that might be sequential or data-driven.
8 : * Calling the simple ReadBuffer() function for each block is inefficient,
9 : * because blocks that are not yet in the buffer pool require I/O operations
10 : * that are small and might stall waiting for storage. This mechanism looks
11 : * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
12 : * neighboring blocks together and ahead of time, with an adaptive look-ahead
13 : * distance.
14 : *
15 : * A user-provided callback generates a stream of block numbers that is used
16 : * to form reads of up to io_combine_limit, by attempting to merge them with a
17 : * pending read. When that isn't possible, the existing pending read is sent
18 : * to StartReadBuffers() so that a new one can begin to form.
19 : *
20 : * The algorithm for controlling the look-ahead distance is based on recent
21 : * cache hit / miss history, as well as whether we need to wait for I/O completion
22 : * after a miss. When no I/O is necessary, there is no benefit in looking
23 : * ahead more than one block. This is the default initial assumption. When
24 : * blocks needing I/O are streamed, the combine distance is increased to
25 : * benefit from I/O combining and the read-ahead distance is increased
26 : * whenever we need to wait for I/O to try to benefit from increased I/O
27 : * concurrency. Both are reduced gradually when cached blocks are streamed.
28 : *
29 : * The main data structure is a circular queue of buffers of size
30 : * max_pinned_buffers plus some extra space for technical reasons, ready to be
31 : * returned by read_stream_next_buffer(). Each buffer also has an optional
32 : * variable sized object that is passed from the callback to the consumer of
33 : * buffers.
34 : *
35 : * Parallel to the queue of buffers, there is a circular queue of in-progress
36 : * I/Os that have been started with StartReadBuffers(), and for which
37 : * WaitReadBuffers() must be called before returning the buffer.
38 : *
39 : * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
40 : * successive calls, then these data structures might appear as follows:
41 : *
42 : * buffers buf/data ios
43 : *
44 : * +----+ +-----+ +--------+
45 : * | | | | +----+ 42..44 | <- oldest_io_index
46 : * +----+ +-----+ | +--------+
47 : * oldest_buffer_index -> | 10 | | ? | | +--+ 60..60 |
48 : * +----+ +-----+ | | +--------+
49 : * | 42 | | ? |<-+ | | | <- next_io_index
50 : * +----+ +-----+ | +--------+
51 : * | 43 | | ? | | | |
52 : * +----+ +-----+ | +--------+
53 : * | 44 | | ? | | | |
54 : * +----+ +-----+ | +--------+
55 : * | 60 | | ? |<---+
56 : * +----+ +-----+
57 : * next_buffer_index -> | | | |
58 : * +----+ +-----+
59 : *
60 : * In the example, 5 buffers are pinned, and the next buffer to be streamed to
61 : * the client is block 10. Block 10 was a hit and has no associated I/O, but
62 : * the range 42..44 requires an I/O wait before its buffers are returned, as
63 : * does block 60.
64 : *
65 : *
66 : * Portions Copyright (c) 2024-2026, PostgreSQL Global Development Group
67 : * Portions Copyright (c) 1994, Regents of the University of California
68 : *
69 : * IDENTIFICATION
70 : * src/backend/storage/aio/read_stream.c
71 : *
72 : *-------------------------------------------------------------------------
73 : */
74 : #include "postgres.h"
75 :
76 : #include "miscadmin.h"
77 : #include "storage/aio.h"
78 : #include "storage/fd.h"
79 : #include "storage/smgr.h"
80 : #include "storage/read_stream.h"
81 : #include "utils/memdebug.h"
82 : #include "utils/rel.h"
83 : #include "utils/spccache.h"
84 :
 : /*
 :  * One read operation that has been started with StartReadBuffers() but not
 :  * yet waited for.  buffer_index is the queue position of the first buffer
 :  * the operation covers; op is the handle passed to WaitReadBuffers().
 :  */
85 : typedef struct InProgressIO
86 : {
87 : int16 buffer_index;
88 : ReadBuffersOperation op;
89 : } InProgressIO;
90 :
91 : /*
92 : * State for managing a stream of reads.
93 : */
94 : struct ReadStream
95 : {
96 : int16 max_ios; /* cap on concurrently running I/Os */
97 : int16 io_combine_limit; /* max blocks per combined read */
98 : int16 ios_in_progress; /* I/Os started but not yet waited for */
99 : int16 queue_size; /* entries in the circular buffer queue */
100 : int16 max_pinned_buffers; /* per-stream pin budget */
101 : int16 forwarded_buffers; /* pins carried over from a split read */
102 : int16 pinned_buffers; /* buffers currently pinned by this stream */
103 :
104 : /*
105 : * Limit of how far, in blocks, to look-ahead for IO combining and for
106 : * read-ahead.
107 : *
108 : * The limits for read-ahead and combining are handled separately to allow
109 : * for IO combining even in cases where the I/O subsystem can keep up at a
110 : * low read-ahead distance, as doing larger IOs is more efficient.
111 : *
112 : * Set to 0 when the end of the stream is reached.
113 : */
114 : int16 combine_distance;
115 : int16 readahead_distance;
116 : uint16 distance_decay_holdoff; /* cache hits to absorb before decaying */
117 : int16 initialized_buffers; /* high-water mark of initialized queue slots */
 : /* NOTE(review): resume_* fields are not referenced in this chunk;
 :  * presumably they save distances across a stream pause/reset — confirm. */
118 : int16 resume_readahead_distance;
119 : int16 resume_combine_distance;
120 : int read_buffers_flags; /* base flags passed to StartReadBuffers() */
121 : bool sync_mode; /* using io_method=sync */
122 : bool batch_mode; /* READ_STREAM_USE_BATCHING */
123 : bool advice_enabled; /* simulate async I/O with read-ahead advice */
124 : bool temporary; /* relation uses local (temp) buffers */
125 :
126 : /*
127 : * One-block buffer to support 'ungetting' a block number, to resolve flow
128 : * control problems when I/Os are split.
129 : */
130 : BlockNumber buffered_blocknum;
131 :
132 : /*
133 : * The callback that will tell us which block numbers to read, and an
134 : * opaque pointer that will be passed to it for its own purposes.
135 : */
136 : ReadStreamBlockNumberCB callback;
137 : void *callback_private_data;
138 :
139 : /* Next expected block, for detecting sequential access. */
140 : BlockNumber seq_blocknum;
141 : BlockNumber seq_until_processed;
142 :
143 : /* The read operation we are currently preparing. */
144 : BlockNumber pending_read_blocknum;
145 : int16 pending_read_nblocks;
146 :
147 : /* Space for buffers and optional per-buffer private data. */
148 : size_t per_buffer_data_size;
149 : void *per_buffer_data;
150 :
151 : /* Read operations that have been started but not waited for yet. */
152 : InProgressIO *ios;
153 : int16 oldest_io_index; /* oldest in-progress I/O, to wait for first */
154 : int16 next_io_index; /* slot where the next I/O will be recorded */
155 :
156 : bool fast_path;
157 :
158 : /* Circular queue of buffers. */
159 : int16 oldest_buffer_index; /* Next pinned buffer to return */
160 : int16 next_buffer_index; /* Index of next buffer to pin */
161 : Buffer buffers[FLEXIBLE_ARRAY_MEMBER];
162 : };
163 :
164 : /*
165 : * Return a pointer to the per-buffer data by index.
 : *
 : * The per-buffer data area is a flat array of per_buffer_data_size-byte
 : * slots parallel to the buffer queue; no bounds checking is done here.
166 : */
167 : static inline void *
168 4286408 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
169 : {
170 8572816 : return (char *) stream->per_buffer_data +
171 4286408 : stream->per_buffer_data_size * buffer_index;
172 : }
173 :
174 : /*
175 : * General-use ReadStreamBlockNumberCB for block range scans. Loops over the
176 : * blocks [current_blocknum, last_exclusive).
 : *
 : * callback_private_data must point to a BlockRangeReadStreamPrivate whose
 : * current_blocknum is advanced on each call; returns InvalidBlockNumber
 : * once the range is exhausted.
177 : */
178 : BlockNumber
179 423563 : block_range_read_stream_cb(ReadStream *stream,
180 : void *callback_private_data,
181 : void *per_buffer_data)
182 : {
183 423563 : BlockRangeReadStreamPrivate *p = callback_private_data;
184 :
185 423563 : if (p->current_blocknum < p->last_exclusive)
186 345991 : return p->current_blocknum++;
187 :
188 77572 : return InvalidBlockNumber;
189 : }
190 :
191 : /*
192 : * Ask the callback which block it would like us to read next, with a one block
193 : * buffer in front to allow read_stream_unget_block() to work.
 : *
 : * Returns InvalidBlockNumber when the callback signals end-of-stream.
194 : */
195 : static inline BlockNumber
196 6557922 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
197 : {
198 : BlockNumber blocknum;
199 :
 : /* Consume any block saved by read_stream_unget_block() first. */
200 6557922 : blocknum = stream->buffered_blocknum;
201 6557922 : if (blocknum != InvalidBlockNumber)
202 0 : stream->buffered_blocknum = InvalidBlockNumber;
203 : else
204 : {
205 : /*
206 : * Tell Valgrind that the per-buffer data is undefined. That replaces
207 : * the "noaccess" state that was set when the consumer moved past this
208 : * entry last time around the queue, and should also catch callbacks
209 : * that fail to initialize data that the buffer consumer later
210 : * accesses. On the first go around, it is undefined already.
211 : */
212 : VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
213 : stream->per_buffer_data_size);
214 6557922 : blocknum = stream->callback(stream,
215 : stream->callback_private_data,
216 : per_buffer_data);
217 : }
218 :
219 6557922 : return blocknum;
220 : }
221 :
222 : /*
223 : * In order to deal with buffer shortages and I/O limits after short reads, we
224 : * sometimes need to defer handling of a block we've already consumed from the
225 : * registered callback until later.
 : *
 : * The block is stashed in the stream's one-entry buffer so that the next
 : * read_stream_get_block() call returns it before consulting the callback.
226 : */
227 : static inline void
228 0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
229 : {
230 : /* We shouldn't ever unget more than one block. */
231 : Assert(stream->buffered_blocknum == InvalidBlockNumber);
232 : Assert(blocknum != InvalidBlockNumber);
233 0 : stream->buffered_blocknum = blocknum;
234 0 : }
235 :
236 : /*
237 : * Start as much of the current pending read as we can. If we have to split it
238 : * because of the per-backend buffer limit, or the buffer manager decides to
239 : * split it, then the pending read is adjusted to hold the remaining portion.
240 : *
241 : * We can always start a read of at least size one if we have no progress yet.
242 : * Otherwise it's possible that we can't start a read at all because of a lack
243 : * of buffers, and then false is returned. Buffer shortages also reduce the
244 : * distance to a level that prevents look-ahead until buffers are released.
245 : */
246 : static bool
247 2109388 : read_stream_start_pending_read(ReadStream *stream)
248 : {
249 : bool need_wait; /* did StartReadBuffers() start a physical I/O? */
250 : int requested_nblocks; /* nblocks we asked for, before any shortening */
251 : int nblocks;
252 : int flags;
253 : int forwarded;
254 : int16 io_index;
255 : int16 overflow; /* buffers placed past the circular queue's end */
256 : int16 buffer_index;
257 : int buffer_limit; /* additional pins this backend may acquire */
258 :
259 : /* This should only be called with a pending read. */
260 : Assert(stream->pending_read_nblocks > 0);
261 : Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
262 :
263 : /* We had better not exceed the per-stream buffer limit with this read. */
264 : Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
265 : stream->max_pinned_buffers);
266 :
267 : #ifdef USE_ASSERT_CHECKING
268 : /* We had better not be overwriting an existing pinned buffer. */
269 : if (stream->pinned_buffers > 0)
270 : Assert(stream->next_buffer_index != stream->oldest_buffer_index);
271 : else
272 : Assert(stream->next_buffer_index == stream->oldest_buffer_index);
273 :
274 : /*
275 : * Pinned buffers forwarded by a preceding StartReadBuffers() call that
276 : * had to split the operation should match the leading blocks of this
277 : * following StartReadBuffers() call.
278 : */
279 : Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
280 : for (int i = 0; i < stream->forwarded_buffers; ++i)
281 : Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
282 : stream->pending_read_blocknum + i);
283 :
284 : /*
285 : * Check that we've cleared the queue/overflow entries corresponding to
286 : * the rest of the blocks covered by this read, unless it's the first go
287 : * around and we haven't even initialized them yet.
288 : */
289 : for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
290 : Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
291 : stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
292 : #endif
293 :
294 : /* Do we need to issue read-ahead advice? */
295 2109388 : flags = stream->read_buffers_flags;
296 2109388 : if (stream->advice_enabled)
297 : {
298 1711 : if (stream->pending_read_blocknum == stream->seq_blocknum)
299 : {
300 : /*
301 : * Sequential: Issue advice until the preadv() calls have caught
302 : * up with the first advice issued for this sequential region, and
303 : * then stay out of the way of the kernel's own read-ahead.
304 : */
305 29 : if (stream->seq_until_processed != InvalidBlockNumber)
306 1 : flags |= READ_BUFFERS_ISSUE_ADVICE;
307 : }
308 : else
309 : {
310 : /*
311 : * Random jump: Note the starting location of a new potential
312 : * sequential region and start issuing advice. Skip it this time
313 : * if the preadv() follows immediately, eg first block in stream.
314 : */
315 1682 : stream->seq_until_processed = stream->pending_read_blocknum;
316 1682 : if (stream->pinned_buffers > 0)
317 45 : flags |= READ_BUFFERS_ISSUE_ADVICE;
318 : }
319 : }
320 :
321 : /*
322 : * How many more buffers is this backend allowed?
323 : *
324 : * Forwarded buffers are already pinned and map to the leading blocks of
325 : * the pending read (the remaining portion of an earlier short read that
326 : * we're about to continue). They are not counted in pinned_buffers, but
327 : * they are counted as pins already held by this backend according to the
328 : * buffer manager, so they must be added to the limit it grants us.
329 : */
330 2109388 : if (stream->temporary)
331 16377 : buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
332 : else
333 2093011 : buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
334 : Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
335 :
336 2109388 : buffer_limit += stream->forwarded_buffers;
337 2109388 : buffer_limit = Min(buffer_limit, PG_INT16_MAX);
338 :
339 2109388 : if (buffer_limit == 0 && stream->pinned_buffers == 0)
340 740858 : buffer_limit = 1; /* guarantee progress */
341 :
342 : /* Does the per-backend limit affect this read? */
343 2109388 : nblocks = stream->pending_read_nblocks;
344 2109388 : if (buffer_limit < nblocks)
345 : {
346 : int16 new_distance;
347 :
348 : /* Shrink distance: no more look-ahead until buffers are released. */
349 1923 : new_distance = stream->pinned_buffers + buffer_limit;
350 1923 : if (stream->readahead_distance > new_distance)
351 335 : stream->readahead_distance = new_distance;
352 :
353 : /* Unless we have nothing to give the consumer, stop here. */
354 1923 : if (stream->pinned_buffers > 0)
355 55 : return false;
356 :
357 : /* A short read is required to make progress. */
358 1868 : nblocks = buffer_limit;
359 : }
360 :
361 : /*
362 : * We say how many blocks we want to read, but it may be smaller on return
363 : * if the buffer manager decides to shorten the read. Initialize buffers
364 : * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
365 : * and keep the original nblocks number so we can check for forwarded
366 : * buffers as output, below.
367 : */
368 2109333 : buffer_index = stream->next_buffer_index;
369 2109333 : io_index = stream->next_io_index;
370 3444010 : while (stream->initialized_buffers < buffer_index + nblocks)
371 1334677 : stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
372 2109333 : requested_nblocks = nblocks;
373 2109333 : need_wait = StartReadBuffers(&stream->ios[io_index].op,
374 2109333 : &stream->buffers[buffer_index],
375 : stream->pending_read_blocknum,
376 : &nblocks,
377 : flags);
378 2109325 : stream->pinned_buffers += nblocks;
379 :
380 : /* Remember whether we need to wait before returning this buffer. */
381 2109325 : if (!need_wait)
382 : {
383 : /*
384 : * If there currently is no IO in progress, and we have not needed to
385 : * issue IO recently, decay the look-ahead distance. We detect if we
386 : * had to issue IO recently by having a decay holdoff that's set to
387 : * the max look-ahead distance whenever we need to do IO. This is
388 : * important to ensure we eventually reach a high enough distance to
389 : * perform IO asynchronously when starting out with a small look-ahead
390 : * distance.
391 : */
392 1420625 : if (stream->ios_in_progress == 0)
393 : {
394 1420153 : if (stream->distance_decay_holdoff > 0)
395 27080 : stream->distance_decay_holdoff--;
396 : else
397 : {
398 1393073 : if (stream->readahead_distance > 1)
399 17337 : stream->readahead_distance--;
400 :
401 : /*
402 : * For now we reduce the IO combine distance after
403 : * sufficiently many buffer hits. There is no clear
404 : * performance argument for doing so, but at the moment we
405 : * need to do so to make the entrance into fast_path work
406 : * correctly: We require combine_distance == 1 to enter
407 : * fast-path, as without that condition we would wrongly
408 : * re-enter fast-path when readahead_distance == 1 and
409 : * pinned_buffers == 1, as we would not yet have prepared
410 : * another IO in that situation.
411 : */
412 1393073 : if (stream->combine_distance > 1)
413 17341 : stream->combine_distance--;
414 : }
415 : }
416 : }
417 : else
418 : {
419 : /*
420 : * Remember to call WaitReadBuffers() before returning head buffer.
421 : * Look-ahead distance will be adjusted after waiting.
422 : */
423 688700 : stream->ios[io_index].buffer_index = buffer_index;
424 688700 : if (++stream->next_io_index == stream->max_ios)
425 29203 : stream->next_io_index = 0;
426 : Assert(stream->ios_in_progress < stream->max_ios);
427 688700 : stream->ios_in_progress++;
428 688700 : stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
429 : }
430 :
431 : /*
432 : * How many pins were acquired but forwarded to the next call? These need
433 : * to be passed to the next StartReadBuffers() call by leaving them
434 : * exactly where they are in the queue, or released if the stream ends
435 : * early. We need the number for accounting purposes, since they are not
436 : * counted in stream->pinned_buffers but we already hold them.
437 : */
438 2109325 : forwarded = 0;
439 2111566 : while (nblocks + forwarded < requested_nblocks &&
440 66687 : stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
441 2241 : forwarded++;
442 2109325 : stream->forwarded_buffers = forwarded;
443 :
444 : /*
445 : * We gave a contiguous range of buffer space to StartReadBuffers(), but
446 : * we want it to wrap around at queue_size. Copy overflowing buffers to
447 : * the front of the array where they'll be consumed, but also leave a copy
448 : * in the overflow zone which the I/O operation has a pointer to (it needs
449 : * a contiguous array). Both copies will be cleared when the buffers are
450 : * handed to the consumer.
451 : */
452 2109325 : overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
453 2109325 : if (overflow > 0)
454 : {
455 : Assert(overflow < stream->queue_size); /* can't overlap */
456 374 : memcpy(&stream->buffers[0],
457 374 : &stream->buffers[stream->queue_size],
458 : sizeof(stream->buffers[0]) * overflow);
459 : }
460 :
461 : /* Compute location of start of next read, without using % operator. */
462 2109325 : buffer_index += nblocks;
463 2109325 : if (buffer_index >= stream->queue_size)
464 347621 : buffer_index -= stream->queue_size;
465 : Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
466 2109325 : stream->next_buffer_index = buffer_index;
467 :
468 : /* Adjust the pending read to cover the remaining portion, if any. */
469 2109325 : stream->pending_read_blocknum += nblocks;
470 2109325 : stream->pending_read_nblocks -= nblocks;
471 :
472 2109325 : return true;
473 : }
474 :
475 : /*
476 : * Should we continue to perform look ahead? Looking ahead may allow us to
477 : * make the pending IO larger via IO combining or to issue more read-ahead.
 : *
 : * Note that readahead_distance == 0 doubles as the end-of-stream marker,
 : * set by read_stream_look_ahead() when the callback runs out of blocks.
478 : */
479 : static inline bool
480 6276195 : read_stream_should_look_ahead(ReadStream *stream)
481 : {
482 : /* If the callback has signaled end-of-stream, we're done */
483 6276195 : if (stream->readahead_distance == 0)
484 338058 : return false;
485 :
486 : /* never start more IOs than our cap */
487 5938137 : if (stream->ios_in_progress >= stream->max_ios)
488 0 : return false;
489 :
490 : /*
491 : * Allow looking further ahead if we are in the process of building a
492 : * larger IO, the IO is not yet big enough, and we don't yet have IO in
493 : * flight.
494 : *
495 : * We do so to allow building larger reads when readahead_distance is
496 : * small (e.g. because the I/O subsystem is keeping up or
497 : * effective_io_concurrency is small). That's a useful goal because larger
498 : * reads are more CPU efficient than smaller reads, even if the system is
499 : * not IO bound.
500 : *
501 : * The reason we do *not* do so when we already have a read prepared (i.e.
502 : * why we check for pinned_buffers == 0) is once we are actually reading
503 : * ahead, we don't need it:
504 : *
505 : * - We won't issue unnecessarily small reads as
506 : * read_stream_should_issue_now() will return false until the IO is
507 : * suitably sized. The issuance of the pending read will be delayed until
508 : * enough buffers have been consumed.
509 : *
510 : * - If we are not reading ahead aggressively enough, future
511 : * WaitReadBuffers() calls will return true, leading to readahead_distance
512 : * being increased. After that more full-sized IOs can be issued.
513 : *
514 : * Furthermore, if we did not have the pinned_buffers == 0 condition, we
515 : * might end up issuing I/O more aggressively than we need.
516 : *
517 : * Note that a return of true here can lead to exceeding the read-ahead
518 : * limit, but we won't exceed the buffer pin limit (because pinned_buffers
519 : * == 0 and combine_distance is capped by max_pinned_buffers).
520 : */
521 5938137 : if (stream->pending_read_nblocks > 0 &&
522 2615557 : stream->pinned_buffers == 0 &&
523 2473618 : stream->pending_read_nblocks < stream->combine_distance)
524 473970 : return true;
525 :
526 : /*
527 : * Don't start more read-ahead if that'd put us over the distance limit
528 : * for doing read-ahead. As stream->readahead_distance is capped by
529 : * max_pinned_buffers, this prevents us from looking ahead so far that it
530 : * would put us over the pin limit.
531 : */
532 5464167 : if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
533 2077822 : return false;
534 :
535 3386345 : return true;
536 : }
537 :
538 : /*
539 : * We don't start the pending read just because we've hit the distance limit,
540 : * preferring to give it another chance to grow to full io_combine_limit size
541 : * once more buffers have been consumed. But this is not desirable in all
542 : * situations - see below.
 : *
 : * Returns true when the pending read (if any) should be submitted to
 : * StartReadBuffers() immediately rather than being allowed to grow.
543 : */
544 : static inline bool
545 7363553 : read_stream_should_issue_now(ReadStream *stream)
546 : {
547 7363553 : int16 pending_read_nblocks = stream->pending_read_nblocks;
548 :
549 : /* there is no pending IO that could be issued */
550 7363553 : if (pending_read_nblocks == 0)
551 4880577 : return false;
552 :
553 : /* never start more IOs than our cap */
554 2482976 : if (stream->ios_in_progress >= stream->max_ios)
555 0 : return false;
556 :
557 : /*
558 : * If the callback has signaled end-of-stream, start the pending read
559 : * immediately. There is no further potential for IO combining.
560 : */
561 2482976 : if (stream->readahead_distance == 0)
562 104404 : return true;
563 :
564 : /*
565 : * If we've already reached combine_distance, there's no chance of growing
566 : * the read further.
567 : */
568 2378572 : if (pending_read_nblocks >= stream->combine_distance)
569 2004935 : return true;
570 :
571 : /*
572 : * If we currently have no reads in flight or prepared, issue the IO once
573 : * we are not looking ahead further. This ensures there's always at least
574 : * one IO prepared.
575 : */
576 373637 : if (stream->pinned_buffers == 0 &&
577 236985 : !read_stream_should_look_ahead(stream))
578 0 : return true;
579 :
580 373637 : return false;
581 : }
582 :
 : /*
 :  * Pull block numbers from the callback, merging consecutive blocks into the
 :  * pending read and starting reads as needed, until the look-ahead limits are
 :  * reached or the callback signals end-of-stream (which zeroes both distance
 :  * variables).  Finally, issue the pending read if it shouldn't grow further.
 :  */
583 : static void
584 3740223 : read_stream_look_ahead(ReadStream *stream)
585 : {
586 : /*
587 : * Allow amortizing the cost of submitting IO over multiple IOs. This
588 : * requires that we don't do any operations that could lead to a deadlock
589 : * with staged-but-unsubmitted IO. The callback needs to opt-in to being
590 : * careful.
591 : */
592 3740223 : if (stream->batch_mode)
593 3219956 : pgaio_enter_batchmode();
594 :
595 6039210 : while (read_stream_should_look_ahead(stream))
596 : {
597 : BlockNumber blocknum;
598 : int16 buffer_index;
599 : void *per_buffer_data;
600 :
601 3623330 : if (read_stream_should_issue_now(stream))
602 : {
603 2019 : read_stream_start_pending_read(stream);
604 2019 : continue;
605 : }
606 :
607 : /*
608 : * See which block the callback wants next in the stream. We need to
609 : * compute the index of the Nth block of the pending read including
610 : * wrap-around, but we don't want to use the expensive % operator.
611 : */
612 3621311 : buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
613 3621311 : if (buffer_index >= stream->queue_size)
614 2398 : buffer_index -= stream->queue_size;
615 : Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
616 3621311 : per_buffer_data = get_per_buffer_data(stream, buffer_index);
617 3621311 : blocknum = read_stream_get_block(stream, per_buffer_data);
618 3621311 : if (blocknum == InvalidBlockNumber)
619 : {
620 : /* End of stream. */
621 1324343 : stream->readahead_distance = 0;
622 1324343 : stream->combine_distance = 0;
623 1324343 : break;
624 : }
625 :
626 : /* Can we merge it with the pending read? */
627 2296968 : if (stream->pending_read_nblocks > 0 &&
628 254419 : stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
629 : {
630 254376 : stream->pending_read_nblocks++;
631 254376 : continue;
632 : }
633 :
634 : /* We have to start the pending read before we can build another. */
635 2042641 : while (stream->pending_read_nblocks > 0)
636 : {
637 49 : if (!read_stream_start_pending_read(stream) ||
638 49 : stream->ios_in_progress == stream->max_ios)
639 : {
640 : /* We've hit the buffer or I/O limit. Rewind and stop here. */
641 0 : read_stream_unget_block(stream, blocknum);
642 0 : if (stream->batch_mode)
643 0 : pgaio_exit_batchmode();
644 0 : return;
645 : }
646 : }
647 :
648 : /* This is the start of a new pending read. */
649 2042592 : stream->pending_read_blocknum = blocknum;
650 2042592 : stream->pending_read_nblocks = 1;
651 : }
652 :
653 : /*
654 : * Check if the pending read should be issued now, or if we should give it
655 : * another chance to grow to the full size.
656 : *
657 : * Note that the pending read can exceed the distance goal, if the latter
658 : * was reduced after hitting the per-backend buffer limit.
659 : */
660 3740223 : if (read_stream_should_issue_now(stream))
661 2107320 : read_stream_start_pending_read(stream);
662 :
663 : /*
664 : * There should always be something pinned when we leave this function,
665 : * whether started by this call or not, unless we've hit the end of the
666 : * stream. In the worst case we can always make progress one buffer at a
667 : * time.
668 : */
669 : Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);
670 :
671 3740215 : if (stream->batch_mode)
672 3219948 : pgaio_exit_batchmode();
673 : }
674 :
675 : /*
676 : * Create a new read stream object that can be used to perform the equivalent
677 : * of a series of ReadBuffer() calls for one fork of one relation.
678 : * Internally, it generates larger vectored reads where possible by looking
679 : * ahead. The callback should return block numbers or InvalidBlockNumber to
680 : * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
681 : * write extra data for each block into the space provided to it. It will
682 : * also receive callback_private_data for its own purposes.
683 : */
684 : static ReadStream *
685 684228 : read_stream_begin_impl(int flags,
686 : BufferAccessStrategy strategy,
687 : Relation rel,
688 : SMgrRelation smgr,
689 : char persistence,
690 : ForkNumber forknum,
691 : ReadStreamBlockNumberCB callback,
692 : void *callback_private_data,
693 : size_t per_buffer_data_size)
694 : {
695 : ReadStream *stream;
696 : size_t size;
697 : int16 queue_size;
698 : int16 queue_overflow;
699 : int max_ios;
700 : int strategy_pin_limit;
701 : uint32 max_pinned_buffers;
702 : uint32 max_possible_buffer_limit;
703 : Oid tablespace_id;
704 :
705 : /*
706 : * Decide how many I/Os we will allow to run at the same time. That
707 : * currently means advice to the kernel to tell it that we will soon read.
708 : * This number also affects how far we look ahead for opportunities to
709 : * start more I/Os.
710 : */
711 684228 : tablespace_id = smgr->smgr_rlocator.locator.spcOid;
712 684228 : if (!OidIsValid(MyDatabaseId) ||
713 798847 : (rel && IsCatalogRelation(rel)) ||
714 197099 : IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
715 : {
716 : /*
717 : * Avoid circularity while trying to look up tablespace settings or
718 : * before spccache.c is ready.
719 : */
720 554304 : max_ios = effective_io_concurrency;
721 : }
722 129924 : else if (flags & READ_STREAM_MAINTENANCE)
723 17147 : max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
724 : else
725 112777 : max_ios = get_tablespace_io_concurrency(tablespace_id);
726 :
727 : /* Cap to INT16_MAX to avoid overflowing below */
728 684228 : max_ios = Min(max_ios, PG_INT16_MAX);
729 :
730 : /*
731 : * If starting a multi-block I/O near the end of the queue, we might
732 : * temporarily need extra space for overflowing buffers before they are
733 : * moved to regular circular position. This is the maximum extra space we
734 : * could need.
735 : */
736 684228 : queue_overflow = io_combine_limit - 1;
737 :
738 : /*
739 : * Choose the maximum number of buffers we're prepared to pin. We try to
740 : * pin fewer if we can, though. We add one so that we can make progress
741 : * even if max_ios is set to 0 (see also further down). For max_ios > 0,
742 : * this also allows an extra full I/O's worth of buffers: after an I/O
743 : * finishes we don't want to have to wait for its buffers to be consumed
744 : * before starting a new one.
745 : *
746 : * Be careful not to allow int16 to overflow. That is possible with the
747 : * current GUC range limits, so this is an artificial limit of ~32k
748 : * buffers and we'd need to adjust the types to exceed that. We also have
749 : * to allow for the spare entry and the overflow space.
750 : */
751 684228 : max_pinned_buffers = (max_ios + 1) * io_combine_limit;
752 684228 : max_pinned_buffers = Min(max_pinned_buffers,
753 : PG_INT16_MAX - queue_overflow - 1);
754 :
755 : /* Give the strategy a chance to limit the number of buffers we pin. */
756 684228 : strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
757 684228 : max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
758 :
759 : /*
760 : * Also limit our queue to the maximum number of pins we could ever be
761 : * allowed to acquire according to the buffer manager. We may not really
762 : * be able to use them all due to other pins held by this backend, but
763 : * we'll check that later in read_stream_start_pending_read().
764 : */
765 684228 : if (SmgrIsTemp(smgr))
766 9250 : max_possible_buffer_limit = GetLocalPinLimit();
767 : else
768 674978 : max_possible_buffer_limit = GetPinLimit();
769 684228 : max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
770 :
771 : /*
772 : * The limit might be zero on a system configured with too few buffers for
773 : * the number of connections. We need at least one to make progress.
774 : */
775 684228 : max_pinned_buffers = Max(1, max_pinned_buffers);
776 :
777 : /*
778 : * We need one extra entry for buffers and per-buffer data, because users
779 : * of per-buffer data have access to the object until the next call to
780 : * read_stream_next_buffer(), so we need a gap between the head and tail
781 : * of the queue so that we don't clobber it.
782 : */
783 684228 : queue_size = max_pinned_buffers + 1;
784 :
785 : /*
786 : * Allocate the object, the buffers, the ios and per_buffer_data space in
787 : * one big chunk. Though we have queue_size buffers, we want to be able
788 : * to assume that all the buffers for a single read are contiguous (i.e.
789 : * don't wrap around halfway through), so we allow temporary overflows of
790 : * up to the maximum possible overflow size.
791 : */
792 684228 : size = offsetof(ReadStream, buffers);
793 684228 : size += sizeof(Buffer) * (queue_size + queue_overflow);
794 684228 : size += sizeof(InProgressIO) * Max(1, max_ios);
795 684228 : size += per_buffer_data_size * queue_size;
796 684228 : size += MAXIMUM_ALIGNOF * 2;
797 684228 : stream = (ReadStream *) palloc(size);
798 684228 : memset(stream, 0, offsetof(ReadStream, buffers));
799 684228 : stream->ios = (InProgressIO *)
800 684228 : MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
801 684228 : if (per_buffer_data_size > 0)
802 130876 : stream->per_buffer_data = (void *)
803 130876 : MAXALIGN(&stream->ios[Max(1, max_ios)]);
804 :
805 684228 : stream->sync_mode = io_method == IOMETHOD_SYNC;
806 684228 : stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
807 :
808 : #ifdef USE_PREFETCH
809 :
810 : /*
811 : * Read-ahead advice simulating asynchronous I/O with synchronous calls.
812 : * Issue advice only if AIO is not used, direct I/O isn't enabled, the
813 : * caller hasn't promised sequential access (overriding our detection
814 : * heuristics), and max_ios hasn't been set to zero.
815 : */
816 684228 : if (stream->sync_mode &&
817 3161 : (io_direct_flags & IO_DIRECT_DATA) == 0 &&
818 3161 : (flags & READ_STREAM_SEQUENTIAL) == 0 &&
819 : max_ios > 0)
820 738 : stream->advice_enabled = true;
821 : #endif
822 :
823 : /*
824 : * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
825 : * we still need to allocate space to combine and run one I/O. Bump it up
826 : * to one, and remember to ask for synchronous I/O only.
827 : */
828 684228 : if (max_ios == 0)
829 : {
830 7 : max_ios = 1;
831 7 : stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
832 : }
833 :
834 : /*
835 : * Capture stable values for these two GUC-derived numbers for the
836 : * lifetime of this stream, so we don't have to worry about the GUCs
837 : * changing underneath us beyond this point.
838 : */
839 684228 : stream->max_ios = max_ios;
840 684228 : stream->io_combine_limit = io_combine_limit;
841 :
842 684228 : stream->per_buffer_data_size = per_buffer_data_size;
843 684228 : stream->max_pinned_buffers = max_pinned_buffers;
844 684228 : stream->queue_size = queue_size;
845 684228 : stream->callback = callback;
846 684228 : stream->callback_private_data = callback_private_data;
847 684228 : stream->buffered_blocknum = InvalidBlockNumber;
848 684228 : stream->seq_blocknum = InvalidBlockNumber;
849 684228 : stream->seq_until_processed = InvalidBlockNumber;
850 684228 : stream->temporary = SmgrIsTemp(smgr);
851 684228 : stream->distance_decay_holdoff = 0;
852 :
853 : /*
854 : * Skip the initial ramp-up phase if the caller says we're going to be
855 : * reading the whole relation. This way we start out assuming we'll be
856 : * doing full io_combine_limit sized reads.
857 : */
858 684228 : if (flags & READ_STREAM_FULL)
859 : {
860 78516 : stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
861 78516 : stream->combine_distance = Min(max_pinned_buffers, stream->io_combine_limit);
862 : }
863 : else
864 : {
865 605712 : stream->readahead_distance = 1;
866 605712 : stream->combine_distance = 1;
867 : }
868 684228 : stream->resume_readahead_distance = stream->readahead_distance;
869 684228 : stream->resume_combine_distance = stream->combine_distance;
870 :
871 : /*
872 : * Since we always access the same relation, we can initialize parts of
873 : * the ReadBuffersOperation objects and leave them that way, to avoid
874 : * wasting CPU cycles writing to them for each read.
875 : */
876 11660467 : for (int i = 0; i < max_ios; ++i)
877 : {
878 10976239 : stream->ios[i].op.rel = rel;
879 10976239 : stream->ios[i].op.smgr = smgr;
880 10976239 : stream->ios[i].op.persistence = persistence;
881 10976239 : stream->ios[i].op.forknum = forknum;
882 10976239 : stream->ios[i].op.strategy = strategy;
883 : }
884 :
885 684228 : return stream;
886 : }
887 :
/*
 * Create a new read stream for reading a relation.
 * See read_stream_begin_impl() for the detailed explanation.
 *
 * This is the variant most callers want: the SMgrRelation and persistence
 * level are derived from the Relation itself.  The callback supplies block
 * numbers one at a time; per_buffer_data_size optionally reserves space per
 * queued buffer that the callback can fill in for the consumer.
 */
ReadStream *
read_stream_begin_relation(int flags,
						   BufferAccessStrategy strategy,
						   Relation rel,
						   ForkNumber forknum,
						   ReadStreamBlockNumberCB callback,
						   void *callback_private_data,
						   size_t per_buffer_data_size)
{
	return read_stream_begin_impl(flags,
								  strategy,
								  rel,
								  RelationGetSmgr(rel),
								  rel->rd_rel->relpersistence,
								  forknum,
								  callback,
								  callback_private_data,
								  per_buffer_data_size);
}
911 :
/*
 * Create a new read stream for reading a SMgr relation.
 * See read_stream_begin_impl() for the detailed explanation.
 *
 * Used by code that operates below the relcache layer (no Relation is
 * available, so NULL is passed for it); the caller must supply the
 * persistence level explicitly since it cannot be derived here.
 */
ReadStream *
read_stream_begin_smgr_relation(int flags,
								BufferAccessStrategy strategy,
								SMgrRelation smgr,
								char smgr_persistence,
								ForkNumber forknum,
								ReadStreamBlockNumberCB callback,
								void *callback_private_data,
								size_t per_buffer_data_size)
{
	return read_stream_begin_impl(flags,
								  strategy,
								  NULL,
								  smgr,
								  smgr_persistence,
								  forknum,
								  callback,
								  callback_private_data,
								  per_buffer_data_size);
}
936 :
/*
 * Pull one pinned buffer out of a stream.  Each call returns successive
 * blocks in the order specified by the callback.  If per_buffer_data_size was
 * set to a non-zero size, *per_buffer_data receives a pointer to the extra
 * per-buffer data that the callback had a chance to populate, which remains
 * valid until the next call to read_stream_next_buffer().  When the stream
 * runs out of data, InvalidBuffer is returned.  The caller may decide to end
 * the stream early at any time by calling read_stream_end().
 *
 * The buffer's pin is transferred to the caller, who is responsible for
 * releasing it.
 */
Buffer
read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
{
	Buffer		buffer;
	int16		oldest_buffer_index;

#ifndef READ_STREAM_DISABLE_FAST_PATH

	/*
	 * A fast path for all-cached scans.  This is the same as the usual
	 * algorithm, but it is specialized for no I/O and no per-buffer data, so
	 * we can skip the queue management code, stay in the same buffer slot and
	 * use singular StartReadBuffer().
	 */
	if (likely(stream->fast_path))
	{
		BlockNumber next_blocknum;

		/* Fast path assumptions. */
		Assert(stream->ios_in_progress == 0);
		Assert(stream->forwarded_buffers == 0);
		Assert(stream->pinned_buffers == 1);
		Assert(stream->readahead_distance == 1);
		Assert(stream->combine_distance == 1);
		Assert(stream->pending_read_nblocks == 0);
		Assert(stream->per_buffer_data_size == 0);
		Assert(stream->initialized_buffers > stream->oldest_buffer_index);

		/* We're going to return the buffer we pinned last time. */
		oldest_buffer_index = stream->oldest_buffer_index;
		Assert((oldest_buffer_index + 1) % stream->queue_size ==
			   stream->next_buffer_index);
		buffer = stream->buffers[oldest_buffer_index];
		Assert(buffer != InvalidBuffer);

		/* Choose the next block to pin. */
		next_blocknum = read_stream_get_block(stream, NULL);

		if (likely(next_blocknum != InvalidBlockNumber))
		{
			int			flags = stream->read_buffers_flags;

			if (stream->advice_enabled)
				flags |= READ_BUFFERS_ISSUE_ADVICE;

			/*
			 * While in fast-path, execute any IO that we might encounter
			 * synchronously. Because we are, right now, only looking one
			 * block ahead, dispatching any occasional IO to workers would
			 * have the overhead of dispatching to workers, without any
			 * realistic chance of the IO completing before we need it. We
			 * will switch to non-synchronous IO after this.
			 *
			 * Arguably we should do so only for worker, as there's far less
			 * dispatch overhead with io_uring. However, tests so far have not
			 * shown a clear downside and additional io_method awareness here
			 * seems not great from an abstraction POV.
			 */
			flags |= READ_BUFFERS_SYNCHRONOUSLY;

			/*
			 * Pin a buffer for the next call.  Same buffer entry, and
			 * arbitrary I/O entry (they're all free).  We don't have to
			 * adjust pinned_buffers because we're transferring one to caller
			 * but pinning one more.
			 *
			 * In the fast path we don't need to check the pin limit.  We're
			 * always allowed at least one pin so that progress can be made,
			 * and that's all we need here.  Although two pins are momentarily
			 * held at the same time, the model used here is that the stream
			 * holds only one, and the other now belongs to the caller.
			 */
			if (likely(!StartReadBuffer(&stream->ios[0].op,
										&stream->buffers[oldest_buffer_index],
										next_blocknum,
										flags)))
			{
				/* Fast return: the next block was cached, no I/O started. */
				return buffer;
			}

			/* Next call must wait for I/O for the newly pinned buffer. */
			stream->oldest_io_index = 0;
			stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
			stream->ios_in_progress = 1;
			stream->ios[0].buffer_index = oldest_buffer_index;
			stream->seq_blocknum = next_blocknum + 1;

			/*
			 * XXX: It might be worth triggering additional read-ahead here,
			 * to avoid having to effectively do another synchronous IO for
			 * the next block (if it were also a miss).
			 */
		}
		else
		{
			/* No more blocks, end of stream. */
			stream->readahead_distance = 0;
			stream->combine_distance = 0;
			stream->oldest_buffer_index = stream->next_buffer_index;
			stream->pinned_buffers = 0;
			stream->buffers[oldest_buffer_index] = InvalidBuffer;
		}

		/* Either a miss or end-of-stream: fall back to the slow path. */
		stream->fast_path = false;
		return buffer;
	}
#endif

	if (unlikely(stream->pinned_buffers == 0))
	{
		Assert(stream->oldest_buffer_index == stream->next_buffer_index);

		/* End of stream reached? */
		if (stream->readahead_distance == 0)
			return InvalidBuffer;

		/*
		 * The usual order of operations is that we look ahead at the bottom
		 * of this function after potentially finishing an I/O and making
		 * space for more, but if we're just starting up we'll need to crank
		 * the handle to get started.
		 */
		read_stream_look_ahead(stream);

		/* End of stream reached? */
		if (stream->pinned_buffers == 0)
		{
			Assert(stream->readahead_distance == 0);
			return InvalidBuffer;
		}
	}

	/* Grab the oldest pinned buffer and associated per-buffer data. */
	Assert(stream->pinned_buffers > 0);
	oldest_buffer_index = stream->oldest_buffer_index;
	Assert(oldest_buffer_index >= 0 &&
		   oldest_buffer_index < stream->queue_size);
	buffer = stream->buffers[oldest_buffer_index];
	if (per_buffer_data)
		*per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);

	Assert(BufferIsValid(buffer));

	/* Do we have to wait for an associated I/O first? */
	if (stream->ios_in_progress > 0 &&
		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
	{
		int16		io_index = stream->oldest_io_index;
		bool		needed_wait;

		/* Sanity check that we still agree on the buffers. */
		Assert(stream->ios[io_index].op.buffers ==
			   &stream->buffers[oldest_buffer_index]);

		needed_wait = WaitReadBuffers(&stream->ios[io_index].op);

		/* Retire the oldest I/O slot, with wrap-around. */
		Assert(stream->ios_in_progress > 0);
		stream->ios_in_progress--;
		if (++stream->oldest_io_index == stream->max_ios)
			stream->oldest_io_index = 0;

		/*
		 * If the IO was executed synchronously, we will never see
		 * WaitReadBuffers() block.  Treat it as if it did block.  This is
		 * particularly crucial when effective_io_concurrency=0 is used, as
		 * all IO will be synchronous.  Without treating synchronous IO as
		 * having waited, we'd never allow the distance to get large enough to
		 * allow for IO combining, resulting in bad performance.
		 */
		if (stream->ios[io_index].op.flags & READ_BUFFERS_SYNCHRONOUSLY)
			needed_wait = true;

		/*
		 * Have the read-ahead distance ramp up rapidly after we needed to
		 * wait for IO.  We only increase the read-ahead-distance when we
		 * needed to wait, to avoid increasing the distance further than
		 * necessary, as looking ahead too far can be costly, both due to the
		 * cost of unnecessarily pinning many buffers and due to doing IOs
		 * that may never be consumed if the stream is ended/reset before
		 * completion.
		 *
		 * If we did not need to wait, the current distance was evidently
		 * sufficient.
		 *
		 * NB: Must not increase the distance if we already reached the end of
		 * the stream, as stream->readahead_distance == 0 is used to keep
		 * track of having reached the end.
		 */
		if (stream->readahead_distance > 0 && needed_wait)
		{
			/* wider temporary value, due to overflow risk */
			int32		readahead_distance;

			readahead_distance = stream->readahead_distance * 2;
			readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
			stream->readahead_distance = readahead_distance;
		}

		/*
		 * As we needed IO, prevent distances from being reduced within our
		 * maximum look-ahead window.  This avoids collapsing distances too
		 * quickly in workloads where most of the required blocks are cached,
		 * but where the remaining IOs are a sufficient enough factor to cause
		 * a substantial slowdown if executed synchronously.
		 *
		 * There are valid arguments for preventing decay for max_ios or for
		 * max_pinned_buffers.  But the argument for max_pinned_buffers seems
		 * clearer - if we can't see any misses within the maximum look-ahead
		 * distance, we can't do any useful read-ahead.
		 */
		stream->distance_decay_holdoff = stream->max_pinned_buffers;

		/*
		 * Whether we needed to wait or not, allow for more IO combining if we
		 * needed to do IO. The reason to do so independent of needing to wait
		 * is that when the data is resident in the kernel page cache, IO
		 * combining reduces the syscall / dispatch overhead, making it
		 * worthwhile regardless of needing to wait.
		 *
		 * It is also important with io_uring as it will never signal the need
		 * to wait for reads if all the data is in the page cache. There are
		 * heuristics to deal with that in method_io_uring.c, but they only
		 * work when the IO gets large enough.
		 */
		if (stream->combine_distance > 0 &&
			stream->combine_distance < stream->io_combine_limit)
		{
			/* wider temporary value, due to overflow risk */
			int32		combine_distance;

			combine_distance = stream->combine_distance * 2;
			combine_distance = Min(combine_distance, stream->io_combine_limit);
			combine_distance = Min(combine_distance, stream->max_pinned_buffers);
			stream->combine_distance = combine_distance;
		}

		/*
		 * If we've reached the first block of a sequential region we're
		 * issuing advice for, cancel that until the next jump.  The kernel
		 * will see the sequential preadv() pattern starting here.
		 */
		if (stream->advice_enabled &&
			stream->ios[io_index].op.blocknum == stream->seq_until_processed)
			stream->seq_until_processed = InvalidBlockNumber;
	}

	/*
	 * We must zap this queue entry, or else it would appear as a forwarded
	 * buffer.  If it's potentially in the overflow zone (ie from a
	 * multi-block I/O that wrapped around the queue), also zap the copy.
	 */
	stream->buffers[oldest_buffer_index] = InvalidBuffer;
	if (oldest_buffer_index < stream->io_combine_limit - 1)
		stream->buffers[stream->queue_size + oldest_buffer_index] =
			InvalidBuffer;

#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)

	/*
	 * The caller will get access to the per-buffer data, until the next call.
	 * We wipe the one before, which is never occupied because queue_size
	 * allowed one extra element.  This will hopefully trip up client code
	 * that is holding a dangling pointer to it.
	 */
	if (stream->per_buffer_data)
	{
		void	   *per_buffer_data;

		per_buffer_data = get_per_buffer_data(stream,
											  oldest_buffer_index == 0 ?
											  stream->queue_size - 1 :
											  oldest_buffer_index - 1);

#if defined(CLOBBER_FREED_MEMORY)
		/* This also tells Valgrind the memory is "noaccess". */
		wipe_mem(per_buffer_data, stream->per_buffer_data_size);
#elif defined(USE_VALGRIND)
		/* Tell it ourselves. */
		VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
								   stream->per_buffer_data_size);
#endif
	}
#endif

	/* Pin transferred to caller. */
	Assert(stream->pinned_buffers > 0);
	stream->pinned_buffers--;

	/* Advance oldest buffer, with wrap-around. */
	stream->oldest_buffer_index++;
	if (stream->oldest_buffer_index == stream->queue_size)
		stream->oldest_buffer_index = 0;

	/* Prepare for the next call. */
	read_stream_look_ahead(stream);

#ifndef READ_STREAM_DISABLE_FAST_PATH
	/* See if we can take the fast path for all-cached scans next time. */
	if (stream->ios_in_progress == 0 &&
		stream->forwarded_buffers == 0 &&
		stream->pinned_buffers == 1 &&
		stream->readahead_distance == 1 &&
		stream->combine_distance == 1 &&
		stream->pending_read_nblocks == 0 &&
		stream->per_buffer_data_size == 0)
	{
		/*
		 * The fast path spins on one buffer entry repeatedly instead of
		 * rotating through the whole queue and clearing the entries behind
		 * it.  If the buffer it starts with happened to be forwarded between
		 * StartReadBuffers() calls and also wrapped around the circular queue
		 * partway through, then a copy also exists in the overflow zone, and
		 * it won't clear it out as the regular path would.  Do that now, so
		 * it doesn't need code for that.
		 */
		if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
			stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
				InvalidBuffer;

		stream->fast_path = true;
	}
#endif

	return buffer;
}
1272 :
1273 : /*
1274 : * Transitional support for code that would like to perform or skip reads
1275 : * itself, without using the stream. Returns, and consumes, the next block
1276 : * number that would be read by the stream's look-ahead algorithm, or
1277 : * InvalidBlockNumber if the end of the stream is reached. Also reports the
1278 : * strategy that would be used to read it.
1279 : */
1280 : BlockNumber
1281 0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
1282 : {
1283 0 : *strategy = stream->ios[0].op.strategy;
1284 0 : return read_stream_get_block(stream, NULL);
1285 : }
1286 :
1287 : /*
1288 : * Temporarily stop consuming block numbers from the block number callback.
1289 : * If called inside the block number callback, its return value should be
1290 : * returned by the callback.
1291 : */
1292 : BlockNumber
1293 0 : read_stream_pause(ReadStream *stream)
1294 : {
1295 0 : stream->resume_readahead_distance = stream->readahead_distance;
1296 0 : stream->resume_combine_distance = stream->combine_distance;
1297 0 : stream->readahead_distance = 0;
1298 0 : stream->combine_distance = 0;
1299 0 : return InvalidBlockNumber;
1300 : }
1301 :
/*
 * Resume looking ahead after the block number callback reported
 * end-of-stream. This is useful for streams of self-referential blocks, after
 * a buffer needed to be consumed and examined to find more block numbers.
 *
 * Restores the distances saved by read_stream_pause() (or the initial values
 * captured when the stream was created/reset).
 */
void
read_stream_resume(ReadStream *stream)
{
	stream->readahead_distance = stream->resume_readahead_distance;
	stream->combine_distance = stream->resume_combine_distance;
}
1313 :
1314 : /*
1315 : * Reset a read stream by releasing any queued up buffers, allowing the stream
1316 : * to be used again for different blocks. This can be used to clear an
1317 : * end-of-stream condition and start again, or to throw away blocks that were
1318 : * speculatively read and read some different blocks instead.
1319 : */
1320 : void
1321 1553562 : read_stream_reset(ReadStream *stream)
1322 : {
1323 : int16 index;
1324 : Buffer buffer;
1325 :
1326 : /* Stop looking ahead. */
1327 1553562 : stream->readahead_distance = 0;
1328 1553562 : stream->combine_distance = 0;
1329 :
1330 : /* Forget buffered block number and fast path state. */
1331 1553562 : stream->buffered_blocknum = InvalidBlockNumber;
1332 1553562 : stream->fast_path = false;
1333 :
1334 : /* Unpin anything that wasn't consumed. */
1335 1693112 : while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
1336 139550 : ReleaseBuffer(buffer);
1337 :
1338 : /* Unpin any unused forwarded buffers. */
1339 1553562 : index = stream->next_buffer_index;
1340 1553562 : while (index < stream->initialized_buffers &&
1341 217911 : (buffer = stream->buffers[index]) != InvalidBuffer)
1342 : {
1343 : Assert(stream->forwarded_buffers > 0);
1344 0 : stream->forwarded_buffers--;
1345 0 : ReleaseBuffer(buffer);
1346 :
1347 0 : stream->buffers[index] = InvalidBuffer;
1348 0 : if (index < stream->io_combine_limit - 1)
1349 0 : stream->buffers[stream->queue_size + index] = InvalidBuffer;
1350 :
1351 0 : if (++index == stream->queue_size)
1352 0 : index = 0;
1353 : }
1354 :
1355 : Assert(stream->forwarded_buffers == 0);
1356 : Assert(stream->pinned_buffers == 0);
1357 : Assert(stream->ios_in_progress == 0);
1358 :
1359 : /* Start off assuming data is cached. */
1360 1553562 : stream->readahead_distance = 1;
1361 1553562 : stream->combine_distance = 1;
1362 1553562 : stream->resume_readahead_distance = stream->readahead_distance;
1363 1553562 : stream->resume_combine_distance = stream->combine_distance;
1364 1553562 : stream->distance_decay_holdoff = 0;
1365 1553562 : }
1366 :
/*
 * Release and free a read stream.
 *
 * Resets the stream first, which releases all remaining buffer pins (and
 * asserts that no I/O is still in progress) before the memory is freed.
 */
void
read_stream_end(ReadStream *stream)
{
	read_stream_reset(stream);
	pfree(stream);
}
|