/*-------------------------------------------------------------------------
 *
 * read_stream.c
 *    Mechanism for accessing buffered relation data with look-ahead
 *
 * Code that needs to access relation data typically pins blocks one at a
 * time, often in a predictable order that might be sequential or data-driven.
 * Calling the simple ReadBuffer() function for each block is inefficient,
 * because blocks that are not yet in the buffer pool require I/O operations
 * that are small and might stall waiting for storage. This mechanism looks
 * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
 * neighboring blocks together and ahead of time, with an adaptive look-ahead
 * distance.
 *
 * A user-provided callback generates a stream of block numbers that is used
 * to form reads of up to io_combine_limit, by attempting to merge them with a
 * pending read. When that isn't possible, the existing pending read is sent
 * to StartReadBuffers() so that a new one can begin to form.
 *
 * The algorithm for controlling the look-ahead distance tries to classify the
 * stream into three ideal behaviors:
 *
 * A) No I/O is necessary, because the requested blocks are fully cached
 * already. There is no benefit to looking ahead more than one block, so
 * distance is 1. This is the default initial assumption.
 *
 * B) I/O is necessary, but read-ahead advice is undesirable because the
 * access is sequential and we can rely on the kernel's read-ahead heuristics,
 * or impossible because direct I/O is enabled, or the system doesn't support
 * read-ahead advice. There is no benefit in looking ahead more than
 * io_combine_limit, because in this case the only goal is larger read system
 * calls. Looking further ahead would pin many buffers and perform
 * speculative work for no benefit.
 *
 * C) I/O is necessary, it appears to be random, and this system supports
 * read-ahead advice. We'll look further ahead in order to reach the
 * configured level of I/O concurrency.
 *
 * The distance increases rapidly and decays slowly, so that it moves towards
 * those levels as different I/O patterns are discovered. For example, a
 * sequential scan of fully cached data doesn't bother looking ahead, but a
 * sequential scan that hits a region of uncached blocks will start issuing
 * increasingly wide read calls until it plateaus at io_combine_limit.
 *
 * The main data structure is a circular queue of buffers of size
 * max_pinned_buffers plus some extra space for technical reasons, ready to be
 * returned by read_stream_next_buffer(). Each buffer also has an optional
 * variable-sized object that is passed from the callback to the consumer of
 * buffers.
 *
 * Parallel to the queue of buffers, there is a circular queue of in-progress
 * I/Os that have been started with StartReadBuffers(), and for which
 * WaitReadBuffers() must be called before returning the buffer.
 *
 * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
 * successive calls, then these data structures might appear as follows:
 *
 *                          buffers  buf/data         ios
 *
 *                          +----+  +-----+       +--------+
 *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
 *                          +----+  +-----+  |    +--------+
 *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
 *                          +----+  +-----+  | |  +--------+
 *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
 *                          +----+  +-----+    |  +--------+
 *                          | 43 |  |  ?  |    |  |        |
 *                          +----+  +-----+    |  +--------+
 *                          | 44 |  |  ?  |    |  |        |
 *                          +----+  +-----+    |  +--------+
 *                          | 60 |  |  ?  |<---+
 *                          +----+  +-----+
 *     next_buffer_index -> |    |  |     |
 *                          +----+  +-----+
 *
 * In the example, 5 buffers are pinned, and the next buffer to be streamed to
 * the client is block 10. Block 10 was a hit and has no associated I/O, but
 * the range 42..44 requires an I/O wait before its buffers are returned, as
 * does block 60.
 *
 *
 * Portions Copyright (c) 2024-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/storage/aio/read_stream.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "miscadmin.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "storage/read_stream.h"
#include "utils/memdebug.h"
#include "utils/rel.h"
#include "utils/spccache.h"

typedef struct InProgressIO
{
    int16       buffer_index;
    ReadBuffersOperation op;
} InProgressIO;

/*
 * State for managing a stream of reads.
 */
struct ReadStream
{
    int16       max_ios;
    int16       ios_in_progress;
    int16       queue_size;
    int16       max_pinned_buffers;
    int16       pinned_buffers;
    int16       distance;
    bool        advice_enabled;

    /*
     * One-block buffer to support 'ungetting' a block number, to resolve flow
     * control problems when I/Os are split.
     */
    BlockNumber buffered_blocknum;

    /*
     * The callback that will tell us which block numbers to read, and an
     * opaque pointer that will be passed to it for its own purposes.
     */
    ReadStreamBlockNumberCB callback;
    void       *callback_private_data;

    /* Next expected block, for detecting sequential access. */
    BlockNumber seq_blocknum;

    /* The read operation we are currently preparing. */
    BlockNumber pending_read_blocknum;
    int16       pending_read_nblocks;

    /* Space for buffers and optional per-buffer private data. */
    size_t      per_buffer_data_size;
    void       *per_buffer_data;

    /* Read operations that have been started but not waited for yet. */
    InProgressIO *ios;
    int16       oldest_io_index;
    int16       next_io_index;

    bool        fast_path;

    /* Circular queue of buffers. */
    int16       oldest_buffer_index;    /* Next pinned buffer to return */
    int16       next_buffer_index;  /* Index of next buffer to pin */
    Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
};

/*
 * Return a pointer to the per-buffer data by index.
 */
static inline void *
get_per_buffer_data(ReadStream *stream, int16 buffer_index)
{
    return (char *) stream->per_buffer_data +
        stream->per_buffer_data_size * buffer_index;
}

/*
 * General-use ReadStreamBlockNumberCB for block range scans. Loops over the
 * blocks [current_blocknum, last_exclusive).
 */
BlockNumber
block_range_read_stream_cb(ReadStream *stream,
                           void *callback_private_data,
                           void *per_buffer_data)
{
    BlockRangeReadStreamPrivate *p = callback_private_data;

    if (p->current_blocknum < p->last_exclusive)
        return p->current_blocknum++;

    return InvalidBlockNumber;
}
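
/*
 * As a minimal usage sketch (not part of this file's API surface; "rel" and
 * "nblocks" are assumed to come from a hypothetical caller), a consumer of a
 * block range might drive a stream with the callback above roughly like so:
 *
 *    BlockRangeReadStreamPrivate p = {.current_blocknum = 0,
 *                                     .last_exclusive = nblocks};
 *    ReadStream *stream;
 *    Buffer      buf;
 *
 *    stream = read_stream_begin_relation(READ_STREAM_FULL, NULL, rel,
 *                                        MAIN_FORKNUM,
 *                                        block_range_read_stream_cb,
 *                                        &p, 0);
 *    while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 *    {
 *        ... examine the page held in buf ...
 *        ReleaseBuffer(buf);
 *    }
 *    read_stream_end(stream);
 */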

/*
 * Ask the callback which block it would like us to read next, with a
 * one-block buffer in front to allow read_stream_unget_block() to work.
 */
static inline BlockNumber
read_stream_get_block(ReadStream *stream, void *per_buffer_data)
{
    BlockNumber blocknum;

    blocknum = stream->buffered_blocknum;
    if (blocknum != InvalidBlockNumber)
        stream->buffered_blocknum = InvalidBlockNumber;
    else
    {
        /*
         * Tell Valgrind that the per-buffer data is undefined. That replaces
         * the "noaccess" state that was set when the consumer moved past this
         * entry last time around the queue, and should also catch callbacks
         * that fail to initialize data that the buffer consumer later
         * accesses. On the first go around, it is undefined already.
         */
        VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
                                    stream->per_buffer_data_size);
        blocknum = stream->callback(stream,
                                    stream->callback_private_data,
                                    per_buffer_data);
    }

    return blocknum;
}

/*
 * In order to deal with short reads in StartReadBuffers(), we sometimes need
 * to defer handling of a block until later.
 */
static inline void
read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
{
    /* We shouldn't ever unget more than one block. */
    Assert(stream->buffered_blocknum == InvalidBlockNumber);
    Assert(blocknum != InvalidBlockNumber);
    stream->buffered_blocknum = blocknum;
}

static void
read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
{
    bool        need_wait;
    int         nblocks;
    int         flags;
    int16       io_index;
    int16       overflow;
    int16       buffer_index;

    /* This should only be called with a pending read. */
    Assert(stream->pending_read_nblocks > 0);
    Assert(stream->pending_read_nblocks <= io_combine_limit);

    /* We had better not exceed the pin limit by starting this read. */
    Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
           stream->max_pinned_buffers);

    /* We had better not be overwriting an existing pinned buffer. */
    if (stream->pinned_buffers > 0)
        Assert(stream->next_buffer_index != stream->oldest_buffer_index);
    else
        Assert(stream->next_buffer_index == stream->oldest_buffer_index);

    /*
     * If advice hasn't been suppressed, this system supports it, and this
     * isn't a strictly sequential pattern, then we'll issue advice.
     */
    if (!suppress_advice &&
        stream->advice_enabled &&
        stream->pending_read_blocknum != stream->seq_blocknum)
        flags = READ_BUFFERS_ISSUE_ADVICE;
    else
        flags = 0;

    /* We say how many blocks we want to read, but it may be smaller on return. */
    buffer_index = stream->next_buffer_index;
    io_index = stream->next_io_index;
    nblocks = stream->pending_read_nblocks;
    need_wait = StartReadBuffers(&stream->ios[io_index].op,
                                 &stream->buffers[buffer_index],
                                 stream->pending_read_blocknum,
                                 &nblocks,
                                 flags);
    stream->pinned_buffers += nblocks;

    /* Remember whether we need to wait before returning this buffer. */
    if (!need_wait)
    {
        /* Look-ahead distance decays, no I/O necessary (behavior A). */
        if (stream->distance > 1)
            stream->distance--;
    }
    else
    {
        /*
         * Remember to call WaitReadBuffers() before returning the head
         * buffer. The look-ahead distance will be adjusted after waiting.
         */
        stream->ios[io_index].buffer_index = buffer_index;
        if (++stream->next_io_index == stream->max_ios)
            stream->next_io_index = 0;
        Assert(stream->ios_in_progress < stream->max_ios);
        stream->ios_in_progress++;
        stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
    }

    /*
     * We gave a contiguous range of buffer space to StartReadBuffers(), but
     * we want it to wrap around at queue_size. Slide overflowing buffers to
     * the front of the array.
     */
    overflow = (buffer_index + nblocks) - stream->queue_size;
    if (overflow > 0)
        memmove(&stream->buffers[0],
                &stream->buffers[stream->queue_size],
                sizeof(stream->buffers[0]) * overflow);

    /* Compute location of start of next read, without using % operator. */
    buffer_index += nblocks;
    if (buffer_index >= stream->queue_size)
        buffer_index -= stream->queue_size;
    Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
    stream->next_buffer_index = buffer_index;

    /* Adjust the pending read to cover the remaining portion, if any. */
    stream->pending_read_blocknum += nblocks;
    stream->pending_read_nblocks -= nblocks;
}

static void
read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
{
    while (stream->ios_in_progress < stream->max_ios &&
           stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
    {
        BlockNumber blocknum;
        int16       buffer_index;
        void       *per_buffer_data;

        if (stream->pending_read_nblocks == io_combine_limit)
        {
            read_stream_start_pending_read(stream, suppress_advice);
            suppress_advice = false;
            continue;
        }

        /*
         * See which block the callback wants next in the stream. We need to
         * compute the index of the Nth block of the pending read including
         * wrap-around, but we don't want to use the expensive % operator.
         */
        buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
        if (buffer_index >= stream->queue_size)
            buffer_index -= stream->queue_size;
        Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
        per_buffer_data = get_per_buffer_data(stream, buffer_index);
        blocknum = read_stream_get_block(stream, per_buffer_data);
        if (blocknum == InvalidBlockNumber)
        {
            /* End of stream. */
            stream->distance = 0;
            break;
        }

        /* Can we merge it with the pending read? */
        if (stream->pending_read_nblocks > 0 &&
            stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
        {
            stream->pending_read_nblocks++;
            continue;
        }

        /* We have to start the pending read before we can build another. */
        while (stream->pending_read_nblocks > 0)
        {
            read_stream_start_pending_read(stream, suppress_advice);
            suppress_advice = false;
            if (stream->ios_in_progress == stream->max_ios)
            {
                /* And we've hit the limit. Rewind, and stop here. */
                read_stream_unget_block(stream, blocknum);
                return;
            }
        }

        /* This is the start of a new pending read. */
        stream->pending_read_blocknum = blocknum;
        stream->pending_read_nblocks = 1;
    }

    /*
     * We don't start the pending read just because we've hit the distance
     * limit, preferring to give it another chance to grow to full
     * io_combine_limit size once more buffers have been consumed. However,
     * if we've already reached io_combine_limit, or we've reached the
     * distance limit and there isn't anything pinned yet, or the callback has
     * signaled end-of-stream, we start the read immediately.
     */
    if (stream->pending_read_nblocks > 0 &&
        (stream->pending_read_nblocks == io_combine_limit ||
         (stream->pending_read_nblocks == stream->distance &&
          stream->pinned_buffers == 0) ||
         stream->distance == 0) &&
        stream->ios_in_progress < stream->max_ios)
        read_stream_start_pending_read(stream, suppress_advice);
}

/*
 * Create a new read stream object that can be used to perform the equivalent
 * of a series of ReadBuffer() calls for one fork of one relation.
 * Internally, it generates larger vectored reads where possible by looking
 * ahead. The callback should return block numbers or InvalidBlockNumber to
 * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
 * write extra data for each block into the space provided to it. It will
 * also receive callback_private_data for its own purposes.
 */
static ReadStream *
read_stream_begin_impl(int flags,
                       BufferAccessStrategy strategy,
                       Relation rel,
                       SMgrRelation smgr,
                       char persistence,
                       ForkNumber forknum,
                       ReadStreamBlockNumberCB callback,
                       void *callback_private_data,
                       size_t per_buffer_data_size)
{
    ReadStream *stream;
    size_t      size;
    int16       queue_size;
    int         max_ios;
    int         strategy_pin_limit;
    uint32      max_pinned_buffers;
    Oid         tablespace_id;

    /*
     * Decide how many I/Os we will allow to run at the same time. That
     * currently means issuing advice to tell the kernel that we will soon
     * read. This number also affects how far we look ahead for opportunities
     * to start more I/Os.
     */
    tablespace_id = smgr->smgr_rlocator.locator.spcOid;
    if (!OidIsValid(MyDatabaseId) ||
        (rel && IsCatalogRelation(rel)) ||
        IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
    {
        /*
         * Avoid circularity while trying to look up tablespace settings or
         * before spccache.c is ready.
         */
        max_ios = effective_io_concurrency;
    }
    else if (flags & READ_STREAM_MAINTENANCE)
        max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
    else
        max_ios = get_tablespace_io_concurrency(tablespace_id);

    /* Cap to INT16_MAX to avoid overflowing below */
    max_ios = Min(max_ios, PG_INT16_MAX);

    /*
     * Choose the maximum number of buffers we're prepared to pin. We try to
     * pin fewer if we can, though. We clamp it to at least io_combine_limit
     * so that we can have a chance to build up a full io_combine_limit sized
     * read, even when max_ios is zero. Be careful not to allow int16 to
     * overflow (even though that's not possible with the current GUC range
     * limits), allowing also for the spare entry and the overflow space.
     */
    max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
    max_pinned_buffers = Min(max_pinned_buffers,
                             PG_INT16_MAX - io_combine_limit - 1);

    /* Give the strategy a chance to limit the number of buffers we pin. */
    strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
    max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);

    /* Don't allow this backend to pin more than its share of buffers. */
    if (SmgrIsTemp(smgr))
        LimitAdditionalLocalPins(&max_pinned_buffers);
    else
        LimitAdditionalPins(&max_pinned_buffers);
    Assert(max_pinned_buffers > 0);

    /*
     * We need one extra entry for buffers and per-buffer data, because users
     * of per-buffer data have access to the object until the next call to
     * read_stream_next_buffer(), so we need a gap between the head and tail
     * of the queue so that we don't clobber it.
     */
    queue_size = max_pinned_buffers + 1;

    /*
     * Allocate the object, the buffers, the ios and per_buffer_data space in
     * one big chunk. Though we have queue_size buffers, we want to be able
     * to assume that all the buffers for a single read are contiguous (i.e.
     * don't wrap around halfway through), so we allow temporary overflows of
     * up to the maximum possible read size by allocating an extra
     * io_combine_limit - 1 elements.
     */
    size = offsetof(ReadStream, buffers);
    size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
    size += sizeof(InProgressIO) * Max(1, max_ios);
    size += per_buffer_data_size * queue_size;
    size += MAXIMUM_ALIGNOF * 2;
    stream = (ReadStream *) palloc(size);
    memset(stream, 0, offsetof(ReadStream, buffers));
    stream->ios = (InProgressIO *)
        MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
    if (per_buffer_data_size > 0)
        stream->per_buffer_data = (void *)
            MAXALIGN(&stream->ios[Max(1, max_ios)]);

#ifdef USE_PREFETCH

    /*
     * This system supports prefetching advice. We can use it as long as
     * direct I/O isn't enabled, the caller hasn't promised sequential access
     * (overriding our detection heuristics), and max_ios hasn't been set to
     * zero.
     */
    if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
        (flags & READ_STREAM_SEQUENTIAL) == 0 &&
        max_ios > 0)
        stream->advice_enabled = true;
#endif

    /*
     * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
     * above. If we had real asynchronous I/O we might need a slightly
     * different definition.
     */
    if (max_ios == 0)
        max_ios = 1;

    stream->max_ios = max_ios;
    stream->per_buffer_data_size = per_buffer_data_size;
    stream->max_pinned_buffers = max_pinned_buffers;
    stream->queue_size = queue_size;
    stream->callback = callback;
    stream->callback_private_data = callback_private_data;
    stream->buffered_blocknum = InvalidBlockNumber;

    /*
     * Skip the initial ramp-up phase if the caller says we're going to be
     * reading the whole relation. This way we start out assuming we'll be
     * doing full io_combine_limit sized reads (behavior B).
     */
    if (flags & READ_STREAM_FULL)
        stream->distance = Min(max_pinned_buffers, io_combine_limit);
    else
        stream->distance = 1;

    /*
     * Since we always access the same relation, we can initialize parts of
     * the ReadBuffersOperation objects and leave them that way, to avoid
     * wasting CPU cycles writing to them for each read.
     */
    for (int i = 0; i < max_ios; ++i)
    {
        stream->ios[i].op.rel = rel;
        stream->ios[i].op.smgr = smgr;
        stream->ios[i].op.persistence = persistence;
        stream->ios[i].op.forknum = forknum;
        stream->ios[i].op.strategy = strategy;
    }

    return stream;
}

/*
 * Create a new read stream for reading a relation.
 * See read_stream_begin_impl() for the detailed explanation.
 */
ReadStream *
read_stream_begin_relation(int flags,
                           BufferAccessStrategy strategy,
                           Relation rel,
                           ForkNumber forknum,
                           ReadStreamBlockNumberCB callback,
                           void *callback_private_data,
                           size_t per_buffer_data_size)
{
    return read_stream_begin_impl(flags,
                                  strategy,
                                  rel,
                                  RelationGetSmgr(rel),
                                  rel->rd_rel->relpersistence,
                                  forknum,
                                  callback,
                                  callback_private_data,
                                  per_buffer_data_size);
}

/*
 * Create a new read stream for reading a SMgr relation.
 * See read_stream_begin_impl() for the detailed explanation.
 */
ReadStream *
read_stream_begin_smgr_relation(int flags,
                                BufferAccessStrategy strategy,
                                SMgrRelation smgr,
                                char smgr_persistence,
                                ForkNumber forknum,
                                ReadStreamBlockNumberCB callback,
                                void *callback_private_data,
                                size_t per_buffer_data_size)
{
    return read_stream_begin_impl(flags,
                                  strategy,
                                  NULL,
                                  smgr,
                                  smgr_persistence,
                                  forknum,
                                  callback,
                                  callback_private_data,
                                  per_buffer_data_size);
}
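
/*
 * A hedged sketch of a custom callback that uses per-buffer data (the
 * callback, its private struct, and tid_for_block() are hypothetical, not
 * part of this file): the callback writes into the space passed to it, and
 * the consumer gets the same pointer back from read_stream_next_buffer().
 *
 *    typedef struct MyScanPrivate
 *    {
 *        BlockNumber next;
 *        BlockNumber end;
 *    } MyScanPrivate;
 *
 *    static BlockNumber
 *    my_scan_cb(ReadStream *stream, void *callback_private_data,
 *               void *per_buffer_data)
 *    {
 *        MyScanPrivate *p = callback_private_data;
 *
 *        if (p->next >= p->end)
 *            return InvalidBlockNumber;
 *        // Stash something for the consumer of this block.
 *        *(ItemPointerData *) per_buffer_data = tid_for_block(p->next);
 *        return p->next++;
 *    }
 *
 * The stream would be created with per_buffer_data_size =
 * sizeof(ItemPointerData), and the consumer would pass a non-NULL
 * per_buffer_data argument to read_stream_next_buffer() to retrieve the
 * pointer, which stays valid until the following call.
 */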

/*
 * Pull one pinned buffer out of a stream. Each call returns successive
 * blocks in the order specified by the callback. If per_buffer_data_size was
 * set to a non-zero size, *per_buffer_data receives a pointer to the extra
 * per-buffer data that the callback had a chance to populate, which remains
 * valid until the next call to read_stream_next_buffer(). When the stream
 * runs out of data, InvalidBuffer is returned. The caller may decide to end
 * the stream early at any time by calling read_stream_end().
 */
Buffer
read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
{
    Buffer      buffer;
    int16       oldest_buffer_index;

#ifndef READ_STREAM_DISABLE_FAST_PATH

    /*
     * A fast path for all-cached scans (behavior A). This is the same as the
     * usual algorithm, but it is specialized for no I/O and no per-buffer
     * data, so we can skip the queue management code, stay in the same buffer
     * slot and use singular StartReadBuffer().
     */
    if (likely(stream->fast_path))
    {
        BlockNumber next_blocknum;

        /* Fast path assumptions. */
        Assert(stream->ios_in_progress == 0);
        Assert(stream->pinned_buffers == 1);
        Assert(stream->distance == 1);
        Assert(stream->pending_read_nblocks == 0);
        Assert(stream->per_buffer_data_size == 0);

        /* We're going to return the buffer we pinned last time. */
        oldest_buffer_index = stream->oldest_buffer_index;
        Assert((oldest_buffer_index + 1) % stream->queue_size ==
               stream->next_buffer_index);
        buffer = stream->buffers[oldest_buffer_index];
        Assert(buffer != InvalidBuffer);

        /* Choose the next block to pin. */
        next_blocknum = read_stream_get_block(stream, NULL);

        if (likely(next_blocknum != InvalidBlockNumber))
        {
            /*
             * Pin a buffer for the next call. Same buffer entry, and
             * arbitrary I/O entry (they're all free). We don't have to
             * adjust pinned_buffers because we're transferring one to caller
             * but pinning one more.
             */
            if (likely(!StartReadBuffer(&stream->ios[0].op,
                                        &stream->buffers[oldest_buffer_index],
                                        next_blocknum,
                                        stream->advice_enabled ?
                                        READ_BUFFERS_ISSUE_ADVICE : 0)))
            {
                /* Fast return. */
                return buffer;
            }

            /* Next call must wait for I/O for the newly pinned buffer. */
            stream->oldest_io_index = 0;
            stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
            stream->ios_in_progress = 1;
            stream->ios[0].buffer_index = oldest_buffer_index;
            stream->seq_blocknum = next_blocknum + 1;
        }
        else
        {
            /* No more blocks, end of stream. */
            stream->distance = 0;
            stream->oldest_buffer_index = stream->next_buffer_index;
            stream->pinned_buffers = 0;
        }

        stream->fast_path = false;
        return buffer;
    }
#endif

    if (unlikely(stream->pinned_buffers == 0))
    {
        Assert(stream->oldest_buffer_index == stream->next_buffer_index);

        /* End of stream reached? */
        if (stream->distance == 0)
            return InvalidBuffer;

        /*
         * The usual order of operations is that we look ahead at the bottom
         * of this function after potentially finishing an I/O and making
         * space for more, but if we're just starting up we'll need to crank
         * the handle to get started.
         */
        read_stream_look_ahead(stream, true);

        /* End of stream reached? */
        if (stream->pinned_buffers == 0)
        {
            Assert(stream->distance == 0);
            return InvalidBuffer;
        }
    }

    /* Grab the oldest pinned buffer and associated per-buffer data. */
    Assert(stream->pinned_buffers > 0);
    oldest_buffer_index = stream->oldest_buffer_index;
    Assert(oldest_buffer_index >= 0 &&
           oldest_buffer_index < stream->queue_size);
    buffer = stream->buffers[oldest_buffer_index];
    if (per_buffer_data)
        *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);

    Assert(BufferIsValid(buffer));

    /* Do we have to wait for an associated I/O first? */
    if (stream->ios_in_progress > 0 &&
        stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
    {
        int16       io_index = stream->oldest_io_index;
        int16       distance;

        /* Sanity check that we still agree on the buffers. */
        Assert(stream->ios[io_index].op.buffers ==
               &stream->buffers[oldest_buffer_index]);

        WaitReadBuffers(&stream->ios[io_index].op);

        Assert(stream->ios_in_progress > 0);
        stream->ios_in_progress--;
        if (++stream->oldest_io_index == stream->max_ios)
            stream->oldest_io_index = 0;

        if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
        {
            /* Distance ramps up fast (behavior C). */
            distance = stream->distance * 2;
            distance = Min(distance, stream->max_pinned_buffers);
            stream->distance = distance;
        }
        else
        {
            /* No advice; move towards io_combine_limit (behavior B). */
            if (stream->distance > io_combine_limit)
            {
                stream->distance--;
            }
            else
            {
                distance = stream->distance * 2;
                distance = Min(distance, io_combine_limit);
                distance = Min(distance, stream->max_pinned_buffers);
                stream->distance = distance;
            }
        }
    }

#ifdef CLOBBER_FREED_MEMORY
    /* Clobber old buffer for debugging purposes. */
    stream->buffers[oldest_buffer_index] = InvalidBuffer;
#endif

#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)

    /*
     * The caller will get access to the per-buffer data, until the next call.
     * We wipe the one before, which is never occupied because queue_size
     * allowed one extra element. This will hopefully trip up client code
     * that is holding a dangling pointer to it.
     */
    if (stream->per_buffer_data)
    {
        void       *per_buffer_data;

        per_buffer_data = get_per_buffer_data(stream,
                                              oldest_buffer_index == 0 ?
                                              stream->queue_size - 1 :
                                              oldest_buffer_index - 1);

#if defined(CLOBBER_FREED_MEMORY)
        /* This also tells Valgrind the memory is "noaccess". */
        wipe_mem(per_buffer_data, stream->per_buffer_data_size);
#elif defined(USE_VALGRIND)
        /* Tell it ourselves. */
        VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
                                   stream->per_buffer_data_size);
#endif
    }
#endif

    /* Pin transferred to caller. */
    Assert(stream->pinned_buffers > 0);
    stream->pinned_buffers--;

    /* Advance oldest buffer, with wrap-around. */
    stream->oldest_buffer_index++;
    if (stream->oldest_buffer_index == stream->queue_size)
        stream->oldest_buffer_index = 0;

    /* Prepare for the next call. */
    read_stream_look_ahead(stream, false);

#ifndef READ_STREAM_DISABLE_FAST_PATH
    /* See if we can take the fast path for all-cached scans next time. */
    if (stream->ios_in_progress == 0 &&
        stream->pinned_buffers == 1 &&
        stream->distance == 1 &&
        stream->pending_read_nblocks == 0 &&
        stream->per_buffer_data_size == 0)
    {
        stream->fast_path = true;
    }
#endif

    return buffer;
}

/*
 * Transitional support for code that would like to perform or skip reads
 * itself, without using the stream. Returns, and consumes, the next block
 * number that would be read by the stream's look-ahead algorithm, or
 * InvalidBlockNumber if the end of the stream is reached. Also reports the
 * strategy that would be used to read it.
 */
BlockNumber
read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
{
    *strategy = stream->ios[0].op.strategy;
    return read_stream_get_block(stream, NULL);
}

/*
 * Reset a read stream by releasing any queued up buffers, allowing the stream
 * to be used again for different blocks. This can be used to clear an
 * end-of-stream condition and start again, or to throw away blocks that were
 * speculatively read and read some different blocks instead.
 */
void
read_stream_reset(ReadStream *stream)
{
    Buffer      buffer;

    /* Stop looking ahead. */
    stream->distance = 0;

    /* Forget buffered block number and fast path state. */
    stream->buffered_blocknum = InvalidBlockNumber;
    stream->fast_path = false;

    /* Unpin anything that wasn't consumed. */
    while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
        ReleaseBuffer(buffer);

    Assert(stream->pinned_buffers == 0);
    Assert(stream->ios_in_progress == 0);

    /* Start off assuming data is cached. */
    stream->distance = 1;
}

/*
 * Release and free a read stream.
 */
void
read_stream_end(ReadStream *stream)
{
    read_stream_reset(stream);
    pfree(stream);
}
|