Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * read_stream.c
4 : * Mechanism for accessing buffered relation data with look-ahead
5 : *
6 : * Code that needs to access relation data typically pins blocks one at a
7 : * time, often in a predictable order that might be sequential or data-driven.
8 : * Calling the simple ReadBuffer() function for each block is inefficient,
9 : * because blocks that are not yet in the buffer pool require I/O operations
10 : * that are small and might stall waiting for storage. This mechanism looks
11 : * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
12 : * neighboring blocks together and ahead of time, with an adaptive look-ahead
13 : * distance.
14 : *
15 : * A user-provided callback generates a stream of block numbers that is used
16 : * to form reads of up to io_combine_limit, by attempting to merge them with a
17 : * pending read. When that isn't possible, the existing pending read is sent
18 : * to StartReadBuffers() so that a new one can begin to form.
19 : *
20 : * The algorithm for controlling the look-ahead distance is based on recent
21 : * cache hit and miss history. When no I/O is necessary, there is no benefit
22 : * in looking ahead more than one block. This is the default initial
23 : * assumption, but when blocks needing I/O are streamed, the distance is
24 : * increased rapidly to try to benefit from I/O combining and concurrency. It
25 : * is reduced gradually when cached blocks are streamed.
26 : *
27 : * The main data structure is a circular queue of buffers of size
28 : * max_pinned_buffers plus some extra space for technical reasons, ready to be
29 : * returned by read_stream_next_buffer(). Each buffer also has an optional
30 : * variable sized object that is passed from the callback to the consumer of
31 : * buffers.
32 : *
33 : * Parallel to the queue of buffers, there is a circular queue of in-progress
34 : * I/Os that have been started with StartReadBuffers(), and for which
35 : * WaitReadBuffers() must be called before returning the buffer.
36 : *
37 : * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
38 : * successive calls, then these data structures might appear as follows:
39 : *
40 : * buffers buf/data ios
41 : *
42 : * +----+ +-----+ +--------+
43 : * | | | | +----+ 42..44 | <- oldest_io_index
44 : * +----+ +-----+ | +--------+
45 : * oldest_buffer_index -> | 10 | | ? | | +--+ 60..60 |
46 : * +----+ +-----+ | | +--------+
47 : * | 42 | | ? |<-+ | | | <- next_io_index
48 : * +----+ +-----+ | +--------+
49 : * | 43 | | ? | | | |
50 : * +----+ +-----+ | +--------+
51 : * | 44 | | ? | | | |
52 : * +----+ +-----+ | +--------+
53 : * | 60 | | ? |<---+
54 : * +----+ +-----+
55 : * next_buffer_index -> | | | |
56 : * +----+ +-----+
57 : *
58 : * In the example, 5 buffers are pinned, and the next buffer to be streamed to
59 : * the client is block 10. Block 10 was a hit and has no associated I/O, but
60 : * the range 42..44 requires an I/O wait before its buffers are returned, as
61 : * does block 60.
62 : *
63 : *
64 : * Portions Copyright (c) 2024-2026, PostgreSQL Global Development Group
65 : * Portions Copyright (c) 1994, Regents of the University of California
66 : *
67 : * IDENTIFICATION
68 : * src/backend/storage/aio/read_stream.c
69 : *
70 : *-------------------------------------------------------------------------
71 : */
72 : #include "postgres.h"
73 :
74 : #include "miscadmin.h"
75 : #include "storage/aio.h"
76 : #include "storage/fd.h"
77 : #include "storage/smgr.h"
78 : #include "storage/read_stream.h"
79 : #include "utils/memdebug.h"
80 : #include "utils/rel.h"
81 : #include "utils/spccache.h"
82 :
/*
 * One read that has been started with StartReadBuffers() but not yet waited
 * for.  Before the buffers starting at buffer_index are handed to the
 * consumer, WaitReadBuffers() must be called on 'op'.
 */
typedef struct InProgressIO
{
	int16		buffer_index;	/* queue position of the read's first buffer */
	ReadBuffersOperation op;	/* handle for WaitReadBuffers() */
} InProgressIO;
88 :
/*
 * State for managing a stream of reads.
 */
struct ReadStream
{
	int16		max_ios;		/* cap on concurrently started I/Os */
	int16		io_combine_limit;	/* max blocks in one combined read */
	int16		ios_in_progress;	/* I/Os started but not yet waited for */
	int16		queue_size;		/* entries in the circular buffer queue */
	int16		max_pinned_buffers; /* cap on buffers pinned at once */
	int16		forwarded_buffers;	/* pins left over from a split read; held
									 * but not counted in pinned_buffers */
	int16		pinned_buffers; /* buffers currently pinned by this stream */
	int16		distance;		/* current adaptive look-ahead distance */
	int16		initialized_buffers;	/* high-water mark of queue entries
										 * that have been initialized */
	int16		resume_distance;	/* starting distance, remembered at
									 * creation (presumably restored when a
									 * paused stream resumes) */
	int			read_buffers_flags; /* flags passed to StartReadBuffers() */
	bool		sync_mode;		/* using io_method=sync */
	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
	bool		advice_enabled; /* simulate look-ahead with advice hints */
	bool		temporary;		/* relation uses local (temporary) buffers */

	/*
	 * One-block buffer to support 'ungetting' a block number, to resolve flow
	 * control problems when I/Os are split.
	 */
	BlockNumber buffered_blocknum;

	/*
	 * The callback that will tell us which block numbers to read, and an
	 * opaque pointer that will be passed to it for its own purposes.
	 */
	ReadStreamBlockNumberCB callback;
	void	   *callback_private_data;

	/* Next expected block, for detecting sequential access. */
	BlockNumber seq_blocknum;
	BlockNumber seq_until_processed;

	/* The read operation we are currently preparing. */
	BlockNumber pending_read_blocknum;
	int16		pending_read_nblocks;

	/* Space for buffers and optional per-buffer private data. */
	size_t		per_buffer_data_size;
	void	   *per_buffer_data;

	/* Read operations that have been started but not waited for yet. */
	InProgressIO *ios;
	int16		oldest_io_index;
	int16		next_io_index;

	/* True while the all-cached single-buffer fast path is usable. */
	bool		fast_path;

	/* Circular queue of buffers. */
	int16		oldest_buffer_index;	/* Next pinned buffer to return */
	int16		next_buffer_index;	/* Index of next buffer to pin */
	Buffer		buffers[FLEXIBLE_ARRAY_MEMBER];
};
147 :
148 : /*
149 : * Return a pointer to the per-buffer data by index.
150 : */
151 : static inline void *
152 4614406 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
153 : {
154 9228812 : return (char *) stream->per_buffer_data +
155 4614406 : stream->per_buffer_data_size * buffer_index;
156 : }
157 :
158 : /*
159 : * General-use ReadStreamBlockNumberCB for block range scans. Loops over the
160 : * blocks [current_blocknum, last_exclusive).
161 : */
162 : BlockNumber
163 382574 : block_range_read_stream_cb(ReadStream *stream,
164 : void *callback_private_data,
165 : void *per_buffer_data)
166 : {
167 382574 : BlockRangeReadStreamPrivate *p = callback_private_data;
168 :
169 382574 : if (p->current_blocknum < p->last_exclusive)
170 313326 : return p->current_blocknum++;
171 :
172 69248 : return InvalidBlockNumber;
173 : }
174 :
175 : /*
176 : * Ask the callback which block it would like us to read next, with a one block
177 : * buffer in front to allow read_stream_unget_block() to work.
178 : */
179 : static inline BlockNumber
180 6160602 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
181 : {
182 : BlockNumber blocknum;
183 :
184 6160602 : blocknum = stream->buffered_blocknum;
185 6160602 : if (blocknum != InvalidBlockNumber)
186 4 : stream->buffered_blocknum = InvalidBlockNumber;
187 : else
188 : {
189 : /*
190 : * Tell Valgrind that the per-buffer data is undefined. That replaces
191 : * the "noaccess" state that was set when the consumer moved past this
192 : * entry last time around the queue, and should also catch callbacks
193 : * that fail to initialize data that the buffer consumer later
194 : * accesses. On the first go around, it is undefined already.
195 : */
196 : VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
197 : stream->per_buffer_data_size);
198 6160598 : blocknum = stream->callback(stream,
199 : stream->callback_private_data,
200 : per_buffer_data);
201 : }
202 :
203 6160602 : return blocknum;
204 : }
205 :
206 : /*
207 : * In order to deal with buffer shortages and I/O limits after short reads, we
208 : * sometimes need to defer handling of a block we've already consumed from the
209 : * registered callback until later.
210 : */
211 : static inline void
212 4 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
213 : {
214 : /* We shouldn't ever unget more than one block. */
215 : Assert(stream->buffered_blocknum == InvalidBlockNumber);
216 : Assert(blocknum != InvalidBlockNumber);
217 4 : stream->buffered_blocknum = blocknum;
218 4 : }
219 :
/*
 * Start as much of the current pending read as we can.  If we have to split
 * it because of the per-backend buffer limit, or the buffer manager decides
 * to split it, then the pending read is adjusted to hold the remaining
 * portion.
 *
 * We can always start a read of at least size one if we have no progress yet.
 * Otherwise it's possible that we can't start a read at all because of a lack
 * of buffers, and then false is returned.  Buffer shortages also reduce the
 * distance to a level that prevents look-ahead until buffers are released.
 *
 * Returns true if any portion of the pending read was started (advancing
 * pending_read_blocknum/pending_read_nblocks past the started part), false
 * if nothing could be started because of the pin limit.
 */
static bool
read_stream_start_pending_read(ReadStream *stream)
{
	bool		need_wait;
	int			requested_nblocks;
	int			nblocks;
	int			flags;
	int			forwarded;
	int16		io_index;
	int16		overflow;
	int16		buffer_index;
	int			buffer_limit;

	/* This should only be called with a pending read. */
	Assert(stream->pending_read_nblocks > 0);
	Assert(stream->pending_read_nblocks <= stream->io_combine_limit);

	/* We had better not exceed the per-stream buffer limit with this read. */
	Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
		   stream->max_pinned_buffers);

#ifdef USE_ASSERT_CHECKING
	/* We had better not be overwriting an existing pinned buffer. */
	if (stream->pinned_buffers > 0)
		Assert(stream->next_buffer_index != stream->oldest_buffer_index);
	else
		Assert(stream->next_buffer_index == stream->oldest_buffer_index);

	/*
	 * Pinned buffers forwarded by a preceding StartReadBuffers() call that
	 * had to split the operation should match the leading blocks of this
	 * following StartReadBuffers() call.
	 */
	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
	for (int i = 0; i < stream->forwarded_buffers; ++i)
		Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
			   stream->pending_read_blocknum + i);

	/*
	 * Check that we've cleared the queue/overflow entries corresponding to
	 * the rest of the blocks covered by this read, unless it's the first go
	 * around and we haven't even initialized them yet.
	 */
	for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
		Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
			   stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
#endif

	/* Do we need to issue read-ahead advice? */
	flags = stream->read_buffers_flags;
	if (stream->advice_enabled)
	{
		if (stream->pending_read_blocknum == stream->seq_blocknum)
		{
			/*
			 * Sequential: Issue advice until the preadv() calls have caught
			 * up with the first advice issued for this sequential region, and
			 * then stay out of the way of the kernel's own read-ahead.
			 */
			if (stream->seq_until_processed != InvalidBlockNumber)
				flags |= READ_BUFFERS_ISSUE_ADVICE;
		}
		else
		{
			/*
			 * Random jump: Note the starting location of a new potential
			 * sequential region and start issuing advice.  Skip it this time
			 * if the preadv() follows immediately, eg first block in stream.
			 */
			stream->seq_until_processed = stream->pending_read_blocknum;
			if (stream->pinned_buffers > 0)
				flags |= READ_BUFFERS_ISSUE_ADVICE;
		}
	}

	/*
	 * How many more buffers is this backend allowed?
	 *
	 * Forwarded buffers are already pinned and map to the leading blocks of
	 * the pending read (the remaining portion of an earlier short read that
	 * we're about to continue).  They are not counted in pinned_buffers, but
	 * they are counted as pins already held by this backend according to the
	 * buffer manager, so they must be added to the limit it grants us.
	 */
	if (stream->temporary)
		buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
	else
		buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);

	buffer_limit += stream->forwarded_buffers;
	buffer_limit = Min(buffer_limit, PG_INT16_MAX);

	if (buffer_limit == 0 && stream->pinned_buffers == 0)
		buffer_limit = 1;		/* guarantee progress */

	/* Does the per-backend limit affect this read? */
	nblocks = stream->pending_read_nblocks;
	if (buffer_limit < nblocks)
	{
		int16		new_distance;

		/* Shrink distance: no more look-ahead until buffers are released. */
		new_distance = stream->pinned_buffers + buffer_limit;
		if (stream->distance > new_distance)
			stream->distance = new_distance;

		/* Unless we have nothing to give the consumer, stop here. */
		if (stream->pinned_buffers > 0)
			return false;

		/* A short read is required to make progress. */
		nblocks = buffer_limit;
	}

	/*
	 * We say how many blocks we want to read, but it may be smaller on return
	 * if the buffer manager decides to shorten the read.  Initialize buffers
	 * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
	 * and keep the original nblocks number so we can check for forwarded
	 * buffers as output, below.
	 */
	buffer_index = stream->next_buffer_index;
	io_index = stream->next_io_index;
	while (stream->initialized_buffers < buffer_index + nblocks)
		stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
	requested_nblocks = nblocks;
	need_wait = StartReadBuffers(&stream->ios[io_index].op,
								 &stream->buffers[buffer_index],
								 stream->pending_read_blocknum,
								 &nblocks,
								 flags);
	stream->pinned_buffers += nblocks;

	/* Remember whether we need to wait before returning this buffer. */
	if (!need_wait)
	{
		/* Look-ahead distance decays, no I/O necessary. */
		if (stream->distance > 1)
			stream->distance--;
	}
	else
	{
		/*
		 * Remember to call WaitReadBuffers() before returning head buffer.
		 * Look-ahead distance will be adjusted after waiting.
		 */
		stream->ios[io_index].buffer_index = buffer_index;
		if (++stream->next_io_index == stream->max_ios)
			stream->next_io_index = 0;
		Assert(stream->ios_in_progress < stream->max_ios);
		stream->ios_in_progress++;
		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
	}

	/*
	 * How many pins were acquired but forwarded to the next call?  These need
	 * to be passed to the next StartReadBuffers() call by leaving them
	 * exactly where they are in the queue, or released if the stream ends
	 * early.  We need the number for accounting purposes, since they are not
	 * counted in stream->pinned_buffers but we already hold them.
	 */
	forwarded = 0;
	while (nblocks + forwarded < requested_nblocks &&
		   stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
		forwarded++;
	stream->forwarded_buffers = forwarded;

	/*
	 * We gave a contiguous range of buffer space to StartReadBuffers(), but
	 * we want it to wrap around at queue_size.  Copy overflowing buffers to
	 * the front of the array where they'll be consumed, but also leave a copy
	 * in the overflow zone which the I/O operation has a pointer to (it needs
	 * a contiguous array).  Both copies will be cleared when the buffers are
	 * handed to the consumer.
	 */
	overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
	if (overflow > 0)
	{
		Assert(overflow < stream->queue_size);	/* can't overlap */
		memcpy(&stream->buffers[0],
			   &stream->buffers[stream->queue_size],
			   sizeof(stream->buffers[0]) * overflow);
	}

	/* Compute location of start of next read, without using % operator. */
	buffer_index += nblocks;
	if (buffer_index >= stream->queue_size)
		buffer_index -= stream->queue_size;
	Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
	stream->next_buffer_index = buffer_index;

	/* Adjust the pending read to cover the remaining portion, if any. */
	stream->pending_read_blocknum += nblocks;
	stream->pending_read_nblocks -= nblocks;

	return true;
}
428 :
/*
 * Pump the stream: consume block numbers from the callback, merge them into
 * the pending read where possible, and start reads until we hit the I/O
 * limit, the look-ahead distance, or end-of-stream.
 */
static void
read_stream_look_ahead(ReadStream *stream)
{
	/*
	 * Allow amortizing the cost of submitting IO over multiple IOs.  This
	 * requires that we don't do any operations that could lead to a deadlock
	 * with staged-but-unsubmitted IO.  The callback needs to opt-in to being
	 * careful.
	 */
	if (stream->batch_mode)
		pgaio_enter_batchmode();

	while (stream->ios_in_progress < stream->max_ios &&
		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
	{
		BlockNumber blocknum;
		int16		buffer_index;
		void	   *per_buffer_data;

		/* A full-sized pending read can't grow further; start it now. */
		if (stream->pending_read_nblocks == stream->io_combine_limit)
		{
			read_stream_start_pending_read(stream);
			continue;
		}

		/*
		 * See which block the callback wants next in the stream.  We need to
		 * compute the index of the Nth block of the pending read including
		 * wrap-around, but we don't want to use the expensive % operator.
		 */
		buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
		if (buffer_index >= stream->queue_size)
			buffer_index -= stream->queue_size;
		Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
		per_buffer_data = get_per_buffer_data(stream, buffer_index);
		blocknum = read_stream_get_block(stream, per_buffer_data);
		if (blocknum == InvalidBlockNumber)
		{
			/* End of stream. */
			stream->distance = 0;
			break;
		}

		/* Can we merge it with the pending read? */
		if (stream->pending_read_nblocks > 0 &&
			stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
		{
			stream->pending_read_nblocks++;
			continue;
		}

		/* We have to start the pending read before we can build another. */
		while (stream->pending_read_nblocks > 0)
		{
			if (!read_stream_start_pending_read(stream) ||
				stream->ios_in_progress == stream->max_ios)
			{
				/* We've hit the buffer or I/O limit.  Rewind and stop here. */
				read_stream_unget_block(stream, blocknum);
				if (stream->batch_mode)
					pgaio_exit_batchmode();
				return;
			}
		}

		/* This is the start of a new pending read. */
		stream->pending_read_blocknum = blocknum;
		stream->pending_read_nblocks = 1;
	}

	/*
	 * We don't start the pending read just because we've hit the distance
	 * limit, preferring to give it another chance to grow to full
	 * io_combine_limit size once more buffers have been consumed.  However,
	 * if we've already reached io_combine_limit, or we've reached the
	 * distance limit and there isn't anything pinned yet, or the callback has
	 * signaled end-of-stream, we start the read immediately.  Note that the
	 * pending read can exceed the distance goal, if the latter was reduced
	 * after hitting the per-backend buffer limit.
	 */
	if (stream->pending_read_nblocks > 0 &&
		(stream->pending_read_nblocks == stream->io_combine_limit ||
		 (stream->pending_read_nblocks >= stream->distance &&
		  stream->pinned_buffers == 0) ||
		 stream->distance == 0) &&
		stream->ios_in_progress < stream->max_ios)
		read_stream_start_pending_read(stream);

	/*
	 * There should always be something pinned when we leave this function,
	 * whether started by this call or not, unless we've hit the end of the
	 * stream.  In the worst case we can always make progress one buffer at a
	 * time.
	 */
	Assert(stream->pinned_buffers > 0 || stream->distance == 0);

	if (stream->batch_mode)
		pgaio_exit_batchmode();
}
528 :
/*
 * Create a new read stream object that can be used to perform the equivalent
 * of a series of ReadBuffer() calls for one fork of one relation.
 * Internally, it generates larger vectored reads where possible by looking
 * ahead.  The callback should return block numbers or InvalidBlockNumber to
 * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
 * write extra data for each block into the space provided to it.  It will
 * also receive callback_private_data for its own purposes.
 *
 * 'rel' may be NULL when reading through an SMgrRelation handle only; the
 * other arguments describe the fork to read and how to pin its buffers.
 */
static ReadStream *
read_stream_begin_impl(int flags,
					   BufferAccessStrategy strategy,
					   Relation rel,
					   SMgrRelation smgr,
					   char persistence,
					   ForkNumber forknum,
					   ReadStreamBlockNumberCB callback,
					   void *callback_private_data,
					   size_t per_buffer_data_size)
{
	ReadStream *stream;
	size_t		size;
	int16		queue_size;
	int16		queue_overflow;
	int			max_ios;
	int			strategy_pin_limit;
	uint32		max_pinned_buffers;
	uint32		max_possible_buffer_limit;
	Oid			tablespace_id;

	/*
	 * Decide how many I/Os we will allow to run at the same time.  That
	 * currently means advice to the kernel to tell it that we will soon read.
	 * This number also affects how far we look ahead for opportunities to
	 * start more I/Os.
	 */
	tablespace_id = smgr->smgr_rlocator.locator.spcOid;
	if (!OidIsValid(MyDatabaseId) ||
		(rel && IsCatalogRelation(rel)) ||
		IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
	{
		/*
		 * Avoid circularity while trying to look up tablespace settings or
		 * before spccache.c is ready.
		 */
		max_ios = effective_io_concurrency;
	}
	else if (flags & READ_STREAM_MAINTENANCE)
		max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
	else
		max_ios = get_tablespace_io_concurrency(tablespace_id);

	/* Cap to INT16_MAX to avoid overflowing below */
	max_ios = Min(max_ios, PG_INT16_MAX);

	/*
	 * If starting a multi-block I/O near the end of the queue, we might
	 * temporarily need extra space for overflowing buffers before they are
	 * moved to regular circular position.  This is the maximum extra space we
	 * could need.
	 */
	queue_overflow = io_combine_limit - 1;

	/*
	 * Choose the maximum number of buffers we're prepared to pin.  We try to
	 * pin fewer if we can, though.  We add one so that we can make progress
	 * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
	 * this also allows an extra full I/O's worth of buffers: after an I/O
	 * finishes we don't want to have to wait for its buffers to be consumed
	 * before starting a new one.
	 *
	 * Be careful not to allow int16 to overflow.  That is possible with the
	 * current GUC range limits, so this is an artificial limit of ~32k
	 * buffers and we'd need to adjust the types to exceed that.  We also have
	 * to allow for the spare entry and the overflow space.
	 */
	max_pinned_buffers = (max_ios + 1) * io_combine_limit;
	max_pinned_buffers = Min(max_pinned_buffers,
							 PG_INT16_MAX - queue_overflow - 1);

	/* Give the strategy a chance to limit the number of buffers we pin. */
	strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
	max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);

	/*
	 * Also limit our queue to the maximum number of pins we could ever be
	 * allowed to acquire according to the buffer manager.  We may not really
	 * be able to use them all due to other pins held by this backend, but
	 * we'll check that later in read_stream_start_pending_read().
	 */
	if (SmgrIsTemp(smgr))
		max_possible_buffer_limit = GetLocalPinLimit();
	else
		max_possible_buffer_limit = GetPinLimit();
	max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);

	/*
	 * The limit might be zero on a system configured with too few buffers for
	 * the number of connections.  We need at least one to make progress.
	 */
	max_pinned_buffers = Max(1, max_pinned_buffers);

	/*
	 * We need one extra entry for buffers and per-buffer data, because users
	 * of per-buffer data have access to the object until the next call to
	 * read_stream_next_buffer(), so we need a gap between the head and tail
	 * of the queue so that we don't clobber it.
	 */
	queue_size = max_pinned_buffers + 1;

	/*
	 * Allocate the object, the buffers, the ios and per_buffer_data space in
	 * one big chunk.  Though we have queue_size buffers, we want to be able
	 * to assume that all the buffers for a single read are contiguous (i.e.
	 * don't wrap around halfway through), so we allow temporary overflows of
	 * up to the maximum possible overflow size.
	 */
	size = offsetof(ReadStream, buffers);
	size += sizeof(Buffer) * (queue_size + queue_overflow);
	size += sizeof(InProgressIO) * Max(1, max_ios);
	size += per_buffer_data_size * queue_size;
	size += MAXIMUM_ALIGNOF * 2;
	stream = (ReadStream *) palloc(size);
	memset(stream, 0, offsetof(ReadStream, buffers));
	stream->ios = (InProgressIO *)
		MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
	if (per_buffer_data_size > 0)
		stream->per_buffer_data = (void *)
			MAXALIGN(&stream->ios[Max(1, max_ios)]);

	stream->sync_mode = io_method == IOMETHOD_SYNC;
	stream->batch_mode = flags & READ_STREAM_USE_BATCHING;

#ifdef USE_PREFETCH

	/*
	 * Read-ahead advice simulating asynchronous I/O with synchronous calls.
	 * Issue advice only if AIO is not used, direct I/O isn't enabled, the
	 * caller hasn't promised sequential access (overriding our detection
	 * heuristics), and max_ios hasn't been set to zero.
	 */
	if (stream->sync_mode &&
		(io_direct_flags & IO_DIRECT_DATA) == 0 &&
		(flags & READ_STREAM_SEQUENTIAL) == 0 &&
		max_ios > 0)
		stream->advice_enabled = true;
#endif

	/*
	 * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
	 * we still need to allocate space to combine and run one I/O.  Bump it up
	 * to one, and remember to ask for synchronous I/O only.
	 */
	if (max_ios == 0)
	{
		max_ios = 1;
		stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
	}

	/*
	 * Capture stable values for these two GUC-derived numbers for the
	 * lifetime of this stream, so we don't have to worry about the GUCs
	 * changing underneath us beyond this point.
	 */
	stream->max_ios = max_ios;
	stream->io_combine_limit = io_combine_limit;

	stream->per_buffer_data_size = per_buffer_data_size;
	stream->max_pinned_buffers = max_pinned_buffers;
	stream->queue_size = queue_size;
	stream->callback = callback;
	stream->callback_private_data = callback_private_data;
	stream->buffered_blocknum = InvalidBlockNumber;
	stream->seq_blocknum = InvalidBlockNumber;
	stream->seq_until_processed = InvalidBlockNumber;
	stream->temporary = SmgrIsTemp(smgr);

	/*
	 * Skip the initial ramp-up phase if the caller says we're going to be
	 * reading the whole relation.  This way we start out assuming we'll be
	 * doing full io_combine_limit sized reads.
	 */
	if (flags & READ_STREAM_FULL)
		stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
	else
		stream->distance = 1;
	stream->resume_distance = stream->distance;

	/*
	 * Since we always access the same relation, we can initialize parts of
	 * the ReadBuffersOperation objects and leave them that way, to avoid
	 * wasting CPU cycles writing to them for each read.
	 */
	for (int i = 0; i < max_ios; ++i)
	{
		stream->ios[i].op.rel = rel;
		stream->ios[i].op.smgr = smgr;
		stream->ios[i].op.persistence = persistence;
		stream->ios[i].op.forknum = forknum;
		stream->ios[i].op.strategy = strategy;
	}

	return stream;
}
733 :
734 : /*
735 : * Create a new read stream for reading a relation.
736 : * See read_stream_begin_impl() for the detailed explanation.
737 : */
738 : ReadStream *
739 829099 : read_stream_begin_relation(int flags,
740 : BufferAccessStrategy strategy,
741 : Relation rel,
742 : ForkNumber forknum,
743 : ReadStreamBlockNumberCB callback,
744 : void *callback_private_data,
745 : size_t per_buffer_data_size)
746 : {
747 829099 : return read_stream_begin_impl(flags,
748 : strategy,
749 : rel,
750 : RelationGetSmgr(rel),
751 829099 : rel->rd_rel->relpersistence,
752 : forknum,
753 : callback,
754 : callback_private_data,
755 : per_buffer_data_size);
756 : }
757 :
/*
 * Create a new read stream for reading a SMgr relation.
 * See read_stream_begin_impl() for the detailed explanation.
 *
 * No Relation is available at this level, so NULL is passed for 'rel' and
 * the caller supplies the persistence explicitly.
 */
ReadStream *
read_stream_begin_smgr_relation(int flags,
								BufferAccessStrategy strategy,
								SMgrRelation smgr,
								char smgr_persistence,
								ForkNumber forknum,
								ReadStreamBlockNumberCB callback,
								void *callback_private_data,
								size_t per_buffer_data_size)
{
	return read_stream_begin_impl(flags,
								  strategy,
								  NULL,
								  smgr,
								  smgr_persistence,
								  forknum,
								  callback,
								  callback_private_data,
								  per_buffer_data_size);
}
782 :
/*
 * Pull one pinned buffer out of a stream.  Each call returns successive
 * blocks in the order specified by the callback.  If per_buffer_data_size was
 * set to a non-zero size, *per_buffer_data receives a pointer to the extra
 * per-buffer data that the callback had a chance to populate, which remains
 * valid until the next call to read_stream_next_buffer().  When the stream
 * runs out of data, InvalidBuffer is returned.  The caller may decide to end
 * the stream early at any time by calling read_stream_end().
 */
Buffer
read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
{
	Buffer		buffer;
	int16		oldest_buffer_index;

#ifndef READ_STREAM_DISABLE_FAST_PATH

	/*
	 * A fast path for all-cached scans.  This is the same as the usual
	 * algorithm, but it is specialized for no I/O and no per-buffer data, so
	 * we can skip the queue management code, stay in the same buffer slot and
	 * use singular StartReadBuffer().
	 */
	if (likely(stream->fast_path))
	{
		BlockNumber next_blocknum;

		/* Fast path assumptions. */
		Assert(stream->ios_in_progress == 0);
		Assert(stream->forwarded_buffers == 0);
		Assert(stream->pinned_buffers == 1);
		Assert(stream->distance == 1);
		Assert(stream->pending_read_nblocks == 0);
		Assert(stream->per_buffer_data_size == 0);
		Assert(stream->initialized_buffers > stream->oldest_buffer_index);

		/* We're going to return the buffer we pinned last time. */
		oldest_buffer_index = stream->oldest_buffer_index;
		Assert((oldest_buffer_index + 1) % stream->queue_size ==
			   stream->next_buffer_index);
		buffer = stream->buffers[oldest_buffer_index];
		Assert(buffer != InvalidBuffer);

		/* Choose the next block to pin. */
		next_blocknum = read_stream_get_block(stream, NULL);

		if (likely(next_blocknum != InvalidBlockNumber))
		{
			int			flags = stream->read_buffers_flags;

			if (stream->advice_enabled)
				flags |= READ_BUFFERS_ISSUE_ADVICE;

			/*
			 * Pin a buffer for the next call.  Same buffer entry, and
			 * arbitrary I/O entry (they're all free).  We don't have to
			 * adjust pinned_buffers because we're transferring one to caller
			 * but pinning one more.
			 *
			 * In the fast path we don't need to check the pin limit.  We're
			 * always allowed at least one pin so that progress can be made,
			 * and that's all we need here.  Although two pins are momentarily
			 * held at the same time, the model used here is that the stream
			 * holds only one, and the other now belongs to the caller.
			 */
			if (likely(!StartReadBuffer(&stream->ios[0].op,
										&stream->buffers[oldest_buffer_index],
										next_blocknum,
										flags)))
			{
				/* Fast return. */
				return buffer;
			}

			/*
			 * StartReadBuffer() reported that I/O is needed, so we must drop
			 * out of the fast path: record the single in-progress I/O so the
			 * next (slow path) call waits for it before returning the buffer.
			 */
			stream->oldest_io_index = 0;
			stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
			stream->ios_in_progress = 1;
			stream->ios[0].buffer_index = oldest_buffer_index;
			stream->seq_blocknum = next_blocknum + 1;
		}
		else
		{
			/* No more blocks, end of stream. */
			stream->distance = 0;
			stream->oldest_buffer_index = stream->next_buffer_index;
			stream->pinned_buffers = 0;
			stream->buffers[oldest_buffer_index] = InvalidBuffer;
		}

		stream->fast_path = false;
		return buffer;
	}
#endif

	if (unlikely(stream->pinned_buffers == 0))
	{
		Assert(stream->oldest_buffer_index == stream->next_buffer_index);

		/* End of stream reached? */
		if (stream->distance == 0)
			return InvalidBuffer;

		/*
		 * The usual order of operations is that we look ahead at the bottom
		 * of this function after potentially finishing an I/O and making
		 * space for more, but if we're just starting up we'll need to crank
		 * the handle to get started.
		 */
		read_stream_look_ahead(stream);

		/* End of stream reached? */
		if (stream->pinned_buffers == 0)
		{
			Assert(stream->distance == 0);
			return InvalidBuffer;
		}
	}

	/* Grab the oldest pinned buffer and associated per-buffer data. */
	Assert(stream->pinned_buffers > 0);
	oldest_buffer_index = stream->oldest_buffer_index;
	Assert(oldest_buffer_index >= 0 &&
		   oldest_buffer_index < stream->queue_size);
	buffer = stream->buffers[oldest_buffer_index];
	if (per_buffer_data)
		*per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);

	Assert(BufferIsValid(buffer));

	/* Do we have to wait for an associated I/O first? */
	if (stream->ios_in_progress > 0 &&
		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
	{
		int16		io_index = stream->oldest_io_index;
		int32		distance;	/* wider temporary value, clamped below */

		/* Sanity check that we still agree on the buffers. */
		Assert(stream->ios[io_index].op.buffers ==
			   &stream->buffers[oldest_buffer_index]);

		WaitReadBuffers(&stream->ios[io_index].op);

		/* Retire this I/O slot, advancing the circular I/O queue. */
		Assert(stream->ios_in_progress > 0);
		stream->ios_in_progress--;
		if (++stream->oldest_io_index == stream->max_ios)
			stream->oldest_io_index = 0;

		/* Look-ahead distance ramps up rapidly after we do I/O. */
		distance = stream->distance * 2;
		distance = Min(distance, stream->max_pinned_buffers);
		stream->distance = distance;

		/*
		 * If we've reached the first block of a sequential region we're
		 * issuing advice for, cancel that until the next jump.  The kernel
		 * will see the sequential preadv() pattern starting here.
		 */
		if (stream->advice_enabled &&
			stream->ios[io_index].op.blocknum == stream->seq_until_processed)
			stream->seq_until_processed = InvalidBlockNumber;
	}

	/*
	 * We must zap this queue entry, or else it would appear as a forwarded
	 * buffer.  If it's potentially in the overflow zone (ie from a
	 * multi-block I/O that wrapped around the queue), also zap the copy.
	 */
	stream->buffers[oldest_buffer_index] = InvalidBuffer;
	if (oldest_buffer_index < stream->io_combine_limit - 1)
		stream->buffers[stream->queue_size + oldest_buffer_index] =
			InvalidBuffer;

#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)

	/*
	 * The caller will get access to the per-buffer data, until the next call.
	 * We wipe the one before, which is never occupied because queue_size
	 * allowed one extra element.  This will hopefully trip up client code
	 * that is holding a dangling pointer to it.
	 */
	if (stream->per_buffer_data)
	{
		void	   *per_buffer_data;

		per_buffer_data = get_per_buffer_data(stream,
											  oldest_buffer_index == 0 ?
											  stream->queue_size - 1 :
											  oldest_buffer_index - 1);

#if defined(CLOBBER_FREED_MEMORY)
		/* This also tells Valgrind the memory is "noaccess". */
		wipe_mem(per_buffer_data, stream->per_buffer_data_size);
#elif defined(USE_VALGRIND)
		/* Tell it ourselves. */
		VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
								   stream->per_buffer_data_size);
#endif
	}
#endif

	/* Pin transferred to caller. */
	Assert(stream->pinned_buffers > 0);
	stream->pinned_buffers--;

	/* Advance oldest buffer, with wrap-around. */
	stream->oldest_buffer_index++;
	if (stream->oldest_buffer_index == stream->queue_size)
		stream->oldest_buffer_index = 0;

	/* Prepare for the next call. */
	read_stream_look_ahead(stream);

#ifndef READ_STREAM_DISABLE_FAST_PATH
	/* See if we can take the fast path for all-cached scans next time. */
	if (stream->ios_in_progress == 0 &&
		stream->forwarded_buffers == 0 &&
		stream->pinned_buffers == 1 &&
		stream->distance == 1 &&
		stream->pending_read_nblocks == 0 &&
		stream->per_buffer_data_size == 0)
	{
		/*
		 * The fast path spins on one buffer entry repeatedly instead of
		 * rotating through the whole queue and clearing the entries behind
		 * it.  If the buffer it starts with happened to be forwarded between
		 * StartReadBuffers() calls and also wrapped around the circular queue
		 * partway through, then a copy also exists in the overflow zone, and
		 * it won't clear it out as the regular path would.  Do that now, so
		 * it doesn't need code for that.
		 */
		if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
			stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
				InvalidBuffer;

		stream->fast_path = true;
	}
#endif

	return buffer;
}
1024 :
1025 : /*
1026 : * Transitional support for code that would like to perform or skip reads
1027 : * itself, without using the stream. Returns, and consumes, the next block
1028 : * number that would be read by the stream's look-ahead algorithm, or
1029 : * InvalidBlockNumber if the end of the stream is reached. Also reports the
1030 : * strategy that would be used to read it.
1031 : */
1032 : BlockNumber
1033 0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
1034 : {
1035 0 : *strategy = stream->ios[0].op.strategy;
1036 0 : return read_stream_get_block(stream, NULL);
1037 : }
1038 :
1039 : /*
1040 : * Temporarily stop consuming block numbers from the block number callback.
1041 : * If called inside the block number callback, its return value should be
1042 : * returned by the callback.
1043 : */
1044 : BlockNumber
1045 0 : read_stream_pause(ReadStream *stream)
1046 : {
1047 0 : stream->resume_distance = stream->distance;
1048 0 : stream->distance = 0;
1049 0 : return InvalidBlockNumber;
1050 : }
1051 :
/*
 * Resume looking ahead after the block number callback reported
 * end-of-stream. This is useful for streams of self-referential blocks, after
 * a buffer needed to be consumed and examined to find more block numbers.
 */
void
read_stream_resume(ReadStream *stream)
{
	/* Restore the look-ahead distance saved by read_stream_pause(). */
	stream->distance = stream->resume_distance;
}
1062 :
/*
 * Reset a read stream by releasing any queued up buffers, allowing the stream
 * to be used again for different blocks.  This can be used to clear an
 * end-of-stream condition and start again, or to throw away blocks that were
 * speculatively read and read some different blocks instead.
 */
void
read_stream_reset(ReadStream *stream)
{
	int16		index;
	Buffer		buffer;

	/* Stop looking ahead. */
	stream->distance = 0;

	/* Forget buffered block number and fast path state. */
	stream->buffered_blocknum = InvalidBlockNumber;
	stream->fast_path = false;

	/* Unpin anything that wasn't consumed. */
	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
		ReleaseBuffer(buffer);

	/* Unpin any unused forwarded buffers. */
	index = stream->next_buffer_index;
	while (index < stream->initialized_buffers &&
		   (buffer = stream->buffers[index]) != InvalidBuffer)
	{
		Assert(stream->forwarded_buffers > 0);
		stream->forwarded_buffers--;
		ReleaseBuffer(buffer);

		/* Zap the entry, plus its overflow-zone copy if it could have one. */
		stream->buffers[index] = InvalidBuffer;
		if (index < stream->io_combine_limit - 1)
			stream->buffers[stream->queue_size + index] = InvalidBuffer;

		/* Advance with wrap-around, like the main queue pointers. */
		if (++index == stream->queue_size)
			index = 0;
	}

	Assert(stream->forwarded_buffers == 0);
	Assert(stream->pinned_buffers == 0);
	Assert(stream->ios_in_progress == 0);

	/* Start off assuming data is cached. */
	stream->distance = 1;
	stream->resume_distance = stream->distance;
}
1111 :
1112 : /*
1113 : * Release and free a read stream.
1114 : */
1115 : void
1116 890684 : read_stream_end(ReadStream *stream)
1117 : {
1118 890684 : read_stream_reset(stream);
1119 890684 : pfree(stream);
1120 890684 : }
|