Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * read_stream.c
4 : * Mechanism for accessing buffered relation data with look-ahead
5 : *
6 : * Code that needs to access relation data typically pins blocks one at a
7 : * time, often in a predictable order that might be sequential or data-driven.
8 : * Calling the simple ReadBuffer() function for each block is inefficient,
9 : * because blocks that are not yet in the buffer pool require I/O operations
10 : * that are small and might stall waiting for storage. This mechanism looks
11 : * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
12 : * neighboring blocks together and ahead of time, with an adaptive look-ahead
13 : * distance.
14 : *
15 : * A user-provided callback generates a stream of block numbers that is used
16 : * to form reads of up to io_combine_limit, by attempting to merge them with a
17 : * pending read. When that isn't possible, the existing pending read is sent
18 : * to StartReadBuffers() so that a new one can begin to form.
19 : *
20 : * The algorithm for controlling the look-ahead distance tries to classify the
21 : * stream into three ideal behaviors:
22 : *
23 : * A) No I/O is necessary, because the requested blocks are fully cached
24 : * already. There is no benefit to looking ahead more than one block, so
25 : * distance is 1. This is the default initial assumption.
26 : *
27 : * B) I/O is necessary, but fadvise is undesirable because the access is
28 : * sequential, or impossible because direct I/O is enabled or the system
29 : * doesn't support fadvise. There is no benefit in looking ahead more than
30 : * io_combine_limit, because in this case the only goal is larger read system
31 : * calls. Looking further ahead would pin many buffers and perform
32 : * speculative work looking ahead for no benefit.
33 : *
34 : * C) I/O is necessary, it appears random, and this system supports fadvise.
35 : * We'll look further ahead in order to reach the configured level of I/O
36 : * concurrency.
37 : *
38 : * The distance increases rapidly and decays slowly, so that it moves towards
39 : * those levels as different I/O patterns are discovered. For example, a
40 : * sequential scan of fully cached data doesn't bother looking ahead, but a
41 : * sequential scan that hits a region of uncached blocks will start issuing
42 : * increasingly wide read calls until it plateaus at io_combine_limit.
43 : *
44 : * The main data structure is a circular queue of buffers of size
45 : * max_pinned_buffers plus some extra space for technical reasons, ready to be
46 : * returned by read_stream_next_buffer(). Each buffer also has an optional
47 : * variable sized object that is passed from the callback to the consumer of
48 : * buffers.
49 : *
50 : * Parallel to the queue of buffers, there is a circular queue of in-progress
51 : * I/Os that have been started with StartReadBuffers(), and for which
52 : * WaitReadBuffers() must be called before returning the buffer.
53 : *
54 : * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
55 : * successive calls, then these data structures might appear as follows:
56 : *
57 : *                          buffers buf/data       ios
58 : *
59 : *                          +----+  +-----+       +--------+
60 : *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
61 : *                          +----+  +-----+  |    +--------+
62 : *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
63 : *                          +----+  +-----+  | |  +--------+
64 : *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
65 : *                          +----+  +-----+    |  +--------+
66 : *                          | 43 |  |  ?  |    |  |        |
67 : *                          +----+  +-----+    |  +--------+
68 : *                          | 44 |  |  ?  |    |  |        |
69 : *                          +----+  +-----+    |  +--------+
70 : *                          | 60 |  |  ?  |<---+
71 : *                          +----+  +-----+
72 : *     next_buffer_index -> |    |  |     |
73 : *                          +----+  +-----+
74 : *
75 : * In the example, 5 buffers are pinned, and the next buffer to be streamed to
76 : * the client is block 10. Block 10 was a hit and has no associated I/O, but
77 : * the range 42..44 requires an I/O wait before its buffers are returned, as
78 : * does block 60.
79 : *
80 : *
81 : * Portions Copyright (c) 2024, PostgreSQL Global Development Group
82 : * Portions Copyright (c) 1994, Regents of the University of California
83 : *
84 : * IDENTIFICATION
85 : * src/backend/storage/aio/read_stream.c
86 : *
87 : *-------------------------------------------------------------------------
88 : */
89 : #include "postgres.h"
90 :
91 : #include "catalog/pg_tablespace.h"
92 : #include "miscadmin.h"
93 : #include "storage/fd.h"
94 : #include "storage/smgr.h"
95 : #include "storage/read_stream.h"
96 : #include "utils/memdebug.h"
97 : #include "utils/rel.h"
98 : #include "utils/spccache.h"
99 :
100 : typedef struct InProgressIO /* One read handed to StartReadBuffers() that may still need WaitReadBuffers(). */
101 : {
102 : int16 buffer_index; /* Queue index of the first buffer covered by this read. */
103 : ReadBuffersOperation op; /* Operation state passed to StartReadBuffers() and WaitReadBuffers(). */
104 : } InProgressIO;
105 :
106 : /*
107 : * State for managing a stream of reads: limits, look-ahead state, the ring of in-progress I/Os and the ring of pinned buffers.
108 : */
109 : struct ReadStream
110 : {
111 : int16 max_ios; /* Limit on ios_in_progress; the I/O ring indexes wrap at this value. */
112 : int16 ios_in_progress; /* I/Os started but not yet waited for. */
113 : int16 queue_size; /* Wrap-around point for buffer indexes (max_pinned_buffers + 1). */
114 : int16 max_pinned_buffers; /* Most buffers the stream may hold pinned at once. */
115 : int16 pinned_buffers; /* Buffers pinned by the stream and not yet handed to the caller. */
116 : int16 distance; /* Current look-ahead distance; 0 means end of stream. */
117 : bool advice_enabled; /* May reads be started with READ_BUFFERS_ISSUE_ADVICE? */
118 :
119 : /*
120 : * Small buffer of block numbers, useful for 'ungetting' to resolve flow
121 : * control problems when I/Os are split. Also useful for batch-loading
122 : * block numbers in the fast path.
123 : */
124 : BlockNumber blocknums[16];
125 : int16 blocknums_count; /* Number of valid entries in blocknums. */
126 : int16 blocknums_next; /* Index of the next entry to hand out. */
127 :
128 : /*
129 : * The callback that will tell us which block numbers to read, and an
130 : * opaque pointer that will be passed to it for its own purposes.
131 : */
132 : ReadStreamBlockNumberCB callback;
133 : void *callback_private_data;
134 :
135 : /* Next expected block, for detecting sequential access. */
136 : BlockNumber seq_blocknum;
137 :
138 : /* The read operation we are currently preparing. */
139 : BlockNumber pending_read_blocknum;
140 : int16 pending_read_nblocks;
141 :
142 : /* Space for buffers and optional per-buffer private data. */
143 : size_t per_buffer_data_size;
144 : void *per_buffer_data;
145 :
146 : /* Read operations that have been started but not waited for yet. */
147 : InProgressIO *ios;
148 : int16 oldest_io_index;
149 : int16 next_io_index;
150 :
151 : bool fast_path; /* Use the all-cached fast path in read_stream_next_buffer()? */
152 :
153 : /* Circular queue of buffers. */
154 : int16 oldest_buffer_index; /* Next pinned buffer to return */
155 : int16 next_buffer_index; /* Index of next buffer to pin */
156 : Buffer buffers[FLEXIBLE_ARRAY_MEMBER];
157 : };
158 :
159 : /*
160 : * Return a pointer to the per-buffer data slot for the given queue index; slots are per_buffer_data_size bytes each, starting at per_buffer_data.
161 : */
162 : static inline void *
163 2920852 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
164 : {
165 5841704 : return (char *) stream->per_buffer_data +
166 2920852 : stream->per_buffer_data_size * buffer_index;
167 : }
168 :
169 : /*
170 : * Return the next block number for the stream. Entries still waiting in the small blocknums
171 : * buffer (put back by read_stream_unget_block() or batch-loaded by the fast path) are handed out
172 : * first; only when it is drained do we ask the callback, passing per_buffer_data through to it.
173 : */
174 : static inline BlockNumber
175 2920852 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
176 : {
177 2920852 : if (stream->blocknums_next < stream->blocknums_count)
178 53616 : return stream->blocknums[stream->blocknums_next++];
179 :
180 : /*
181 : * We only bother to fetch one at a time here (but see the fast path which
182 : * uses more).
183 : */
184 2867236 : return stream->callback(stream,
185 : stream->callback_private_data,
186 : per_buffer_data);
187 : }
188 :
189 : /*
190 : * In order to deal with short reads in StartReadBuffers(), we sometimes need
191 : * to defer handling of a block until later. This pushes blocknum back so that the next read_stream_get_block() call returns it again.
192 : */
193 : static inline void
194 0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
195 : {
196 0 : if (stream->blocknums_next == stream->blocknums_count)
197 : {
198 : /* Never initialized or entirely consumed. Re-initialize. */
199 0 : stream->blocknums[0] = blocknum;
200 0 : stream->blocknums_count = 1;
201 0 : stream->blocknums_next = 0;
202 : }
203 : else
204 : {
205 : /* Must be the last value returned from the blocknums array, so stepping back one slot is enough. */
206 : Assert(stream->blocknums_next > 0);
207 0 : stream->blocknums_next--;
208 : Assert(stream->blocknums[stream->blocknums_next] == blocknum);
209 : }
210 0 : }
211 : /* Fast-path helper: refill the blocknums array from the callback, passing NULL as per-buffer data. */
212 : #ifndef READ_STREAM_DISABLE_FAST_PATH
213 : static void
214 333638 : read_stream_fill_blocknums(ReadStream *stream)
215 : {
216 : BlockNumber blocknum;
217 333638 : int i = 0;
218 : /* Keep asking until the array is full or the callback reports end-of-stream; the InvalidBlockNumber is stored too. */
219 : do
220 : {
221 3460226 : blocknum = stream->callback(stream,
222 : stream->callback_private_data,
223 : NULL);
224 3460226 : stream->blocknums[i++] = blocknum;
225 3308552 : } while (i < lengthof(stream->blocknums) &&
226 3460226 : blocknum != InvalidBlockNumber);
227 333638 : stream->blocknums_count = i;
228 333638 : stream->blocknums_next = 0;
229 333638 : }
230 : #endif
231 : /* Hand the pending read to StartReadBuffers(), account for the blocks it accepted, and leave any remainder pending. */
232 : static void
233 1551918 : read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
234 : {
235 : bool need_wait;
236 : int nblocks;
237 : int flags;
238 : int16 io_index;
239 : int16 overflow;
240 : int16 buffer_index;
241 :
242 : /* This should only be called with a pending read. */
243 : Assert(stream->pending_read_nblocks > 0);
244 : Assert(stream->pending_read_nblocks <= io_combine_limit);
245 :
246 : /* We had better not exceed the pin limit by starting this read. */
247 : Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
248 : stream->max_pinned_buffers);
249 :
250 : /* We had better not be overwriting an existing pinned buffer. */
251 1551918 : if (stream->pinned_buffers > 0)
252 : Assert(stream->next_buffer_index != stream->oldest_buffer_index);
253 : else
254 : Assert(stream->next_buffer_index == stream->oldest_buffer_index);
255 :
256 : /*
257 : * If advice hasn't been suppressed, this system supports it, and this
258 : * isn't a strictly sequential pattern, then we'll issue advice.
259 : */
260 1551918 : if (!suppress_advice &&
261 753160 : stream->advice_enabled &&
262 26510 : stream->pending_read_blocknum != stream->seq_blocknum)
263 3604 : flags = READ_BUFFERS_ISSUE_ADVICE;
264 : else
265 1548314 : flags = 0;
266 :
267 : /* We say how many blocks we want to read, but nblocks may be smaller on return. */
268 1551918 : buffer_index = stream->next_buffer_index;
269 1551918 : io_index = stream->next_io_index;
270 1551918 : nblocks = stream->pending_read_nblocks;
271 1551918 : need_wait = StartReadBuffers(&stream->ios[io_index].op,
272 1551918 : &stream->buffers[buffer_index],
273 : stream->pending_read_blocknum,
274 : &nblocks,
275 : flags);
276 1551918 : stream->pinned_buffers += nblocks;
277 :
278 : /* Remember whether we need to wait before returning this buffer. */
279 1551918 : if (!need_wait)
280 : {
281 : /* Look-ahead distance decays, no I/O necessary (behavior A). */
282 1039294 : if (stream->distance > 1)
283 2218 : stream->distance--;
284 : }
285 : else
286 : {
287 : /*
288 : * Remember to call WaitReadBuffers() before returning head buffer.
289 : * Look-ahead distance will be adjusted after waiting. Also record which block would continue this read sequentially, for the advice test above.
290 : */
291 512624 : stream->ios[io_index].buffer_index = buffer_index;
292 512624 : if (++stream->next_io_index == stream->max_ios)
293 492960 : stream->next_io_index = 0;
294 : Assert(stream->ios_in_progress < stream->max_ios);
295 512624 : stream->ios_in_progress++;
296 512624 : stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
297 : }
298 :
299 : /*
300 : * We gave a contiguous range of buffer space to StartReadBuffers(), but
301 : * we want it to wrap around at queue_size. Slide overflowing buffers to
302 : * the front of the array.
303 : */
304 1551918 : overflow = (buffer_index + nblocks) - stream->queue_size;
305 1551918 : if (overflow > 0)
306 3596 : memmove(&stream->buffers[0],
307 3596 : &stream->buffers[stream->queue_size],
308 : sizeof(stream->buffers[0]) * overflow);
309 :
310 : /* Compute location of start of next read, without using % operator. */
311 1551918 : buffer_index += nblocks;
312 1551918 : if (buffer_index >= stream->queue_size)
313 299840 : buffer_index -= stream->queue_size;
314 : Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
315 1551918 : stream->next_buffer_index = buffer_index;
316 :
317 : /* Adjust the pending read to cover the remaining portion, if any. */
318 1551918 : stream->pending_read_blocknum += nblocks;
319 1551918 : stream->pending_read_nblocks -= nblocks;
320 1551918 : }
321 : /* Pull block numbers from the callback, merging them into the pending read and starting reads, until the look-ahead distance or the I/O limit is reached. */
322 : static void
323 3063070 : read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
324 : {
325 4692018 : while (stream->ios_in_progress < stream->max_ios &&
326 4681530 : stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
327 : {
328 : BlockNumber blocknum;
329 : int16 buffer_index;
330 : void *per_buffer_data;
331 : /* A pending read that is already io_combine_limit blocks long cannot grow any more, so start it. */
332 2920852 : if (stream->pending_read_nblocks == io_combine_limit)
333 : {
334 0 : read_stream_start_pending_read(stream, suppress_advice);
335 0 : suppress_advice = false;
336 0 : continue;
337 : }
338 :
339 : /*
340 : * See which block the callback wants next in the stream. We need to
341 : * compute the index of the Nth block of the pending read including
342 : * wrap-around, but we don't want to use the expensive % operator.
343 : */
344 2920852 : buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
345 2920852 : if (buffer_index >= stream->queue_size)
346 38512 : buffer_index -= stream->queue_size;
347 : Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
348 2920852 : per_buffer_data = get_per_buffer_data(stream, buffer_index);
349 2920852 : blocknum = read_stream_get_block(stream, per_buffer_data);
350 2920852 : if (blocknum == InvalidBlockNumber)
351 : {
352 : /* End of stream. A distance of zero is how the rest of the code recognizes it. */
353 1291904 : stream->distance = 0;
354 1291904 : break;
355 : }
356 :
357 : /* Can we merge it with the pending read? Only if it is the block immediately after it. */
358 1628948 : if (stream->pending_read_nblocks > 0 &&
359 81376 : stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
360 : {
361 81376 : stream->pending_read_nblocks++;
362 81376 : continue;
363 : }
364 :
365 : /* We have to start the pending read before we can build another; this loops because one call may start only part of it. */
366 1547572 : while (stream->pending_read_nblocks > 0)
367 : {
368 0 : read_stream_start_pending_read(stream, suppress_advice);
369 0 : suppress_advice = false;
370 0 : if (stream->ios_in_progress == stream->max_ios)
371 : {
372 : /* And we've hit the limit. Rewind, and stop here. */
373 0 : read_stream_unget_block(stream, blocknum);
374 0 : return;
375 : }
376 : }
377 :
378 : /* This is the start of a new pending read. */
379 1547572 : stream->pending_read_blocknum = blocknum;
380 1547572 : stream->pending_read_nblocks = 1;
381 : }
382 :
383 : /*
384 : * We don't start the pending read just because we've hit the distance
385 : * limit, preferring to give it another chance to grow to full
386 : * io_combine_limit size once more buffers have been consumed. However,
387 : * if we've already reached io_combine_limit, or we've reached the
388 : * distance limit and there isn't anything pinned yet, or the callback has
389 : * signaled end-of-stream, we start the read immediately (provided an I/O slot is free).
390 : */
391 3063070 : if (stream->pending_read_nblocks > 0 &&
392 1600524 : (stream->pending_read_nblocks == io_combine_limit ||
393 1597392 : (stream->pending_read_nblocks == stream->distance &&
394 1543292 : stream->pinned_buffers == 0) ||
395 54100 : stream->distance == 0) &&
396 1551980 : stream->ios_in_progress < stream->max_ios)
397 1551918 : read_stream_start_pending_read(stream, suppress_advice);
398 : }
399 :
400 : /*
401 : * Create a new read stream object that can be used to perform the equivalent
402 : * of a series of ReadBuffer() calls for one fork of one relation.
403 : * Internally, it generates larger vectored reads where possible by looking
404 : * ahead. The callback should return block numbers or InvalidBlockNumber to
405 : * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
406 : * write extra data for each block into the space provided to it. It will also receive
407 : * callback_private_data. Flags examined here: READ_STREAM_MAINTENANCE, READ_STREAM_SEQUENTIAL and READ_STREAM_FULL.
408 : */
409 : ReadStream *
410 585472 : read_stream_begin_relation(int flags,
411 : BufferAccessStrategy strategy,
412 : Relation rel,
413 : ForkNumber forknum,
414 : ReadStreamBlockNumberCB callback,
415 : void *callback_private_data,
416 : size_t per_buffer_data_size)
417 : {
418 : ReadStream *stream;
419 : size_t size;
420 : int16 queue_size;
421 : int max_ios;
422 : int strategy_pin_limit;
423 : uint32 max_pinned_buffers;
424 : Oid tablespace_id;
425 : SMgrRelation smgr;
426 :
427 585472 : smgr = RelationGetSmgr(rel);
428 :
429 : /*
430 : * Decide how many I/Os we will allow to run at the same time. That
431 : * currently means advice to the kernel to tell it that we will soon read.
432 : * This number also affects how far we look ahead for opportunities to
433 : * start more I/Os.
434 : */
435 585472 : tablespace_id = smgr->smgr_rlocator.locator.spcOid;
436 1159394 : if (!OidIsValid(MyDatabaseId) ||
437 710080 : IsCatalogRelation(rel) ||
438 136158 : IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
439 : {
440 : /*
441 : * Avoid circularity while trying to look up tablespace settings or
442 : * before spccache.c is ready.
443 : */
444 449314 : max_ios = effective_io_concurrency;
445 : }
446 136158 : else if (flags & READ_STREAM_MAINTENANCE)
447 5396 : max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
448 : else
449 130762 : max_ios = get_tablespace_io_concurrency(tablespace_id);
450 :
451 : /* Cap to INT16_MAX to avoid overflowing below */
452 585472 : max_ios = Min(max_ios, PG_INT16_MAX);
453 :
454 : /*
455 : * Choose the maximum number of buffers we're prepared to pin. We try to
456 : * pin fewer if we can, though. We clamp it to at least io_combine_limit
457 : * so that we can have a chance to build up a full io_combine_limit sized
458 : * read, even when max_ios is zero. Be careful not to allow int16 to
459 : * overflow (even though that's not possible with the current GUC range
460 : * limits), allowing also for the spare entry and the overflow space.
461 : */
462 585472 : max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
463 585472 : max_pinned_buffers = Min(max_pinned_buffers,
464 : PG_INT16_MAX - io_combine_limit - 1);
465 :
466 : /* Give the strategy a chance to limit the number of buffers we pin. */
467 585472 : strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
468 585472 : max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
469 :
470 : /* Don't allow this backend to pin more than its share of buffers. */
471 585472 : if (SmgrIsTemp(smgr))
472 10724 : LimitAdditionalLocalPins(&max_pinned_buffers);
473 : else
474 574748 : LimitAdditionalPins(&max_pinned_buffers);
475 : Assert(max_pinned_buffers > 0);
476 :
477 : /*
478 : * We need one extra entry for buffers and per-buffer data, because users
479 : * of per-buffer data have access to the object until the next call to
480 : * read_stream_next_buffer(), so we need a gap between the head and tail
481 : * of the queue so that we don't clobber it.
482 : */
483 585472 : queue_size = max_pinned_buffers + 1;
484 :
485 : /*
486 : * Allocate the object, the buffers, the ios and per_buffer_data space in
487 : * one big chunk. Though we have queue_size buffers, we want to be able
488 : * to assume that all the buffers for a single read are contiguous (i.e.
489 : * don't wrap around halfway through), so we allow temporary overflows of
490 : * up to the maximum possible read size by allocating an extra
491 : * io_combine_limit - 1 elements.
492 : */
493 585472 : size = offsetof(ReadStream, buffers);
494 585472 : size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
495 585472 : size += sizeof(InProgressIO) * Max(1, max_ios);
496 585472 : size += per_buffer_data_size * queue_size;
497 585472 : size += MAXIMUM_ALIGNOF * 2; /* slack for the two MAXALIGN() round-ups below */
498 585472 : stream = (ReadStream *) palloc(size);
499 585472 : memset(stream, 0, offsetof(ReadStream, buffers)); /* only the fixed-size header is zeroed */
500 585472 : stream->ios = (InProgressIO *)
501 585472 : MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
502 585472 : if (per_buffer_data_size > 0)
503 0 : stream->per_buffer_data = (void *)
504 0 : MAXALIGN(&stream->ios[Max(1, max_ios)]);
505 :
506 : #ifdef USE_PREFETCH
507 :
508 : /*
509 : * This system supports prefetching advice. We can use it as long as
510 : * direct I/O isn't enabled, the caller hasn't promised sequential access
511 : * (overriding our detection heuristics), and max_ios hasn't been set to
512 : * zero.
513 : */
514 585472 : if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
515 585266 : (flags & READ_STREAM_SEQUENTIAL) == 0 &&
516 : max_ios > 0)
517 18952 : stream->advice_enabled = true;
518 : #endif
519 :
520 : /*
521 : * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
522 : * above. If we had real asynchronous I/O we might need a slightly
523 : * different definition.
524 : */
525 585472 : if (max_ios == 0)
526 0 : max_ios = 1;
527 :
528 585472 : stream->max_ios = max_ios;
529 585472 : stream->per_buffer_data_size = per_buffer_data_size;
530 585472 : stream->max_pinned_buffers = max_pinned_buffers;
531 585472 : stream->queue_size = queue_size;
532 585472 : stream->callback = callback;
533 585472 : stream->callback_private_data = callback_private_data;
534 :
535 : /*
536 : * Skip the initial ramp-up phase if the caller says we're going to be
537 : * reading the whole relation. This way we start out assuming we'll be
538 : * doing full io_combine_limit sized reads (behavior B).
539 : */
540 585472 : if (flags & READ_STREAM_FULL)
541 5502 : stream->distance = Min(max_pinned_buffers, io_combine_limit);
542 : else
543 579970 : stream->distance = 1;
544 :
545 : /*
546 : * Since we always access the same relation, we can initialize parts of
547 : * the ReadBuffersOperation objects and leave them that way, to avoid
548 : * wasting CPU cycles writing to them for each read.
549 : */
550 1323878 : for (int i = 0; i < max_ios; ++i)
551 : {
552 738406 : stream->ios[i].op.rel = rel;
553 738406 : stream->ios[i].op.smgr = RelationGetSmgr(rel);
554 738406 : stream->ios[i].op.smgr_persistence = 0;
555 738406 : stream->ios[i].op.forknum = forknum;
556 738406 : stream->ios[i].op.strategy = strategy;
557 : }
558 :
559 585472 : return stream;
560 : }
561 :
562 : /*
563 : * Pull one pinned buffer out of a stream. Each call returns successive
564 : * blocks in the order specified by the callback. If per_buffer_data_size was
565 : * set to a non-zero size, *per_buffer_data receives a pointer to the extra
566 : * per-buffer data that the callback had a chance to populate, which remains
567 : * valid until the next call to read_stream_next_buffer(). Pass NULL for
568 : * per_buffer_data if that pointer isn't wanted. When the stream runs out of data, InvalidBuffer
569 : * is returned. The caller may decide to end the stream early at any time by calling read_stream_end().
570 : */
571 : Buffer
572 7100308 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
573 : {
574 : Buffer buffer;
575 : int16 oldest_buffer_index;
576 :
577 : #ifndef READ_STREAM_DISABLE_FAST_PATH
578 :
579 : /*
580 : * A fast path for all-cached scans (behavior A). This is the same as the
581 : * usual algorithm, but it is specialized for no I/O and no per-buffer
582 : * data, so we can skip the queue management code, stay in the same buffer
583 : * slot and use singular StartReadBuffer().
584 : */
585 7100308 : if (likely(stream->fast_path))
586 : {
587 : BlockNumber next_blocknum;
588 :
589 : /* Fast path assumptions. */
590 : Assert(stream->ios_in_progress == 0);
591 : Assert(stream->pinned_buffers == 1);
592 : Assert(stream->distance == 1);
593 : Assert(stream->pending_read_nblocks == 0);
594 : Assert(stream->per_buffer_data_size == 0);
595 :
596 : /* We're going to return the buffer we pinned last time. */
597 2717382 : oldest_buffer_index = stream->oldest_buffer_index;
598 : Assert((oldest_buffer_index + 1) % stream->queue_size ==
599 : stream->next_buffer_index);
600 2717382 : buffer = stream->buffers[oldest_buffer_index];
601 : Assert(buffer != InvalidBuffer);
602 :
603 : /* Choose the next block to pin, refilling the block number array first if it has been used up. */
604 2717382 : if (unlikely(stream->blocknums_next == stream->blocknums_count))
605 333638 : read_stream_fill_blocknums(stream);
606 2717382 : next_blocknum = stream->blocknums[stream->blocknums_next++];
607 :
608 2717382 : if (likely(next_blocknum != InvalidBlockNumber))
609 : {
610 : /*
611 : * Pin a buffer for the next call. Same buffer entry, and
612 : * arbitrary I/O entry (they're all free). We don't have to
613 : * adjust pinned_buffers because we're transferring one to caller
614 : * but pinning one more.
615 : */
616 2587366 : if (likely(!StartReadBuffer(&stream->ios[0].op,
617 : &stream->buffers[oldest_buffer_index],
618 : next_blocknum,
619 : stream->advice_enabled ?
620 : READ_BUFFERS_ISSUE_ADVICE : 0)))
621 : {
622 : /* Fast return. */
623 2572052 : return buffer;
624 : }
625 :
626 : /* Next call must wait for I/O for the newly pinned buffer, so set up the I/O queue to hold exactly that one read in slot 0. */
627 15314 : stream->oldest_io_index = 0;
628 15314 : stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
629 15314 : stream->ios_in_progress = 1;
630 15314 : stream->ios[0].buffer_index = oldest_buffer_index;
631 15314 : stream->seq_blocknum = next_blocknum + 1;
632 : }
633 : else
634 : {
635 : /* No more blocks, end of stream. */
636 130016 : stream->distance = 0;
637 130016 : stream->oldest_buffer_index = stream->next_buffer_index;
638 130016 : stream->pinned_buffers = 0;
639 : }
640 : /* Either way the fast-path assumptions no longer hold; later calls use the regular path below. */
641 145330 : stream->fast_path = false;
642 145330 : return buffer;
643 : }
644 : #endif
645 :
646 4382926 : if (unlikely(stream->pinned_buffers == 0))
647 : {
648 : Assert(stream->oldest_buffer_index == stream->next_buffer_index);
649 :
650 : /* End of stream reached? */
651 3682774 : if (stream->distance == 0)
652 2118614 : return InvalidBuffer;
653 :
654 : /*
655 : * The usual order of operations is that we look ahead at the bottom
656 : * of this function after potentially finishing an I/O and making
657 : * space for more, but if we're just starting up we'll need to crank
658 : * the handle to get started.
659 : */
660 1564160 : read_stream_look_ahead(stream, true);
661 :
662 : /* End of stream reached? */
663 1564160 : if (stream->pinned_buffers == 0)
664 : {
665 : Assert(stream->distance == 0);
666 765402 : return InvalidBuffer;
667 : }
668 : }
669 :
670 : /* Grab the oldest pinned buffer and associated per-buffer data. */
671 : Assert(stream->pinned_buffers > 0);
672 1498910 : oldest_buffer_index = stream->oldest_buffer_index;
673 : Assert(oldest_buffer_index >= 0 &&
674 : oldest_buffer_index < stream->queue_size);
675 1498910 : buffer = stream->buffers[oldest_buffer_index];
676 1498910 : if (per_buffer_data)
677 0 : *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
678 :
679 : Assert(BufferIsValid(buffer));
680 :
681 : /* Do we have to wait for an associated I/O first? Yes, if the oldest in-progress I/O starts at this buffer. */
682 1498910 : if (stream->ios_in_progress > 0 &&
683 538448 : stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
684 : {
685 527936 : int16 io_index = stream->oldest_io_index;
686 : int16 distance;
687 :
688 : /* Sanity check that we still agree on the buffers. */
689 : Assert(stream->ios[io_index].op.buffers ==
690 : &stream->buffers[oldest_buffer_index]);
691 :
692 527936 : WaitReadBuffers(&stream->ios[io_index].op);
693 :
694 : Assert(stream->ios_in_progress > 0);
695 527936 : stream->ios_in_progress--;
696 527936 : if (++stream->oldest_io_index == stream->max_ios)
697 508258 : stream->oldest_io_index = 0;
698 :
699 527936 : if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
700 : {
701 : /* Distance ramps up fast (behavior C): double it, capped at max_pinned_buffers. */
702 118 : distance = stream->distance * 2;
703 118 : distance = Min(distance, stream->max_pinned_buffers);
704 118 : stream->distance = distance;
705 : }
706 : else
707 : {
708 : /* No advice; move towards io_combine_limit (behavior B). */
709 527818 : if (stream->distance > io_combine_limit)
710 : {
711 0 : stream->distance--;
712 : }
713 : else
714 : {
715 527818 : distance = stream->distance * 2;
716 527818 : distance = Min(distance, io_combine_limit);
717 527818 : distance = Min(distance, stream->max_pinned_buffers);
718 527818 : stream->distance = distance;
719 : }
720 : }
721 : }
722 :
723 : #ifdef CLOBBER_FREED_MEMORY
724 : /* Clobber old buffer and per-buffer data for debugging purposes. */
725 : stream->buffers[oldest_buffer_index] = InvalidBuffer;
726 :
727 : /*
728 : * The caller will get access to the per-buffer data, until the next call.
729 : * We wipe the one before, which is never occupied because queue_size
730 : * allowed one extra element. This will hopefully trip up client code
731 : * that is holding a dangling pointer to it.
732 : */
733 : if (stream->per_buffer_data)
734 : wipe_mem(get_per_buffer_data(stream,
735 : oldest_buffer_index == 0 ?
736 : stream->queue_size - 1 :
737 : oldest_buffer_index - 1),
738 : stream->per_buffer_data_size);
739 : #endif
740 :
741 : /* Pin transferred to caller, so the stream no longer counts it. */
742 : Assert(stream->pinned_buffers > 0);
743 1498910 : stream->pinned_buffers--;
744 :
745 : /* Advance oldest buffer, with wrap-around. */
746 1498910 : stream->oldest_buffer_index++;
747 1498910 : if (stream->oldest_buffer_index == stream->queue_size)
748 289100 : stream->oldest_buffer_index = 0;
749 :
750 : /* Prepare for the next call. */
751 1498910 : read_stream_look_ahead(stream, false);
752 :
753 : #ifndef READ_STREAM_DISABLE_FAST_PATH
754 : /* See if we can take the fast path for all-cached scans next time; these conditions mirror the assertions at the top of the fast path. */
755 1498910 : if (stream->ios_in_progress == 0 &&
756 1016478 : stream->pinned_buffers == 1 &&
757 292546 : stream->distance == 1 &&
758 278170 : stream->pending_read_nblocks == 0 &&
759 277858 : stream->per_buffer_data_size == 0)
760 : {
761 277858 : stream->fast_path = true;
762 : }
763 : #endif
764 :
765 1498910 : return buffer;
766 : }
767 :
768 : /*
769 : * Reset a read stream by releasing any queued up buffers, allowing the stream
770 : * to be used again for different blocks. This can be used to clear an
771 : * end-of-stream condition and start again, or to throw away blocks that were
772 : * speculatively read and read some different blocks instead. Buffered block numbers are forgotten and the look-ahead distance restarts at 1.
773 : */
774 : void
775 1564694 : read_stream_reset(ReadStream *stream)
776 : {
777 : Buffer buffer;
778 :
779 : /* Stop looking ahead, so that draining the queue below doesn't ask the callback for more blocks. */
780 1564694 : stream->distance = 0;
781 :
782 : /* Forget buffered block numbers and fast path state. */
783 1564694 : stream->blocknums_next = 0;
784 1564694 : stream->blocknums_count = 0;
785 1564694 : stream->fast_path = false;
786 :
787 : /* Unpin anything that wasn't consumed. */
788 1725502 : while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
789 160808 : ReleaseBuffer(buffer);
790 :
791 : Assert(stream->pinned_buffers == 0);
792 : Assert(stream->ios_in_progress == 0);
793 :
794 : /* Start off assuming data is cached. */
795 1564694 : stream->distance = 1;
796 1564694 : }
797 :
798 : /*
799 : * Release and free a read stream: read_stream_reset() releases any buffers it still holds, then the stream object is pfree'd and must not be used again.
800 : */
801 : void
802 583154 : read_stream_end(ReadStream *stream)
803 : {
804 583154 : read_stream_reset(stream);
805 583154 : pfree(stream);
806 583154 : }
|