Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * read_stream.c
4 : * Mechanism for accessing buffered relation data with look-ahead
5 : *
6 : * Code that needs to access relation data typically pins blocks one at a
7 : * time, often in a predictable order that might be sequential or data-driven.
8 : * Calling the simple ReadBuffer() function for each block is inefficient,
9 : * because blocks that are not yet in the buffer pool require I/O operations
10 : * that are small and might stall waiting for storage. This mechanism looks
11 : * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
12 : * neighboring blocks together and ahead of time, with an adaptive look-ahead
13 : * distance.
14 : *
15 : * A user-provided callback generates a stream of block numbers that is used
16 : * to form reads of up to io_combine_limit blocks, by attempting to merge them with a
17 : * pending read. When that isn't possible, the existing pending read is sent
18 : * to StartReadBuffers() so that a new one can begin to form.
19 : *
20 : * The algorithm for controlling the look-ahead distance is based on recent
21 : * cache hit and miss history. When no I/O is necessary, there is no benefit
22 : * in looking ahead more than one block. This is the default initial
23 : * assumption, but when blocks needing I/O are streamed, the distance is
24 : * increased rapidly to try to benefit from I/O combining and concurrency. It
25 : * is reduced gradually when cached blocks are streamed.
26 : *
27 : * The main data structure is a circular queue of buffers of size
28 : * max_pinned_buffers plus some extra space for technical reasons, ready to be
29 : * returned by read_stream_next_buffer(). Each buffer also has an optional
30 : * variable-sized object that is passed from the callback to the consumer of
31 : * buffers.
32 : *
33 : * Parallel to the queue of buffers, there is a circular queue of in-progress
34 : * I/Os that have been started with StartReadBuffers(), and for which
35 : * WaitReadBuffers() must be called before returning the buffer.
36 : *
37 : * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
38 : * successive calls, then these data structures might appear as follows:
39 : *
40 : * buffers buf/data ios
41 : *
42 : * +----+ +-----+ +--------+
43 : * | | | | +----+ 42..44 | <- oldest_io_index
44 : * +----+ +-----+ | +--------+
45 : * oldest_buffer_index -> | 10 | | ? | | +--+ 60..60 |
46 : * +----+ +-----+ | | +--------+
47 : * | 42 | | ? |<-+ | | | <- next_io_index
48 : * +----+ +-----+ | +--------+
49 : * | 43 | | ? | | | |
50 : * +----+ +-----+ | +--------+
51 : * | 44 | | ? | | | |
52 : * +----+ +-----+ | +--------+
53 : * | 60 | | ? |<---+
54 : * +----+ +-----+
55 : * next_buffer_index -> | | | |
56 : * +----+ +-----+
57 : *
58 : * In the example, 5 buffers are pinned, and the next buffer to be streamed to
59 : * the client is block 10. Block 10 was a hit and has no associated I/O, but
60 : * the range 42..44 requires an I/O wait before its buffers are returned, as
61 : * does block 60.
62 : *
63 : *
64 : * Portions Copyright (c) 2024-2025, PostgreSQL Global Development Group
65 : * Portions Copyright (c) 1994, Regents of the University of California
66 : *
67 : * IDENTIFICATION
68 : * src/backend/storage/aio/read_stream.c
69 : *
70 : *-------------------------------------------------------------------------
71 : */
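
/*
 * Example usage (an illustrative sketch, not part of this file): client code
 * creates a stream with a callback, pulls pinned buffers from it one at a
 * time, and ends the stream when done.  Only the public API declared in
 * storage/read_stream.h is used; the function name is hypothetical.
 */
#ifdef NOT_USED
static void
example_read_whole_fork(Relation rel)
{
	BlockRangeReadStreamPrivate p = {0};
	ReadStream *stream;
	Buffer		buffer;

	p.current_blocknum = 0;
	p.last_exclusive = RelationGetNumberOfBlocks(rel);

	/* READ_STREAM_FULL skips look-ahead ramp-up for whole-fork scans. */
	stream = read_stream_begin_relation(READ_STREAM_FULL,
										NULL,	/* default strategy */
										rel,
										MAIN_FORKNUM,
										block_range_read_stream_cb,
										&p,
										0);		/* no per-buffer data */

	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
	{
		/* ... examine the pinned buffer ... */
		ReleaseBuffer(buffer);
	}
	read_stream_end(stream);
}
#endif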
72 : #include "postgres.h"
73 :
74 : #include "miscadmin.h"
75 : #include "storage/aio.h"
76 : #include "storage/fd.h"
77 : #include "storage/smgr.h"
78 : #include "storage/read_stream.h"
79 : #include "utils/memdebug.h"
80 : #include "utils/rel.h"
81 : #include "utils/spccache.h"
82 :
83 : typedef struct InProgressIO
84 : {
85 : int16 buffer_index;
86 : ReadBuffersOperation op;
87 : } InProgressIO;
88 :
89 : /*
90 : * State for managing a stream of reads.
91 : */
92 : struct ReadStream
93 : {
94 : int16 max_ios;
95 : int16 io_combine_limit;
96 : int16 ios_in_progress;
97 : int16 queue_size;
98 : int16 max_pinned_buffers;
99 : int16 forwarded_buffers;
100 : int16 pinned_buffers;
101 : int16 distance;
102 : int16 initialized_buffers;
103 : int read_buffers_flags;
104 : bool sync_mode; /* using io_method=sync */
105 : bool batch_mode; /* READ_STREAM_USE_BATCHING */
106 : bool advice_enabled;
107 : bool temporary;
108 :
109 : /*
110 : * One-block buffer to support 'ungetting' a block number, to resolve flow
111 : * control problems when I/Os are split.
112 : */
113 : BlockNumber buffered_blocknum;
114 :
115 : /*
116 : * The callback that will tell us which block numbers to read, and an
117 : * opaque pointer that will be passed to it for its own purposes.
118 : */
119 : ReadStreamBlockNumberCB callback;
120 : void *callback_private_data;
121 :
122 : /* Next expected block, for detecting sequential access. */
123 : BlockNumber seq_blocknum;
124 : BlockNumber seq_until_processed;
125 :
126 : /* The read operation we are currently preparing. */
127 : BlockNumber pending_read_blocknum;
128 : int16 pending_read_nblocks;
129 :
130 : /* Space for buffers and optional per-buffer private data. */
131 : size_t per_buffer_data_size;
132 : void *per_buffer_data;
133 :
134 : /* Read operations that have been started but not waited for yet. */
135 : InProgressIO *ios;
136 : int16 oldest_io_index;
137 : int16 next_io_index;
138 :
139 : bool fast_path;
140 :
141 : /* Circular queue of buffers. */
142 : int16 oldest_buffer_index; /* Next pinned buffer to return */
143 : int16 next_buffer_index; /* Index of next buffer to pin */
144 : Buffer buffers[FLEXIBLE_ARRAY_MEMBER];
145 : };
146 :
147 : /*
148 : * Return a pointer to the per-buffer data by index.
149 : */
150 : static inline void *
151 7379668 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
152 : {
153 14759336 : return (char *) stream->per_buffer_data +
154 7379668 : stream->per_buffer_data_size * buffer_index;
155 : }
156 :
157 : /*
158 : * General-use ReadStreamBlockNumberCB for block range scans. Loops over the
159 : * blocks [current_blocknum, last_exclusive).
160 : */
161 : BlockNumber
162 658238 : block_range_read_stream_cb(ReadStream *stream,
163 : void *callback_private_data,
164 : void *per_buffer_data)
165 : {
166 658238 : BlockRangeReadStreamPrivate *p = callback_private_data;
167 :
168 658238 : if (p->current_blocknum < p->last_exclusive)
169 529072 : return p->current_blocknum++;
170 :
171 129166 : return InvalidBlockNumber;
172 : }
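
/*
 * For illustration only (not part of this file): a hypothetical custom
 * callback.  A callback may produce block numbers in any order and returns
 * InvalidBlockNumber to signal end-of-stream; runs of consecutive block
 * numbers give the stream the best chance to combine I/Os.
 */
#ifdef NOT_USED
typedef struct EveryOtherBlockPrivate
{
	BlockNumber current_blocknum;
	BlockNumber last_exclusive;
} EveryOtherBlockPrivate;

static BlockNumber
every_other_block_cb(ReadStream *stream,
					 void *callback_private_data,
					 void *per_buffer_data)
{
	EveryOtherBlockPrivate *p = callback_private_data;

	if (p->current_blocknum < p->last_exclusive)
	{
		BlockNumber result = p->current_blocknum;

		p->current_blocknum += 2;	/* skip every second block */
		return result;
	}
	return InvalidBlockNumber;
}
#endif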
173 :
174 : /*
175 : * Ask the callback which block it would like us to read next, with a one-block
176 : * buffer in front to allow read_stream_unget_block() to work.
177 : */
178 : static inline BlockNumber
179 10119786 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
180 : {
181 : BlockNumber blocknum;
182 :
183 10119786 : blocknum = stream->buffered_blocknum;
184 10119786 : if (blocknum != InvalidBlockNumber)
185 8 : stream->buffered_blocknum = InvalidBlockNumber;
186 : else
187 : {
188 : /*
189 : * Tell Valgrind that the per-buffer data is undefined. That replaces
190 : * the "noaccess" state that was set when the consumer moved past this
191 : * entry last time around the queue, and should also catch callbacks
192 : * that fail to initialize data that the buffer consumer later
193 : * accesses. On the first go around, it is undefined already.
194 : */
195 : VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
196 : stream->per_buffer_data_size);
197 10119778 : blocknum = stream->callback(stream,
198 : stream->callback_private_data,
199 : per_buffer_data);
200 : }
201 :
202 10119786 : return blocknum;
203 : }
204 :
205 : /*
206 : * In order to deal with buffer shortages and I/O limits after short reads, we
207 : * sometimes need to defer handling of a block we've already consumed from the
208 : * registered callback until later.
209 : */
210 : static inline void
211 8 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
212 : {
213 : /* We shouldn't ever unget more than one block. */
214 : Assert(stream->buffered_blocknum == InvalidBlockNumber);
215 : Assert(blocknum != InvalidBlockNumber);
216 8 : stream->buffered_blocknum = blocknum;
217 8 : }
218 :
219 : /*
220 : * Start as much of the current pending read as we can. If we have to split it
221 : * because of the per-backend buffer limit, or the buffer manager decides to
222 : * split it, then the pending read is adjusted to hold the remaining portion.
223 : *
224 : * We can always start a read of at least size one if we have no progress yet.
225 : * Otherwise it's possible that we can't start a read at all because of a lack
226 : * of buffers, and then false is returned. Buffer shortages also reduce the
227 : * distance to a level that prevents look-ahead until buffers are released.
228 : */
229 : static bool
230 3636630 : read_stream_start_pending_read(ReadStream *stream)
231 : {
232 : bool need_wait;
233 : int requested_nblocks;
234 : int nblocks;
235 : int flags;
236 : int forwarded;
237 : int16 io_index;
238 : int16 overflow;
239 : int16 buffer_index;
240 : int buffer_limit;
241 :
242 : /* This should only be called with a pending read. */
243 : Assert(stream->pending_read_nblocks > 0);
244 : Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
245 :
246 : /* We had better not exceed the per-stream buffer limit with this read. */
247 : Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
248 : stream->max_pinned_buffers);
249 :
250 : /* We had better not be overwriting an existing pinned buffer. */
251 3636630 : if (stream->pinned_buffers > 0)
252 : Assert(stream->next_buffer_index != stream->oldest_buffer_index);
253 : else
254 : Assert(stream->next_buffer_index == stream->oldest_buffer_index);
255 :
256 : /* Do we need to issue read-ahead advice? */
257 3636630 : flags = stream->read_buffers_flags;
258 3636630 : if (stream->advice_enabled)
259 : {
260 3284 : if (stream->pending_read_blocknum == stream->seq_blocknum)
261 : {
262 : /*
263 : * Sequential: Issue advice until the preadv() calls have caught
264 : * up with the first advice issued for this sequential region, and
265 : * then stay of the way of the kernel's own read-ahead.
266 : */
267 44 : if (stream->seq_until_processed != InvalidBlockNumber)
268 2 : flags |= READ_BUFFERS_ISSUE_ADVICE;
269 : }
270 : else
271 : {
272 : /*
273 : * Random jump: Note the starting location of a new potential
274 : * sequential region and start issuing advice. Skip it this time
275 : * if the preadv() follows immediately, eg first block in stream.
276 : */
277 3240 : stream->seq_until_processed = stream->pending_read_blocknum;
278 3240 : if (stream->pinned_buffers > 0)
279 48 : flags |= READ_BUFFERS_ISSUE_ADVICE;
280 : }
281 : }
282 :
283 : /*
284 : * How many more buffers is this backend allowed?
285 : *
286 : * Forwarded buffers are already pinned and map to the leading blocks of
287 : * the pending read (the remaining portion of an earlier short read that
288 : * we're about to continue). They are not counted in pinned_buffers, but
289 : * they are counted as pins already held by this backend according to the
290 : * buffer manager, so they must be added to the limit it grants us.
291 : */
292 3636630 : if (stream->temporary)
293 23668 : buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
294 : else
295 3612962 : buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
296 : Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
297 :
298 3636630 : buffer_limit += stream->forwarded_buffers;
299 3636630 : buffer_limit = Min(buffer_limit, PG_INT16_MAX);
300 :
301 3636630 : if (buffer_limit == 0 && stream->pinned_buffers == 0)
302 1231940 : buffer_limit = 1; /* guarantee progress */
303 :
304 : /* Does the per-backend limit affect this read? */
305 3636630 : nblocks = stream->pending_read_nblocks;
306 3636630 : if (buffer_limit < nblocks)
307 : {
308 : int16 new_distance;
309 :
310 : /* Shrink distance: no more look-ahead until buffers are released. */
311 5010 : new_distance = stream->pinned_buffers + buffer_limit;
312 5010 : if (stream->distance > new_distance)
313 3488 : stream->distance = new_distance;
314 :
315 : /* Unless we have nothing to give the consumer, stop here. */
316 5010 : if (stream->pinned_buffers > 0)
317 2482 : return false;
318 :
319 : /* A short read is required to make progress. */
320 2528 : nblocks = buffer_limit;
321 : }
322 :
323 : /*
324 : * We say how many blocks we want to read, but it may be smaller on return
325 : * if the buffer manager decides to shorten the read. Initialize buffers
326 : * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
327 : * and keep the original nblocks number so we can check for forwarded
328 : * buffers as output, below.
329 : */
330 3634148 : buffer_index = stream->next_buffer_index;
331 3634148 : io_index = stream->next_io_index;
332 5720478 : while (stream->initialized_buffers < buffer_index + nblocks)
333 2086330 : stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
334 3634148 : requested_nblocks = nblocks;
335 3634148 : need_wait = StartReadBuffers(&stream->ios[io_index].op,
336 3634148 : &stream->buffers[buffer_index],
337 : stream->pending_read_blocknum,
338 : &nblocks,
339 : flags);
340 3634136 : stream->pinned_buffers += nblocks;
341 :
342 : /* Remember whether we need to wait before returning this buffer. */
343 3634136 : if (!need_wait)
344 : {
345 : /* Look-ahead distance decays, no I/O necessary. */
346 2497462 : if (stream->distance > 1)
347 31750 : stream->distance--;
348 : }
349 : else
350 : {
351 : /*
352 : * Remember to call WaitReadBuffers() before returning the head buffer.
353 : * Look-ahead distance will be adjusted after waiting.
354 : */
355 1136674 : stream->ios[io_index].buffer_index = buffer_index;
356 1136674 : if (++stream->next_io_index == stream->max_ios)
357 48104 : stream->next_io_index = 0;
358 : Assert(stream->ios_in_progress < stream->max_ios);
359 1136674 : stream->ios_in_progress++;
360 1136674 : stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
361 : }
362 :
363 : /*
364 : * How many pins were acquired but forwarded to the next call? These need
365 : * to be passed to the next StartReadBuffers() call by leaving them
366 : * exactly where they are in the queue, or released if the stream ends
367 : * early. We need the number for accounting purposes, since they are not
368 : * counted in stream->pinned_buffers but we already hold them.
369 : */
370 3634136 : forwarded = 0;
371 3636876 : while (nblocks + forwarded < requested_nblocks &&
372 101278 : stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
373 2740 : forwarded++;
374 3634136 : stream->forwarded_buffers = forwarded;
375 :
376 : /*
377 : * We gave a contiguous range of buffer space to StartReadBuffers(), but
378 : * we want it to wrap around at queue_size. Copy overflowing buffers to
379 : * the front of the array where they'll be consumed, but also leave a copy
380 : * in the overflow zone which the I/O operation has a pointer to (it needs
381 : * a contiguous array). Both copies will be cleared when the buffers are
382 : * handed to the consumer.
383 : */
384 3634136 : overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
385 3634136 : if (overflow > 0)
386 : {
387 : Assert(overflow < stream->queue_size); /* can't overlap */
388 660 : memcpy(&stream->buffers[0],
389 660 : &stream->buffers[stream->queue_size],
390 : sizeof(stream->buffers[0]) * overflow);
391 : }
392 :
393 : /* Compute location of start of next read, without using % operator. */
394 3634136 : buffer_index += nblocks;
395 3634136 : if (buffer_index >= stream->queue_size)
396 580220 : buffer_index -= stream->queue_size;
397 : Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
398 3634136 : stream->next_buffer_index = buffer_index;
399 :
400 : /* Adjust the pending read to cover the remaining portion, if any. */
401 3634136 : stream->pending_read_blocknum += nblocks;
402 3634136 : stream->pending_read_nblocks -= nblocks;
403 :
404 3634136 : return true;
405 : }
406 :
407 : static void
408 6339808 : read_stream_look_ahead(ReadStream *stream)
409 : {
410 : /*
411 : * Allow amortizing the cost of submitting IO over multiple IOs. This
412 : * requires that we don't do any operations that could lead to a deadlock
413 : * with staged-but-unsubmitted IO. The callback needs to opt in to being
414 : * careful.
415 : */
416 6339808 : if (stream->batch_mode)
417 5413944 : pgaio_enter_batchmode();
418 :
419 10307064 : while (stream->ios_in_progress < stream->max_ios &&
420 10307064 : stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
421 : {
422 : BlockNumber blocknum;
423 : int16 buffer_index;
424 : void *per_buffer_data;
425 :
426 6159376 : if (stream->pending_read_nblocks == stream->io_combine_limit)
427 : {
428 7242 : read_stream_start_pending_read(stream);
429 7242 : continue;
430 : }
431 :
432 : /*
433 : * See which block the callback wants next in the stream. We need to
434 : * compute the index of the Nth block of the pending read including
435 : * wrap-around, but we don't want to use the expensive % operator.
436 : */
437 6152134 : buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
438 6152134 : if (buffer_index >= stream->queue_size)
439 3594 : buffer_index -= stream->queue_size;
440 : Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
441 6152134 : per_buffer_data = get_per_buffer_data(stream, buffer_index);
442 6152134 : blocknum = read_stream_get_block(stream, per_buffer_data);
443 6152134 : if (blocknum == InvalidBlockNumber)
444 : {
445 : /* End of stream. */
446 2192112 : stream->distance = 0;
447 2192112 : break;
448 : }
449 :
450 : /* Can we merge it with the pending read? */
451 3960022 : if (stream->pending_read_nblocks > 0 &&
452 427844 : stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
453 : {
454 427750 : stream->pending_read_nblocks++;
455 427750 : continue;
456 : }
457 :
458 : /* We have to start the pending read before we can build another. */
459 3532368 : while (stream->pending_read_nblocks > 0)
460 : {
461 104 : if (!read_stream_start_pending_read(stream) ||
462 96 : stream->ios_in_progress == stream->max_ios)
463 : {
464 : /* We've hit the buffer or I/O limit. Rewind and stop here. */
465 8 : read_stream_unget_block(stream, blocknum);
466 8 : if (stream->batch_mode)
467 8 : pgaio_exit_batchmode();
468 8 : return;
469 : }
470 : }
471 :
472 : /* This is the start of a new pending read. */
473 3532264 : stream->pending_read_blocknum = blocknum;
474 3532264 : stream->pending_read_nblocks = 1;
475 : }
476 :
477 : /*
478 : * We don't start the pending read just because we've hit the distance
479 : * limit, preferring to give it another chance to grow to full
480 : * io_combine_limit size once more buffers have been consumed. However,
481 : * if we've already reached io_combine_limit, or we've reached the
482 : * distance limit and there isn't anything pinned yet, or the callback has
483 : * signaled end-of-stream, we start the read immediately. Note that the
484 : * pending read can exceed the distance goal, if the latter was reduced
485 : * after hitting the per-backend buffer limit.
486 : */
487 6339800 : if (stream->pending_read_nblocks > 0 &&
488 3707778 : (stream->pending_read_nblocks == stream->io_combine_limit ||
489 3696258 : (stream->pending_read_nblocks >= stream->distance &&
490 3617764 : stream->pinned_buffers == 0) ||
491 88706 : stream->distance == 0) &&
492 3629284 : stream->ios_in_progress < stream->max_ios)
493 3629284 : read_stream_start_pending_read(stream);
494 :
495 : /*
496 : * There should always be something pinned when we leave this function,
497 : * whether started by this call or not, unless we've hit the end of the
498 : * stream. In the worst case we can always make progress one buffer at a
499 : * time.
500 : */
501 : Assert(stream->pinned_buffers > 0 || stream->distance == 0);
502 :
503 6339788 : if (stream->batch_mode)
504 5413924 : pgaio_exit_batchmode();
505 : }
506 :
507 : /*
508 : * Create a new read stream object that can be used to perform the equivalent
509 : * of a series of ReadBuffer() calls for one fork of one relation.
510 : * Internally, it generates larger vectored reads where possible by looking
511 : * ahead. The callback should return block numbers or InvalidBlockNumber to
512 : * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
513 : * write extra data for each block into the space provided to it. It will
514 : * also receive callback_private_data for its own purposes.
515 : */
516 : static ReadStream *
517 1013946 : read_stream_begin_impl(int flags,
518 : BufferAccessStrategy strategy,
519 : Relation rel,
520 : SMgrRelation smgr,
521 : char persistence,
522 : ForkNumber forknum,
523 : ReadStreamBlockNumberCB callback,
524 : void *callback_private_data,
525 : size_t per_buffer_data_size)
526 : {
527 : ReadStream *stream;
528 : size_t size;
529 : int16 queue_size;
530 : int16 queue_overflow;
531 : int max_ios;
532 : int strategy_pin_limit;
533 : uint32 max_pinned_buffers;
534 : uint32 max_possible_buffer_limit;
535 : Oid tablespace_id;
536 :
537 : /*
538 : * Decide how many I/Os we will allow to run at the same time. That
539 : * currently means advice to the kernel to tell it that we will soon read.
540 : * This number also affects how far we look ahead for opportunities to
541 : * start more I/Os.
542 : */
543 1013946 : tablespace_id = smgr->smgr_rlocator.locator.spcOid;
544 1013946 : if (!OidIsValid(MyDatabaseId) ||
545 1175208 : (rel && IsCatalogRelation(rel)) ||
546 297894 : IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
547 : {
548 : /*
549 : * Avoid circularity while trying to look up tablespace settings or
550 : * before spccache.c is ready.
551 : */
552 827586 : max_ios = effective_io_concurrency;
553 : }
554 186360 : else if (flags & READ_STREAM_MAINTENANCE)
555 21016 : max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
556 : else
557 165344 : max_ios = get_tablespace_io_concurrency(tablespace_id);
558 :
559 : /* Cap to INT16_MAX to avoid overflowing below */
560 1013946 : max_ios = Min(max_ios, PG_INT16_MAX);
561 :
562 : /*
563 : * If starting a multi-block I/O near the end of the queue, we might
564 : * temporarily need extra space for overflowing buffers before they are
565 : * moved to regular circular position. This is the maximum extra space we
566 : * could need.
567 : */
568 1013946 : queue_overflow = io_combine_limit - 1;
569 :
570 : /*
571 : * Choose the maximum number of buffers we're prepared to pin. We try to
572 : * pin fewer if we can, though. We add one so that we can make progress
573 : * even if max_ios is set to 0 (see also further down). For max_ios > 0,
574 : * this also allows an extra full I/O's worth of buffers: after an I/O
575 : * finishes we don't want to have to wait for its buffers to be consumed
576 : * before starting a new one.
577 : *
578 : * Be careful not to allow int16 to overflow. That is possible with the
579 : * current GUC range limits, so this is an artificial limit of ~32k
580 : * buffers and we'd need to adjust the types to exceed that. We also have
581 : * to allow for the spare entry and the overflow space.
582 : */
583 1013946 : max_pinned_buffers = (max_ios + 1) * io_combine_limit;
584 1013946 : max_pinned_buffers = Min(max_pinned_buffers,
585 : PG_INT16_MAX - queue_overflow - 1);
586 :
587 : /* Give the strategy a chance to limit the number of buffers we pin. */
588 1013946 : strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
589 1013946 : max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
590 :
591 : /*
592 : * Also limit our queue to the maximum number of pins we could ever be
593 : * allowed to acquire according to the buffer manager. We may not really
594 : * be able to use them all due to other pins held by this backend, but
595 : * we'll check that later in read_stream_start_pending_read().
596 : */
597 1013946 : if (SmgrIsTemp(smgr))
598 13454 : max_possible_buffer_limit = GetLocalPinLimit();
599 : else
600 1000492 : max_possible_buffer_limit = GetPinLimit();
601 1013946 : max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
602 :
603 : /*
604 : * The limit might be zero on a system configured with too few buffers for
605 : * the number of connections. We need at least one to make progress.
606 : */
607 1013946 : max_pinned_buffers = Max(1, max_pinned_buffers);
608 :
609 : /*
610 : * We need one extra entry for buffers and per-buffer data, because users
611 : * of per-buffer data have access to the object until the next call to
612 : * read_stream_next_buffer(), so we need a gap between the head and tail
613 : * of the queue so that we don't clobber it.
614 : */
615 1013946 : queue_size = max_pinned_buffers + 1;
616 :
617 : /*
618 : * Allocate the object, the buffers, the ios and per_buffer_data space in
619 : * one big chunk. Though we have queue_size buffers, we want to be able
620 : * to assume that all the buffers for a single read are contiguous (i.e.
621 : * don't wrap around halfway through), so we allow temporary overflows of
622 : * up to the maximum possible overflow size.
623 : */
624 1013946 : size = offsetof(ReadStream, buffers);
625 1013946 : size += sizeof(Buffer) * (queue_size + queue_overflow);
626 1013946 : size += sizeof(InProgressIO) * Max(1, max_ios);
627 1013946 : size += per_buffer_data_size * queue_size;
628 1013946 : size += MAXIMUM_ALIGNOF * 2;
629 1013946 : stream = (ReadStream *) palloc(size);
630 1013946 : memset(stream, 0, offsetof(ReadStream, buffers));
631 1013946 : stream->ios = (InProgressIO *)
632 1013946 : MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
633 1013946 : if (per_buffer_data_size > 0)
634 146044 : stream->per_buffer_data = (void *)
635 146044 : MAXALIGN(&stream->ios[Max(1, max_ios)]);
636 :
637 1013946 : stream->sync_mode = io_method == IOMETHOD_SYNC;
638 1013946 : stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
639 :
640 : #ifdef USE_PREFETCH
641 :
642 : /*
643 : * Read-ahead advice simulating asynchronous I/O with synchronous calls.
644 : * Issue advice only if AIO is not used, direct I/O isn't enabled, the
645 : * caller hasn't promised sequential access (overriding our detection
646 : * heuristics), and max_ios hasn't been set to zero.
647 : */
648 1013946 : if (stream->sync_mode &&
649 5796 : (io_direct_flags & IO_DIRECT_DATA) == 0 &&
650 5796 : (flags & READ_STREAM_SEQUENTIAL) == 0 &&
651 : max_ios > 0)
652 1386 : stream->advice_enabled = true;
653 : #endif
654 :
655 : /*
656 : * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
657 : * we still need to allocate space to combine and run one I/O. Bump it up
658 : * to one, and remember to ask for synchronous I/O only.
659 : */
660 1013946 : if (max_ios == 0)
661 : {
662 0 : max_ios = 1;
663 0 : stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
664 : }
665 :
666 : /*
667 : * Capture stable values for these two GUC-derived numbers for the
668 : * lifetime of this stream, so we don't have to worry about the GUCs
669 : * changing underneath us beyond this point.
670 : */
671 1013946 : stream->max_ios = max_ios;
672 1013946 : stream->io_combine_limit = io_combine_limit;
673 :
674 1013946 : stream->per_buffer_data_size = per_buffer_data_size;
675 1013946 : stream->max_pinned_buffers = max_pinned_buffers;
676 1013946 : stream->queue_size = queue_size;
677 1013946 : stream->callback = callback;
678 1013946 : stream->callback_private_data = callback_private_data;
679 1013946 : stream->buffered_blocknum = InvalidBlockNumber;
680 1013946 : stream->seq_blocknum = InvalidBlockNumber;
681 1013946 : stream->seq_until_processed = InvalidBlockNumber;
682 1013946 : stream->temporary = SmgrIsTemp(smgr);
683 :
684 : /*
685 : * Skip the initial ramp-up phase if the caller says we're going to be
686 : * reading the whole relation. This way we start out assuming we'll be
687 : * doing full io_combine_limit sized reads.
688 : */
689 1013946 : if (flags & READ_STREAM_FULL)
690 130798 : stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
691 : else
692 883148 : stream->distance = 1;
693 :
694 : /*
695 : * Since we always access the same relation, we can initialize parts of
696 : * the ReadBuffersOperation objects and leave them that way, to avoid
697 : * wasting CPU cycles writing to them for each read.
698 : */
699 17280126 : for (int i = 0; i < max_ios; ++i)
700 : {
701 16266180 : stream->ios[i].op.rel = rel;
702 16266180 : stream->ios[i].op.smgr = smgr;
703 16266180 : stream->ios[i].op.persistence = persistence;
704 16266180 : stream->ios[i].op.forknum = forknum;
705 16266180 : stream->ios[i].op.strategy = strategy;
706 : }
707 :
708 1013946 : return stream;
709 : }
710 :
711 : /*
712 : * Create a new read stream for reading a relation.
713 : * See read_stream_begin_impl() for the detailed explanation.
714 : */
715 : ReadStream *
716 894340 : read_stream_begin_relation(int flags,
717 : BufferAccessStrategy strategy,
718 : Relation rel,
719 : ForkNumber forknum,
720 : ReadStreamBlockNumberCB callback,
721 : void *callback_private_data,
722 : size_t per_buffer_data_size)
723 : {
724 894340 : return read_stream_begin_impl(flags,
725 : strategy,
726 : rel,
727 : RelationGetSmgr(rel),
728 894340 : rel->rd_rel->relpersistence,
729 : forknum,
730 : callback,
731 : callback_private_data,
732 : per_buffer_data_size);
733 : }
734 :
735 : /*
736 : * Create a new read stream for reading a SMgr relation.
737 : * See read_stream_begin_impl() for the detailed explanation.
738 : */
739 : ReadStream *
740 119606 : read_stream_begin_smgr_relation(int flags,
741 : BufferAccessStrategy strategy,
742 : SMgrRelation smgr,
743 : char smgr_persistence,
744 : ForkNumber forknum,
745 : ReadStreamBlockNumberCB callback,
746 : void *callback_private_data,
747 : size_t per_buffer_data_size)
748 : {
749 119606 : return read_stream_begin_impl(flags,
750 : strategy,
751 : NULL,
752 : smgr,
753 : smgr_persistence,
754 : forknum,
755 : callback,
756 : callback_private_data,
757 : per_buffer_data_size);
758 : }
759 :
760 : /*
761 : * Pull one pinned buffer out of a stream. Each call returns successive
762 : * blocks in the order specified by the callback. If per_buffer_data_size was
763 : * set to a non-zero size, *per_buffer_data receives a pointer to the extra
764 : * per-buffer data that the callback had a chance to populate, which remains
765 : * valid until the next call to read_stream_next_buffer(). When the stream
766 : * runs out of data, InvalidBuffer is returned. The caller may decide to end
767 : * the stream early at any time by calling read_stream_end().
768 : */
769 : Buffer
770 12418326 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
771 : {
772 : Buffer buffer;
773 : int16 oldest_buffer_index;
774 :
775 : #ifndef READ_STREAM_DISABLE_FAST_PATH
776 :
777 : /*
778 : * A fast path for all-cached scans. This is the same as the usual
779 : * algorithm, but it is specialized for no I/O and no per-buffer data, so
780 : * we can skip the queue management code, stay in the same buffer slot and
781 : * use singular StartReadBuffer().
782 : */
783 12418326 : if (likely(stream->fast_path))
784 : {
785 : BlockNumber next_blocknum;
786 :
787 : /* Fast path assumptions. */
788 : Assert(stream->ios_in_progress == 0);
789 : Assert(stream->forwarded_buffers == 0);
790 : Assert(stream->pinned_buffers == 1);
791 : Assert(stream->distance == 1);
792 : Assert(stream->pending_read_nblocks == 0);
793 : Assert(stream->per_buffer_data_size == 0);
794 : Assert(stream->initialized_buffers > stream->oldest_buffer_index);
795 :
796 : /* We're going to return the buffer we pinned last time. */
797 3967652 : oldest_buffer_index = stream->oldest_buffer_index;
798 : Assert((oldest_buffer_index + 1) % stream->queue_size ==
799 : stream->next_buffer_index);
800 3967652 : buffer = stream->buffers[oldest_buffer_index];
801 : Assert(buffer != InvalidBuffer);
802 :
803 : /* Choose the next block to pin. */
804 3967652 : next_blocknum = read_stream_get_block(stream, NULL);
805 :
806 3967652 : if (likely(next_blocknum != InvalidBlockNumber))
807 : {
808 3797098 : int flags = stream->read_buffers_flags;
809 :
810 3797098 : if (stream->advice_enabled)
811 1058 : flags |= READ_BUFFERS_ISSUE_ADVICE;
812 :
813 : /*
814 : * Pin a buffer for the next call. Same buffer entry, and
815 : * arbitrary I/O entry (they're all free). We don't have to
816 : * adjust pinned_buffers because we're transferring one to caller
817 : * but pinning one more.
818 : *
819 : * In the fast path we don't need to check the pin limit. We're
820 : * always allowed at least one pin so that progress can be made,
821 : * and that's all we need here. Although two pins are momentarily
822 : * held at the same time, the model used here is that the stream
823 : * holds only one, and the other now belongs to the caller.
824 : */
825 3797098 : if (likely(!StartReadBuffer(&stream->ios[0].op,
826 : &stream->buffers[oldest_buffer_index],
827 : next_blocknum,
828 : flags)))
829 : {
830 : /* Fast return. */
831 3772988 : return buffer;
832 : }
833 :
834 : /* Next call must wait for I/O for the newly pinned buffer. */
835 24110 : stream->oldest_io_index = 0;
836 24110 : stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
837 24110 : stream->ios_in_progress = 1;
838 24110 : stream->ios[0].buffer_index = oldest_buffer_index;
839 24110 : stream->seq_blocknum = next_blocknum + 1;
840 : }
841 : else
842 : {
843 : /* No more blocks, end of stream. */
844 170554 : stream->distance = 0;
845 170554 : stream->oldest_buffer_index = stream->next_buffer_index;
846 170554 : stream->pinned_buffers = 0;
847 170554 : stream->buffers[oldest_buffer_index] = InvalidBuffer;
848 : }
849 :
850 194664 : stream->fast_path = false;
851 194664 : return buffer;
852 : }
853 : #endif
854 :
855 8450674 : if (unlikely(stream->pinned_buffers == 0))
856 : {
857 : Assert(stream->oldest_buffer_index == stream->next_buffer_index);
858 :
859 : /* End of stream reached? */
860 5874150 : if (stream->distance == 0)
861 3321952 : return InvalidBuffer;
862 :
863 : /*
864 : * The usual order of operations is that we look ahead at the bottom
865 : * of this function after potentially finishing an I/O and making
866 : * space for more, but if we're just starting up we'll need to crank
867 : * the handle to get started.
868 : */
869 2552198 : read_stream_look_ahead(stream);
870 :
871 : /* End of stream reached? */
872 2552198 : if (stream->pinned_buffers == 0)
873 : {
874 : Assert(stream->distance == 0);
875 1341070 : return InvalidBuffer;
876 : }
877 : }
878 :
879 : /* Grab the oldest pinned buffer and associated per-buffer data. */
880 : Assert(stream->pinned_buffers > 0);
881 3787652 : oldest_buffer_index = stream->oldest_buffer_index;
882 : Assert(oldest_buffer_index >= 0 &&
883 : oldest_buffer_index < stream->queue_size);
884 3787652 : buffer = stream->buffers[oldest_buffer_index];
885 3787652 : if (per_buffer_data)
886 1227534 : *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
887 :
888 : Assert(BufferIsValid(buffer));
889 :
890 : /* Do we have to wait for an associated I/O first? */
891 3787652 : if (stream->ios_in_progress > 0 &&
892 1328764 : stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
893 : {
894 1160192 : int16 io_index = stream->oldest_io_index;
895 : int32 distance; /* wider temporary value, clamped below */
896 :
897 : /* Sanity check that we still agree on the buffers. */
898 : Assert(stream->ios[io_index].op.buffers ==
899 : &stream->buffers[oldest_buffer_index]);
900 :
901 1160192 : WaitReadBuffers(&stream->ios[io_index].op);
902 :
903 : Assert(stream->ios_in_progress > 0);
904 1160150 : stream->ios_in_progress--;
905 1160150 : if (++stream->oldest_io_index == stream->max_ios)
906 48104 : stream->oldest_io_index = 0;
907 :
908 : /* Look-ahead distance ramps up rapidly after we do I/O. */
909 1160150 : distance = stream->distance * 2;
910 1160150 : distance = Min(distance, stream->max_pinned_buffers);
911 1160150 : stream->distance = distance;
912 :
913 : /*
914 : * If we've reached the first block of a sequential region we're
915 : * issuing advice for, cancel that until the next jump. The kernel
916 : * will see the sequential preadv() pattern starting here.
917 : */
918 1160150 : if (stream->advice_enabled &&
919 544 : stream->ios[io_index].op.blocknum == stream->seq_until_processed)
920 502 : stream->seq_until_processed = InvalidBlockNumber;
921 : }
922 :
923 : /*
924 : * We must zap this queue entry, or else it would appear as a forwarded
925 : * buffer. If it's potentially in the overflow zone (ie from a
926 : * multi-block I/O that wrapped around the queue), also zap the copy.
927 : */
928 3787610 : stream->buffers[oldest_buffer_index] = InvalidBuffer;
929 3787610 : if (oldest_buffer_index < stream->io_combine_limit - 1)
930 2673400 : stream->buffers[stream->queue_size + oldest_buffer_index] =
931 : InvalidBuffer;
932 :
933 : #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
934 :
935 : /*
936 : * The caller will get access to the per-buffer data, until the next call.
937 : * We wipe the one before, which is never occupied because queue_size
938 : * allowed one extra element. This will hopefully trip up client code
939 : * that is holding a dangling pointer to it.
940 : */
941 : if (stream->per_buffer_data)
942 : {
943 : void *per_buffer_data;
944 :
945 : per_buffer_data = get_per_buffer_data(stream,
946 : oldest_buffer_index == 0 ?
947 : stream->queue_size - 1 :
948 : oldest_buffer_index - 1);
949 :
950 : #if defined(CLOBBER_FREED_MEMORY)
951 : /* This also tells Valgrind the memory is "noaccess". */
952 : wipe_mem(per_buffer_data, stream->per_buffer_data_size);
953 : #elif defined(USE_VALGRIND)
954 : /* Tell it ourselves. */
955 : VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
956 : stream->per_buffer_data_size);
957 : #endif
958 : }
959 : #endif
960 :
961 : /* Pin transferred to caller. */
962 : Assert(stream->pinned_buffers > 0);
963 3787610 : stream->pinned_buffers--;
964 :
965 : /* Advance oldest buffer, with wrap-around. */
966 3787610 : stream->oldest_buffer_index++;
967 3787610 : if (stream->oldest_buffer_index == stream->queue_size)
968 566456 : stream->oldest_buffer_index = 0;
969 :
970 : /* Prepare for the next call. */
971 3787610 : read_stream_look_ahead(stream);
972 :
973 : #ifndef READ_STREAM_DISABLE_FAST_PATH
974 : /* See if we can take the fast path for all-cached scans next time. */
975 3787598 : if (stream->ios_in_progress == 0 &&
976 2620704 : stream->forwarded_buffers == 0 &&
977 2619694 : stream->pinned_buffers == 1 &&
978 1468862 : stream->distance == 1 &&
979 1332206 : stream->pending_read_nblocks == 0 &&
980 1329450 : stream->per_buffer_data_size == 0)
981 : {
982 373904 : stream->fast_path = true;
983 : }
984 : #endif
985 :
986 3787598 : return buffer;
987 : }
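
/*
 * For illustration only (not part of this file): a hypothetical callback and
 * consumer pair using per-buffer data.  The callback writes into the space
 * the stream provides; the consumer later receives a pointer to the same
 * space, which remains valid until the next read_stream_next_buffer() call.
 */
#ifdef NOT_USED
static BlockNumber
example_cb_with_data(ReadStream *stream,
					 void *callback_private_data,
					 void *per_buffer_data)
{
	BlockRangeReadStreamPrivate *p = callback_private_data;

	if (p->current_blocknum >= p->last_exclusive)
		return InvalidBlockNumber;

	/* Stash the block number so the consumer can cross-check it. */
	*(BlockNumber *) per_buffer_data = p->current_blocknum;
	return p->current_blocknum++;
}

static void
example_consume_with_data(Relation rel)
{
	BlockRangeReadStreamPrivate p = {0};
	ReadStream *stream;
	Buffer		buffer;
	void	   *data;

	p.last_exclusive = RelationGetNumberOfBlocks(rel);
	stream = read_stream_begin_relation(0, NULL, rel, MAIN_FORKNUM,
										example_cb_with_data, &p,
										sizeof(BlockNumber));
	while ((buffer = read_stream_next_buffer(stream, &data)) != InvalidBuffer)
	{
		Assert(*(BlockNumber *) data == BufferGetBlockNumber(buffer));
		ReleaseBuffer(buffer);
	}
	read_stream_end(stream);
}
#endif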
988 :
989 : /*
990 : * Transitional support for code that would like to perform or skip reads
991 : * itself, without using the stream. Returns, and consumes, the next block
992 : * number that would be read by the stream's look-ahead algorithm, or
993 : * InvalidBlockNumber if the end of the stream is reached. Also reports the
994 : * strategy that would be used to read it.
995 : */
996 : BlockNumber
997 0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
998 : {
999 0 : *strategy = stream->ios[0].op.strategy;
1000 0 : return read_stream_get_block(stream, NULL);
1001 : }
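
/*
 * For illustration only (not part of this file): a hypothetical caller that
 * consumes block numbers without letting the stream pin buffers, so it can
 * perform or skip the reads itself.
 */
#ifdef NOT_USED
static void
example_consume_block_numbers(ReadStream *stream)
{
	BufferAccessStrategy strategy;
	BlockNumber blocknum;

	while ((blocknum = read_stream_next_block(stream, &strategy)) !=
		   InvalidBlockNumber)
	{
		/* ... read or skip blocknum, using strategy if reading ... */
	}
}
#endif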
1002 :
1003 : /*
1004 : * Reset a read stream by releasing any queued up buffers, allowing the stream
1005 : * to be used again for different blocks. This can be used to clear an
1006 : * end-of-stream condition and start again, or to throw away blocks that were
1007 : * speculatively read and read some different blocks instead.
1008 : */
1009 : void
1010 2553584 : read_stream_reset(ReadStream *stream)
1011 : {
1012 : int16 index;
1013 : Buffer buffer;
1014 :
1015 : /* Stop looking ahead. */
1016 2553584 : stream->distance = 0;
1017 :
1018 : /* Forget buffered block number and fast path state. */
1019 2553584 : stream->buffered_blocknum = InvalidBlockNumber;
1020 2553584 : stream->fast_path = false;
1021 :
1022 : /* Unpin anything that wasn't consumed. */
1023 2785266 : while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
1024 231682 : ReleaseBuffer(buffer);
1025 :
1026 : /* Unpin any unused forwarded buffers. */
1027 2553584 : index = stream->next_buffer_index;
1028 2553584 : while (index < stream->initialized_buffers &&
1029 456268 : (buffer = stream->buffers[index]) != InvalidBuffer)
1030 : {
1031 : Assert(stream->forwarded_buffers > 0);
1032 0 : stream->forwarded_buffers--;
1033 0 : ReleaseBuffer(buffer);
1034 :
1035 0 : stream->buffers[index] = InvalidBuffer;
1036 0 : if (index < stream->io_combine_limit - 1)
1037 0 : stream->buffers[stream->queue_size + index] = InvalidBuffer;
1038 :
1039 0 : if (++index == stream->queue_size)
1040 0 : index = 0;
1041 : }
1042 :
1043 : Assert(stream->forwarded_buffers == 0);
1044 : Assert(stream->pinned_buffers == 0);
1045 : Assert(stream->ios_in_progress == 0);
1046 :
1047 : /* Start off assuming data is cached. */
1048 2553584 : stream->distance = 1;
1049 2553584 : }
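
/*
 * For illustration only (not part of this file): a hypothetical rescan.
 * read_stream_reset() only resets the stream's own state; the callback's
 * position must be rewound separately by the caller.
 */
#ifdef NOT_USED
static void
example_rescan(ReadStream *stream, BlockRangeReadStreamPrivate *p)
{
	Buffer		buffer;

	p->current_blocknum = 0;	/* rewind the callback's cursor */
	read_stream_reset(stream);	/* unpin queued buffers, clear end-of-stream */

	/* The stream can now be consumed again from the rewound position. */
	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
		ReleaseBuffer(buffer);
}
#endif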
1050 :
1051 : /*
1052 : * Release and free a read stream.
1053 : */
1054 : void
1055 1009416 : read_stream_end(ReadStream *stream)
1056 : {
1057 1009416 : read_stream_reset(stream);
1058 1009416 : pfree(stream);
1059 1009416 : }
|