Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * read_stream.c
4 : * Mechanism for accessing buffered relation data with look-ahead
5 : *
6 : * Code that needs to access relation data typically pins blocks one at a
7 : * time, often in a predictable order that might be sequential or data-driven.
8 : * Calling the simple ReadBuffer() function for each block is inefficient,
9 : * because blocks that are not yet in the buffer pool require I/O operations
10 : * that are small and might stall waiting for storage. This mechanism looks
11 : * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
12 : * neighboring blocks together and ahead of time, with an adaptive look-ahead
13 : * distance.
14 : *
15 : * A user-provided callback generates a stream of block numbers that is used
16 : * to form reads of up to io_combine_limit, by attempting to merge them with a
17 : * pending read. When that isn't possible, the existing pending read is sent
18 : * to StartReadBuffers() so that a new one can begin to form.
19 : *
20 : * The algorithm for controlling the look-ahead distance tries to classify the
21 : * stream into three ideal behaviors:
22 : *
23 : * A) No I/O is necessary, because the requested blocks are fully cached
24 : * already. There is no benefit to looking ahead more than one block, so
25 : * distance is 1. This is the default initial assumption.
26 : *
27 : * B) I/O is necessary, but read-ahead advice is undesirable because the
28 : * access is sequential and we can rely on the kernel's read-ahead heuristics,
29 : * or impossible because direct I/O is enabled or the system doesn't support
30 : * read-ahead advice. There is no benefit in looking ahead more than
31 : * io_combine_limit, because in this case the only goal is larger read system
32 : * calls. Looking further ahead would pin many buffers and perform
33 : * speculative work for no benefit.
34 : *
35 : * C) I/O is necessary, it appears to be random, and this system supports
36 : * read-ahead advice. We'll look further ahead in order to reach the
37 : * configured level of I/O concurrency.
38 : *
39 : * The distance increases rapidly and decays slowly, so that it moves towards
40 : * those levels as different I/O patterns are discovered. For example, a
41 : * sequential scan of fully cached data doesn't bother looking ahead, but a
42 : * sequential scan that hits a region of uncached blocks will start issuing
43 : * increasingly wide read calls until it plateaus at io_combine_limit.
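 *
 * As a concrete illustration (with a hypothetical io_combine_limit of 16),
 * the distance in such a scan might evolve as:
 *
 *     1, 2, 4, 8, 16, 16, ...    doubling per completed read (behavior B)
 *     16, 15, 14, ..., 1         decaying by one per cached read (behavior A)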
44 : *
45 : * The main data structure is a circular queue of buffers of size
46 : * max_pinned_buffers plus some extra space for technical reasons, ready to be
47 : * returned by read_stream_next_buffer(). Each buffer also has an optional
48 : * variable sized object that is passed from the callback to the consumer of
49 : * buffers.
50 : *
51 : * Parallel to the queue of buffers, there is a circular queue of in-progress
52 : * I/Os that have been started with StartReadBuffers(), and for which
53 : * WaitReadBuffers() must be called before returning the buffer.
54 : *
55 : * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
56 : * successive calls, then these data structures might appear as follows:
57 : *
58 : * buffers buf/data ios
59 : *
60 : * +----+ +-----+ +--------+
61 : * | | | | +----+ 42..44 | <- oldest_io_index
62 : * +----+ +-----+ | +--------+
63 : * oldest_buffer_index -> | 10 | | ? | | +--+ 60..60 |
64 : * +----+ +-----+ | | +--------+
65 : * | 42 | | ? |<-+ | | | <- next_io_index
66 : * +----+ +-----+ | +--------+
67 : * | 43 | | ? | | | |
68 : * +----+ +-----+ | +--------+
69 : * | 44 | | ? | | | |
70 : * +----+ +-----+ | +--------+
71 : * | 60 | | ? |<---+
72 : * +----+ +-----+
73 : * next_buffer_index -> | | | |
74 : * +----+ +-----+
75 : *
76 : * In the example, 5 buffers are pinned, and the next buffer to be streamed to
77 : * the client is block 10. Block 10 was a hit and has no associated I/O, but
78 : * the range 42..44 requires an I/O wait before its buffers are returned, as
79 : * does block 60.
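 *
 * Typical use, as a sketch based on the general-purpose callback defined in
 * this file ("rel" and "nblocks" here stand in for caller state):
 *
 *     BlockRangeReadStreamPrivate p;
 *     ReadStream *stream;
 *     Buffer      buf;
 *
 *     p.current_blocknum = 0;
 *     p.last_exclusive = nblocks;
 *     stream = read_stream_begin_relation(READ_STREAM_FULL, NULL, rel,
 *                                         MAIN_FORKNUM,
 *                                         block_range_read_stream_cb,
 *                                         &p, 0);
 *     while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 *     {
 *         ... use the pinned buffer, then ...
 *         ReleaseBuffer(buf);
 *     }
 *     read_stream_end(stream);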
80 : *
81 : *
82 : * Portions Copyright (c) 2024-2025, PostgreSQL Global Development Group
83 : * Portions Copyright (c) 1994, Regents of the University of California
84 : *
85 : * IDENTIFICATION
86 : * src/backend/storage/aio/read_stream.c
87 : *
88 : *-------------------------------------------------------------------------
89 : */
90 : #include "postgres.h"
91 :
92 : #include "miscadmin.h"
93 : #include "storage/fd.h"
94 : #include "storage/smgr.h"
95 : #include "storage/read_stream.h"
96 : #include "utils/memdebug.h"
97 : #include "utils/rel.h"
98 : #include "utils/spccache.h"
99 :
100 : typedef struct InProgressIO
101 : {
102 : int16 buffer_index;
103 : ReadBuffersOperation op;
104 : } InProgressIO;
105 :
106 : /*
107 : * State for managing a stream of reads.
108 : */
109 : struct ReadStream
110 : {
111 : int16 max_ios;
112 : int16 ios_in_progress;
113 : int16 queue_size;
114 : int16 max_pinned_buffers;
115 : int16 pinned_buffers;
116 : int16 distance;
117 : bool advice_enabled;
118 :
119 : /*
120 : * One-block buffer to support 'ungetting' a block number, to resolve flow
121 : * control problems when I/Os are split.
122 : */
123 : BlockNumber buffered_blocknum;
124 :
125 : /*
126 : * The callback that will tell us which block numbers to read, and an
127 : * opaque pointer that will be passed to it for its own purposes.
128 : */
129 : ReadStreamBlockNumberCB callback;
130 : void *callback_private_data;
131 :
132 : /* Next expected block, for detecting sequential access. */
133 : BlockNumber seq_blocknum;
134 :
135 : /* The read operation we are currently preparing. */
136 : BlockNumber pending_read_blocknum;
137 : int16 pending_read_nblocks;
138 :
139 : /* Space for buffers and optional per-buffer private data. */
140 : size_t per_buffer_data_size;
141 : void *per_buffer_data;
142 :
143 : /* Read operations that have been started but not waited for yet. */
144 : InProgressIO *ios;
145 : int16 oldest_io_index;
146 : int16 next_io_index;
147 :
148 : bool fast_path;
149 :
150 : /* Circular queue of buffers. */
151 : int16 oldest_buffer_index; /* Next pinned buffer to return */
152 : int16 next_buffer_index; /* Index of next buffer to pin */
153 : Buffer buffers[FLEXIBLE_ARRAY_MEMBER];
154 : };
155 :
156 : /*
157 : * Return a pointer to the per-buffer data by index.
158 : */
159 : static inline void *
160 3868106 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
161 : {
162 7736212 : return (char *) stream->per_buffer_data +
163 3868106 : stream->per_buffer_data_size * buffer_index;
164 : }
165 :
166 : /*
167 : * General-use ReadStreamBlockNumberCB for block range scans. Loops over the
168 : * blocks [current_blocknum, last_exclusive).
169 : */
170 : BlockNumber
171 538476 : block_range_read_stream_cb(ReadStream *stream,
172 : void *callback_private_data,
173 : void *per_buffer_data)
174 : {
175 538476 : BlockRangeReadStreamPrivate *p = callback_private_data;
176 :
177 538476 : if (p->current_blocknum < p->last_exclusive)
178 426522 : return p->current_blocknum++;
179 :
180 111954 : return InvalidBlockNumber;
181 : }
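
/*
 * For illustration, a callback need not walk a dense range.  A hypothetical
 * callback draining a caller-supplied array of block numbers might look like
 * this (a sketch only; "ArrayScanPrivate" and its fields are assumed, not
 * part of this file):
 *
 *     static BlockNumber
 *     array_read_stream_cb(ReadStream *stream,
 *                          void *callback_private_data,
 *                          void *per_buffer_data)
 *     {
 *         ArrayScanPrivate *p = callback_private_data;
 *
 *         if (p->next < p->nblocks)
 *             return p->blocknums[p->next++];
 *         return InvalidBlockNumber;
 *     }
 *
 * Consecutive returned block numbers are merged into wider reads, so sorting
 * the array first improves I/O combining.
 */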
182 :
183 : /*
184 : * Ask the callback which block it would like us to read next, with a one-block
185 : * buffer in front to allow read_stream_unget_block() to work.
186 : */
187 : static inline BlockNumber
188 7393644 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
189 : {
190 : BlockNumber blocknum;
191 :
192 7393644 : blocknum = stream->buffered_blocknum;
193 7393644 : if (blocknum != InvalidBlockNumber)
194 0 : stream->buffered_blocknum = InvalidBlockNumber;
195 : else
196 7393644 : blocknum = stream->callback(stream,
197 : stream->callback_private_data,
198 : per_buffer_data);
199 :
200 7393644 : return blocknum;
201 : }
202 :
203 : /*
204 : * In order to deal with short reads in StartReadBuffers(), we sometimes need
205 : * to defer handling of a block until later.
206 : */
207 : static inline void
208 0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
209 : {
210 : /* We shouldn't ever unget more than one block. */
211 : Assert(stream->buffered_blocknum == InvalidBlockNumber);
212 : Assert(blocknum != InvalidBlockNumber);
213 0 : stream->buffered_blocknum = blocknum;
214 0 : }
215 :
216 : static void
217 2167118 : read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
218 : {
219 : bool need_wait;
220 : int nblocks;
221 : int flags;
222 : int16 io_index;
223 : int16 overflow;
224 : int16 buffer_index;
225 :
226 : /* This should only be called with a pending read. */
227 : Assert(stream->pending_read_nblocks > 0);
228 : Assert(stream->pending_read_nblocks <= io_combine_limit);
229 :
230 : /* We had better not exceed the pin limit by starting this read. */
231 : Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
232 : stream->max_pinned_buffers);
233 :
234 : /* We had better not be overwriting an existing pinned buffer. */
235 2167118 : if (stream->pinned_buffers > 0)
236 : Assert(stream->next_buffer_index != stream->oldest_buffer_index);
237 : else
238 : Assert(stream->next_buffer_index == stream->oldest_buffer_index);
239 :
240 : /*
241 : * If advice hasn't been suppressed, this system supports it, and this
242 : * isn't a strictly sequential pattern, then we'll issue advice.
243 : */
244 2167118 : if (!suppress_advice &&
245 1170488 : stream->advice_enabled &&
246 142464 : stream->pending_read_blocknum != stream->seq_blocknum)
247 65240 : flags = READ_BUFFERS_ISSUE_ADVICE;
248 : else
249 2101878 : flags = 0;
250 :
251 : /* We say how many blocks we want to read, but nblocks may be smaller on return. */
252 2167118 : buffer_index = stream->next_buffer_index;
253 2167118 : io_index = stream->next_io_index;
254 2167118 : nblocks = stream->pending_read_nblocks;
255 2167118 : need_wait = StartReadBuffers(&stream->ios[io_index].op,
256 2167118 : &stream->buffers[buffer_index],
257 : stream->pending_read_blocknum,
258 : &nblocks,
259 : flags);
260 2167118 : stream->pinned_buffers += nblocks;
261 :
262 : /* Remember whether we need to wait before returning this buffer. */
263 2167118 : if (!need_wait)
264 : {
265 : /* Look-ahead distance decays, no I/O necessary (behavior A). */
266 1271208 : if (stream->distance > 1)
267 16538 : stream->distance--;
268 : }
269 : else
270 : {
271 : /*
272 : * Remember to call WaitReadBuffers() before returning the head buffer.
273 : * Look-ahead distance will be adjusted after waiting.
274 : */
275 895910 : stream->ios[io_index].buffer_index = buffer_index;
276 895910 : if (++stream->next_io_index == stream->max_ios)
277 875956 : stream->next_io_index = 0;
278 : Assert(stream->ios_in_progress < stream->max_ios);
279 895910 : stream->ios_in_progress++;
280 895910 : stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
281 : }
282 :
283 : /*
284 : * We gave a contiguous range of buffer space to StartReadBuffers(), but
285 : * we want it to wrap around at queue_size. Slide overflowing buffers to
286 : * the front of the array.
287 : */
288 2167118 : overflow = (buffer_index + nblocks) - stream->queue_size;
289 2167118 : if (overflow > 0)
290 7982 : memmove(&stream->buffers[0],
291 7982 : &stream->buffers[stream->queue_size],
292 : sizeof(stream->buffers[0]) * overflow);
293 :
294 : /* Compute location of start of next read, without using % operator. */
295 2167118 : buffer_index += nblocks;
296 2167118 : if (buffer_index >= stream->queue_size)
297 470248 : buffer_index -= stream->queue_size;
298 : Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
299 2167118 : stream->next_buffer_index = buffer_index;
300 :
301 : /* Adjust the pending read to cover the remaining portion, if any. */
302 2167118 : stream->pending_read_blocknum += nblocks;
303 2167118 : stream->pending_read_nblocks -= nblocks;
304 2167118 : }
305 :
306 : static void
307 4036228 : read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
308 : {
309 6448098 : while (stream->ios_in_progress < stream->max_ios &&
310 6423556 : stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
311 : {
312 : BlockNumber blocknum;
313 : int16 buffer_index;
314 : void *per_buffer_data;
315 :
316 3868106 : if (stream->pending_read_nblocks == io_combine_limit)
317 : {
318 0 : read_stream_start_pending_read(stream, suppress_advice);
319 0 : suppress_advice = false;
320 0 : continue;
321 : }
322 :
323 : /*
324 : * See which block the callback wants next in the stream. We need to
325 : * compute the index of the Nth block of the pending read including
326 : * wrap-around, but we don't want to use the expensive % operator.
327 : */
328 3868106 : buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
329 3868106 : if (buffer_index >= stream->queue_size)
330 93650 : buffer_index -= stream->queue_size;
331 : Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
332 3868106 : per_buffer_data = get_per_buffer_data(stream, buffer_index);
333 3868106 : blocknum = read_stream_get_block(stream, per_buffer_data);
334 3868106 : if (blocknum == InvalidBlockNumber)
335 : {
336 : /* End of stream. */
337 1456236 : stream->distance = 0;
338 1456236 : break;
339 : }
340 :
341 : /* Can we merge it with the pending read? */
342 2411870 : if (stream->pending_read_nblocks > 0 &&
343 311540 : stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
344 : {
345 311538 : stream->pending_read_nblocks++;
346 311538 : continue;
347 : }
348 :
349 : /* We have to start the pending read before we can build another. */
350 2100334 : while (stream->pending_read_nblocks > 0)
351 : {
352 2 : read_stream_start_pending_read(stream, suppress_advice);
353 2 : suppress_advice = false;
354 2 : if (stream->ios_in_progress == stream->max_ios)
355 : {
356 : /* And we've hit the limit. Rewind, and stop here. */
357 0 : read_stream_unget_block(stream, blocknum);
358 0 : return;
359 : }
360 : }
361 :
362 : /* This is the start of a new pending read. */
363 2100332 : stream->pending_read_blocknum = blocknum;
364 2100332 : stream->pending_read_nblocks = 1;
365 : }
366 :
367 : /*
368 : * We don't start the pending read just because we've hit the distance
369 : * limit, preferring to give it another chance to grow to full
370 : * io_combine_limit size once more buffers have been consumed. However,
371 : * if we've already reached io_combine_limit, or we've reached the
372 : * distance limit and there isn't anything pinned yet, or the callback has
373 : * signaled end-of-stream, we start the read immediately.
374 : */
375 4036228 : if (stream->pending_read_nblocks > 0 &&
376 2277234 : (stream->pending_read_nblocks == io_combine_limit ||
377 2268014 : (stream->pending_read_nblocks == stream->distance &&
378 2014424 : stream->pinned_buffers == 0) ||
379 253590 : stream->distance == 0) &&
380 2167258 : stream->ios_in_progress < stream->max_ios)
381 2167116 : read_stream_start_pending_read(stream, suppress_advice);
382 : }
383 :
384 : /*
385 : * Create a new read stream object that can be used to perform the equivalent
386 : * of a series of ReadBuffer() calls for one fork of one relation.
387 : * Internally, it generates larger vectored reads where possible by looking
388 : * ahead. The callback should return block numbers or InvalidBlockNumber to
389 : * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
390 : * write extra data for each block into the space provided to it. It will
391 : * also receive callback_private_data for its own purposes.
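 *
 * As a sketch of that contract (names here are assumed): a stream created
 * with per_buffer_data_size = sizeof(ItemPointerData) lets the callback
 * stash a TID alongside each block it returns,
 *
 *     ItemPointerCopy(&p->tids[p->next], (ItemPointer) per_buffer_data);
 *     return ItemPointerGetBlockNumber(&p->tids[p->next++]);
 *
 * and the consumer later receives that TID through the per_buffer_data
 * argument of read_stream_next_buffer(), paired with the block's buffer.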
392 : */
393 : static ReadStream *
394 775028 : read_stream_begin_impl(int flags,
395 : BufferAccessStrategy strategy,
396 : Relation rel,
397 : SMgrRelation smgr,
398 : char persistence,
399 : ForkNumber forknum,
400 : ReadStreamBlockNumberCB callback,
401 : void *callback_private_data,
402 : size_t per_buffer_data_size)
403 : {
404 : ReadStream *stream;
405 : size_t size;
406 : int16 queue_size;
407 : int max_ios;
408 : int strategy_pin_limit;
409 : uint32 max_pinned_buffers;
410 : Oid tablespace_id;
411 :
412 : /*
413 : * Decide how many I/Os we will allow to run at the same time. That
414 : * currently means issuing advice to the kernel that we will soon read.
415 : * This number also affects how far we look ahead for opportunities to
416 : * start more I/Os.
417 : */
418 775028 : tablespace_id = smgr->smgr_rlocator.locator.spcOid;
419 775028 : if (!OidIsValid(MyDatabaseId) ||
420 899362 : (rel && IsCatalogRelation(rel)) ||
421 247624 : IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
422 : {
423 : /*
424 : * Avoid circularity while trying to look up tablespace settings or
425 : * before spccache.c is ready.
426 : */
427 627898 : max_ios = effective_io_concurrency;
428 : }
429 147130 : else if (flags & READ_STREAM_MAINTENANCE)
430 5760 : max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
431 : else
432 141370 : max_ios = get_tablespace_io_concurrency(tablespace_id);
433 :
434 : /* Cap to INT16_MAX to avoid overflowing below */
435 775028 : max_ios = Min(max_ios, PG_INT16_MAX);
436 :
437 : /*
438 : * Choose the maximum number of buffers we're prepared to pin. We try to
439 : * pin fewer if we can, though. We clamp it to at least io_combine_limit
440 : * so that we can have a chance to build up a full io_combine_limit sized
441 : * read, even when max_ios is zero. Be careful not to allow int16 to
442 : * overflow (even though that's not possible with the current GUC range
443 : * limits), allowing also for the spare entry and the overflow space.
444 : */
445 775028 : max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
446 775028 : max_pinned_buffers = Min(max_pinned_buffers,
447 : PG_INT16_MAX - io_combine_limit - 1);
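	/*
	 * For example, with hypothetical settings max_ios = 10 and
	 * io_combine_limit = 16, this yields Max(40, 16) = 40 pinned buffers,
	 * comfortably below the PG_INT16_MAX clamp.
	 */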
448 :
449 : /* Give the strategy a chance to limit the number of buffers we pin. */
450 775028 : strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
451 775028 : max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
452 :
453 : /* Don't allow this backend to pin more than its share of buffers. */
454 775028 : if (SmgrIsTemp(smgr))
455 11038 : LimitAdditionalLocalPins(&max_pinned_buffers);
456 : else
457 763990 : LimitAdditionalPins(&max_pinned_buffers);
458 : Assert(max_pinned_buffers > 0);
459 :
460 : /*
461 : * We need one extra entry for buffers and per-buffer data: users of
462 : * per-buffer data have access to the object until the next call to
463 : * read_stream_next_buffer(), so we need a gap between the head and tail
464 : * of the queue to avoid clobbering it.
465 : */
466 775028 : queue_size = max_pinned_buffers + 1;
467 :
468 : /*
469 : * Allocate the object, the buffers, the ios and per_buffer_data space in
470 : * one big chunk. Though we have queue_size buffers, we want to be able
471 : * to assume that all the buffers for a single read are contiguous (i.e.
472 : * don't wrap around halfway through), so we allow temporary overflows of
473 : * up to the maximum possible read size by allocating an extra
474 : * io_combine_limit - 1 elements.
475 : */
476 775028 : size = offsetof(ReadStream, buffers);
477 775028 : size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
478 775028 : size += sizeof(InProgressIO) * Max(1, max_ios);
479 775028 : size += per_buffer_data_size * queue_size;
480 775028 : size += MAXIMUM_ALIGNOF * 2;
481 775028 : stream = (ReadStream *) palloc(size);
482 775028 : memset(stream, 0, offsetof(ReadStream, buffers));
483 775028 : stream->ios = (InProgressIO *)
484 775028 : MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
485 775028 : if (per_buffer_data_size > 0)
486 0 : stream->per_buffer_data = (void *)
487 0 : MAXALIGN(&stream->ios[Max(1, max_ios)]);
488 :
489 : #ifdef USE_PREFETCH
490 :
491 : /*
492 : * This system supports prefetching advice. We can use it as long as
493 : * direct I/O isn't enabled, the caller hasn't promised sequential access
494 : * (overriding our detection heuristics), and max_ios hasn't been set to
495 : * zero.
496 : */
497 775028 : if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
498 774822 : (flags & READ_STREAM_SEQUENTIAL) == 0 &&
499 : max_ios > 0)
500 127868 : stream->advice_enabled = true;
501 : #endif
502 :
503 : /*
504 : * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
505 : * above. If we had real asynchronous I/O we might need a slightly
506 : * different definition.
507 : */
508 775028 : if (max_ios == 0)
509 0 : max_ios = 1;
510 :
511 775028 : stream->max_ios = max_ios;
512 775028 : stream->per_buffer_data_size = per_buffer_data_size;
513 775028 : stream->max_pinned_buffers = max_pinned_buffers;
514 775028 : stream->queue_size = queue_size;
515 775028 : stream->callback = callback;
516 775028 : stream->callback_private_data = callback_private_data;
517 775028 : stream->buffered_blocknum = InvalidBlockNumber;
518 :
519 : /*
520 : * Skip the initial ramp-up phase if the caller says we're going to be
521 : * reading the whole relation. This way we start out assuming we'll be
522 : * doing full io_combine_limit-sized reads (behavior B).
523 : */
524 775028 : if (flags & READ_STREAM_FULL)
525 113316 : stream->distance = Min(max_pinned_buffers, io_combine_limit);
526 : else
527 661712 : stream->distance = 1;
528 :
529 : /*
530 : * Since we always access the same relation, we can initialize parts of
531 : * the ReadBuffersOperation objects and leave them that way, to avoid
532 : * wasting CPU cycles writing to them for each read.
533 : */
534 1650700 : for (int i = 0; i < max_ios; ++i)
535 : {
536 875672 : stream->ios[i].op.rel = rel;
537 875672 : stream->ios[i].op.smgr = smgr;
538 875672 : stream->ios[i].op.persistence = persistence;
539 875672 : stream->ios[i].op.forknum = forknum;
540 875672 : stream->ios[i].op.strategy = strategy;
541 : }
542 :
543 775028 : return stream;
544 : }
545 :
546 : /*
547 : * Create a new read stream for reading a relation.
548 : * See read_stream_begin_impl() for the detailed explanation.
549 : */
550 : ReadStream *
551 667242 : read_stream_begin_relation(int flags,
552 : BufferAccessStrategy strategy,
553 : Relation rel,
554 : ForkNumber forknum,
555 : ReadStreamBlockNumberCB callback,
556 : void *callback_private_data,
557 : size_t per_buffer_data_size)
558 : {
559 667242 : return read_stream_begin_impl(flags,
560 : strategy,
561 : rel,
562 : RelationGetSmgr(rel),
563 667242 : rel->rd_rel->relpersistence,
564 : forknum,
565 : callback,
566 : callback_private_data,
567 : per_buffer_data_size);
568 : }
569 :
570 : /*
571 : * Create a new read stream for reading a SMgr relation.
572 : * See read_stream_begin_impl() for the detailed explanation.
573 : */
574 : ReadStream *
575 107786 : read_stream_begin_smgr_relation(int flags,
576 : BufferAccessStrategy strategy,
577 : SMgrRelation smgr,
578 : char smgr_persistence,
579 : ForkNumber forknum,
580 : ReadStreamBlockNumberCB callback,
581 : void *callback_private_data,
582 : size_t per_buffer_data_size)
583 : {
584 107786 : return read_stream_begin_impl(flags,
585 : strategy,
586 : NULL,
587 : smgr,
588 : smgr_persistence,
589 : forknum,
590 : callback,
591 : callback_private_data,
592 : per_buffer_data_size);
593 : }
594 :
595 : /*
596 : * Pull one pinned buffer out of a stream. Each call returns successive
597 : * blocks in the order specified by the callback. If per_buffer_data_size was
598 : * set to a non-zero size, *per_buffer_data receives a pointer to the extra
599 : * per-buffer data that the callback had a chance to populate, which remains
600 : * valid until the next call to read_stream_next_buffer(). When the stream
601 : * runs out of data, InvalidBuffer is returned. The caller may decide to end
602 : * the stream early at any time by calling read_stream_end().
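 *
 * A minimal consumption sketch, assuming a stream built with a non-zero
 * per_buffer_data_size:
 *
 *     void   *extra;
 *     Buffer  buf;
 *
 *     while ((buf = read_stream_next_buffer(stream, &extra)) != InvalidBuffer)
 *     {
 *         ... "extra" points at this block's data, valid until next call ...
 *         ReleaseBuffer(buf);
 *     }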
603 : */
604 : Buffer
605 8942434 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
606 : {
607 : Buffer buffer;
608 : int16 oldest_buffer_index;
609 :
610 : #ifndef READ_STREAM_DISABLE_FAST_PATH
611 :
612 : /*
613 : * A fast path for all-cached scans (behavior A). This is the same as the
614 : * usual algorithm, but it is specialized for no I/O and no per-buffer
615 : * data, so we can skip the queue management code, stay in the same buffer
616 : * slot and use singular StartReadBuffer().
617 : */
618 8942434 : if (likely(stream->fast_path))
619 : {
620 : BlockNumber next_blocknum;
621 :
622 : /* Fast path assumptions. */
623 : Assert(stream->ios_in_progress == 0);
624 : Assert(stream->pinned_buffers == 1);
625 : Assert(stream->distance == 1);
626 : Assert(stream->pending_read_nblocks == 0);
627 : Assert(stream->per_buffer_data_size == 0);
628 :
629 : /* We're going to return the buffer we pinned last time. */
630 3525538 : oldest_buffer_index = stream->oldest_buffer_index;
631 : Assert((oldest_buffer_index + 1) % stream->queue_size ==
632 : stream->next_buffer_index);
633 3525538 : buffer = stream->buffers[oldest_buffer_index];
634 : Assert(buffer != InvalidBuffer);
635 :
636 : /* Choose the next block to pin. */
637 3525538 : next_blocknum = read_stream_get_block(stream, NULL);
638 :
639 3525538 : if (likely(next_blocknum != InvalidBlockNumber))
640 : {
641 : /*
642 : * Pin a buffer for the next call. Same buffer entry, and
643 : * arbitrary I/O entry (they're all free). We don't have to
644 : * adjust pinned_buffers because we're transferring one to the caller
645 : * but pinning one more.
646 : */
647 3374232 : if (likely(!StartReadBuffer(&stream->ios[0].op,
648 : &stream->buffers[oldest_buffer_index],
649 : next_blocknum,
650 : stream->advice_enabled ?
651 : READ_BUFFERS_ISSUE_ADVICE : 0)))
652 : {
653 : /* Fast return. */
654 3352990 : return buffer;
655 : }
656 :
657 : /* Next call must wait for I/O for the newly pinned buffer. */
658 21242 : stream->oldest_io_index = 0;
659 21242 : stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
660 21242 : stream->ios_in_progress = 1;
661 21242 : stream->ios[0].buffer_index = oldest_buffer_index;
662 21242 : stream->seq_blocknum = next_blocknum + 1;
663 : }
664 : else
665 : {
666 : /* No more blocks, end of stream. */
667 151306 : stream->distance = 0;
668 151306 : stream->oldest_buffer_index = stream->next_buffer_index;
669 151306 : stream->pinned_buffers = 0;
670 : }
671 :
672 172548 : stream->fast_path = false;
673 172548 : return buffer;
674 : }
675 : #endif
676 :
677 5416896 : if (unlikely(stream->pinned_buffers == 0))
678 : {
679 : Assert(stream->oldest_buffer_index == stream->next_buffer_index);
680 :
681 : /* End of stream reached? */
682 4152984 : if (stream->distance == 0)
683 2377298 : return InvalidBuffer;
684 :
685 : /*
686 : * The usual order of operations is that we look ahead at the bottom
687 : * of this function after potentially finishing an I/O and making
688 : * space for more, but if we're just starting up we'll need to crank
689 : * the handle to get started.
690 : */
691 1775686 : read_stream_look_ahead(stream, true);
692 :
693 : /* End of stream reached? */
694 1775686 : if (stream->pinned_buffers == 0)
695 : {
696 : Assert(stream->distance == 0);
697 779056 : return InvalidBuffer;
698 : }
699 : }
700 :
701 : /* Grab the oldest pinned buffer and associated per-buffer data. */
702 : Assert(stream->pinned_buffers > 0);
703 2260542 : oldest_buffer_index = stream->oldest_buffer_index;
704 : Assert(oldest_buffer_index >= 0 &&
705 : oldest_buffer_index < stream->queue_size);
706 2260542 : buffer = stream->buffers[oldest_buffer_index];
707 2260542 : if (per_buffer_data)
708 0 : *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
709 :
710 : Assert(BufferIsValid(buffer));
711 :
712 : /* Do we have to wait for an associated I/O first? */
713 2260542 : if (stream->ios_in_progress > 0 &&
714 941740 : stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
715 : {
716 917150 : int16 io_index = stream->oldest_io_index;
717 : int16 distance;
718 :
719 : /* Sanity check that we still agree on the buffers. */
720 : Assert(stream->ios[io_index].op.buffers ==
721 : &stream->buffers[oldest_buffer_index]);
722 :
723 917150 : WaitReadBuffers(&stream->ios[io_index].op);
724 :
725 : Assert(stream->ios_in_progress > 0);
726 917150 : stream->ios_in_progress--;
727 917150 : if (++stream->oldest_io_index == stream->max_ios)
728 897184 : stream->oldest_io_index = 0;
729 :
730 917150 : if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
731 : {
732 : /* Distance ramps up fast (behavior C). */
733 552 : distance = stream->distance * 2;
734 552 : distance = Min(distance, stream->max_pinned_buffers);
735 552 : stream->distance = distance;
736 : }
737 : else
738 : {
739 : /* No advice; move towards io_combine_limit (behavior B). */
740 916598 : if (stream->distance > io_combine_limit)
741 : {
742 0 : stream->distance--;
743 : }
744 : else
745 : {
746 916598 : distance = stream->distance * 2;
747 916598 : distance = Min(distance, io_combine_limit);
748 916598 : distance = Min(distance, stream->max_pinned_buffers);
749 916598 : stream->distance = distance;
750 : }
751 : }
752 : }
753 :
754 : #ifdef CLOBBER_FREED_MEMORY
755 : /* Clobber old buffer and per-buffer data for debugging purposes. */
756 : stream->buffers[oldest_buffer_index] = InvalidBuffer;
757 :
758 : /*
759 : * The caller will get access to the per-buffer data until the next call.
760 : * We wipe the one before, which is never occupied because queue_size
761 : * allowed one extra element. This will hopefully trip up client code
762 : * that is holding a dangling pointer to it.
763 : */
764 : if (stream->per_buffer_data)
765 : wipe_mem(get_per_buffer_data(stream,
766 : oldest_buffer_index == 0 ?
767 : stream->queue_size - 1 :
768 : oldest_buffer_index - 1),
769 : stream->per_buffer_data_size);
770 : #endif
771 :
772 : /* Pin transferred to caller. */
773 : Assert(stream->pinned_buffers > 0);
774 2260542 : stream->pinned_buffers--;
775 :
776 : /* Advance oldest buffer, with wrap-around. */
777 2260542 : stream->oldest_buffer_index++;
778 2260542 : if (stream->oldest_buffer_index == stream->queue_size)
779 456794 : stream->oldest_buffer_index = 0;
780 :
781 : /* Prepare for the next call. */
782 2260542 : read_stream_look_ahead(stream, false);
783 :
784 : #ifndef READ_STREAM_DISABLE_FAST_PATH
785 : /* See if we can take the fast path for all-cached scans next time. */
786 2260542 : if (stream->ios_in_progress == 0 &&
787 1460522 : stream->pinned_buffers == 1 &&
788 440102 : stream->distance == 1 &&
789 331610 : stream->pending_read_nblocks == 0 &&
790 330338 : stream->per_buffer_data_size == 0)
791 : {
792 330338 : stream->fast_path = true;
793 : }
794 : #endif
795 :
796 2260542 : return buffer;
797 : }
798 :
799 : /*
800 : * Transitional support for code that would like to perform or skip reads
801 : * itself, without using the stream. Returns, and consumes, the next block
802 : * number that would be read by the stream's look-ahead algorithm, or
803 : * InvalidBlockNumber if the end of the stream is reached. Also reports the
804 : * strategy that would be used to read it.
805 : */
806 : BlockNumber
807 0 : read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
808 : {
809 0 : *strategy = stream->ios[0].op.strategy;
810 0 : return read_stream_get_block(stream, NULL);
811 : }
812 :
813 : /*
814 : * Reset a read stream by releasing any queued up buffers, allowing the stream
815 : * to be used again for different blocks. This can be used to clear an
816 : * end-of-stream condition and start again, or to throw away blocks that were
817 : * speculatively read and read some different blocks instead.
818 : */
819 : void
820 1776030 : read_stream_reset(ReadStream *stream)
821 : {
822 : Buffer buffer;
823 :
824 : /* Stop looking ahead. */
825 1776030 : stream->distance = 0;
826 :
827 : /* Forget buffered block number and fast path state. */
828 1776030 : stream->buffered_blocknum = InvalidBlockNumber;
829 1776030 : stream->fast_path = false;
830 :
831 : /* Unpin anything that wasn't consumed. */
832 1964658 : while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
833 188628 : ReleaseBuffer(buffer);
834 :
835 : Assert(stream->pinned_buffers == 0);
836 : Assert(stream->ios_in_progress == 0);
837 :
838 : /* Start off assuming data is cached. */
839 1776030 : stream->distance = 1;
840 1776030 : }
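
/*
 * A sketch of the rescan pattern this enables, reusing the setup shown at
 * the top of this file (the callback's private state must be rewound by the
 * caller):
 *
 *     read_stream_reset(stream);
 *     p.current_blocknum = 0;
 *
 * after which read_stream_next_buffer() begins streaming the blocks again.
 */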
841 :
842 : /*
843 : * Release and free a read stream.
844 : */
845 : void
846 772508 : read_stream_end(ReadStream *stream)
847 : {
848 772508 : read_stream_reset(stream);
849 772508 : pfree(stream);
850 772508 : }