Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * localbuf.c
4 : * local buffer manager. Fast buffer manager for temporary tables,
5 : * which never need to be WAL-logged or checkpointed, etc.
6 : *
7 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994-5, Regents of the University of California
9 : *
10 : *
11 : * IDENTIFICATION
12 : * src/backend/storage/buffer/localbuf.c
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/parallel.h"
19 : #include "executor/instrument.h"
20 : #include "pgstat.h"
21 : #include "storage/aio.h"
22 : #include "storage/buf_internals.h"
23 : #include "storage/bufmgr.h"
24 : #include "storage/fd.h"
25 : #include "utils/guc_hooks.h"
26 : #include "utils/memdebug.h"
27 : #include "utils/memutils.h"
28 : #include "utils/rel.h"
29 : #include "utils/resowner.h"
30 :
31 :
32 : /*#define LBDEBUG*/
33 :
34 : /* entry for buffer lookup hashtable */
35 : typedef struct
36 : {
37 : BufferTag key; /* Tag of a disk page */
38 : int id; /* Associated local buffer's index */
39 : } LocalBufferLookupEnt;
40 :
41 : /* Note: this macro only works on local buffers, not shared ones! */
/* buf_id for local buffer i is -(i + 2); see InitLocalBuffers() */
42 : #define LocalBufHdrGetBlock(bufHdr) \
43 : LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
44 :
45 : int NLocBuffer = 0; /* until buffers are initialized */
46 :
/* per-buffer arrays, all sized NLocBuffer once InitLocalBuffers() has run */
47 : BufferDesc *LocalBufferDescriptors = NULL;
48 : Block *LocalBufferBlockPointers = NULL;
49 : int32 *LocalRefCount = NULL;
50 :
/* clock-sweep hand used by GetLocalVictimBuffer() */
51 : static int nextFreeLocalBufId = 0;
52 :
/* BufferTag -> LocalBufferLookupEnt; created lazily by InitLocalBuffers() */
53 : static HTAB *LocalBufHash = NULL;
54 :
55 : /* number of local buffers pinned at least once */
56 : static int NLocalPinnedBuffers = 0;
57 :
58 :
59 : static void InitLocalBuffers(void);
60 : static Block GetLocalBufferStorage(void);
61 : static Buffer GetLocalVictimBuffer(void);
62 :
63 :
64 : /*
65 : * PrefetchLocalBuffer -
66 : * initiate asynchronous read of a block of a relation
67 : *
68 : * Do PrefetchBuffer's work for temporary relations.
69 : * No-op if prefetching isn't compiled in.
70 : */
71 : PrefetchBufferResult
72 1357 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
73 : BlockNumber blockNum)
74 : {
75 1357 : PrefetchBufferResult result = {InvalidBuffer, false};
76 : BufferTag newTag; /* identity of requested block */
77 : LocalBufferLookupEnt *hresult;
78 :
79 1357 : InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
80 :
81 : /* Initialize local buffers if first request in this session */
82 1357 : if (LocalBufHash == NULL)
83 0 : InitLocalBuffers();
84 :
85 : /* See if the desired buffer already exists */
86 : hresult = (LocalBufferLookupEnt *)
87 1357 : hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
88 :
89 1357 : if (hresult)
90 : {
91 : /* Yes, so nothing to do */
92 1141 : result.recent_buffer = -hresult->id - 1;
93 : }
94 : else
95 : {
96 : #ifdef USE_PREFETCH
97 : /* Not in buffers, so initiate prefetch */
98 432 : if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
99 216 : smgrprefetch(smgr, forkNum, blockNum, 1))
100 : {
101 216 : result.initiated_io = true;
102 : }
103 : #endif /* USE_PREFETCH */
104 : }
105 :
106 1357 : return result;
107 : }
108 :
109 :
110 : /*
111 : * LocalBufferAlloc -
112 : * Find or create a local buffer for the given page of the given relation.
113 : *
114 : * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
115 : * any locking since this is all local. We support only default access
116 : * strategy (hence, usage_count is always advanced).
117 : */
118 : BufferDesc *
119 1648475 : LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
120 : bool *foundPtr)
121 : {
122 : BufferTag newTag; /* identity of requested block */
123 : LocalBufferLookupEnt *hresult;
124 : BufferDesc *bufHdr;
125 : Buffer victim_buffer;
126 : int bufid;
127 : bool found;
128 :
129 1648475 : InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
130 :
131 : /* Initialize local buffers if first request in this session */
132 1648475 : if (LocalBufHash == NULL)
133 17 : InitLocalBuffers();
134 :
135 1648475 : ResourceOwnerEnlarge(CurrentResourceOwner);
136 :
137 : /* See if the desired buffer already exists */
138 : hresult = (LocalBufferLookupEnt *)
139 1648475 : hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
140 :
141 1648475 : if (hresult)
142 : {
143 1637442 : bufid = hresult->id;
144 1637442 : bufHdr = GetLocalBufferDescriptor(bufid);
145 : Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
146 :
147 1637442 : *foundPtr = PinLocalBuffer(bufHdr, true);
148 : }
149 : else
150 : {
151 : uint64 buf_state;
152 :
153 11033 : victim_buffer = GetLocalVictimBuffer();
154 11025 : bufid = -victim_buffer - 1;
155 11025 : bufHdr = GetLocalBufferDescriptor(bufid);
156 :
157 : hresult = (LocalBufferLookupEnt *)
158 11025 : hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
159 11025 : if (found) /* shouldn't happen */
160 0 : elog(ERROR, "local buffer hash table corrupted");
161 11025 : hresult->id = bufid;
162 :
163 : /*
164 : * it's all ours now.
165 : */
166 11025 : bufHdr->tag = newTag;
167 :
168 11025 : buf_state = pg_atomic_read_u64(&bufHdr->state);
169 11025 : buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
170 11025 : buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
171 11025 : pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
172 :
173 11025 : *foundPtr = false;
174 : }
175 :
176 1648467 : return bufHdr;
177 : }
178 :
179 : /*
180 : * Like FlushBuffer(), just for local buffers.
181 : */
182 : void
183 4430 : FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
184 : {
185 : instr_time io_start;
186 4430 : Page localpage = (char *) LocalBufHdrGetBlock(bufHdr);
187 :
188 : Assert(LocalRefCount[-BufferDescriptorGetBuffer(bufHdr) - 1] > 0);
189 :
190 : /*
191 : * Try to start an I/O operation. There currently are no reasons for
192 : * StartLocalBufferIO to return anything other than
193 : * BUFFER_IO_READY_FOR_IO, so we raise an error in that case.
194 : */
195 4430 : if (StartLocalBufferIO(bufHdr, false, true, NULL) != BUFFER_IO_READY_FOR_IO)
196 0 : elog(ERROR, "failed to start write IO on local buffer");
197 :
198 : /* Find smgr relation for buffer */
199 4430 : if (reln == NULL)
200 4038 : reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
201 : MyProcNumber);
202 :
203 4430 : PageSetChecksum(localpage, bufHdr->tag.blockNum);
204 :
205 4430 : io_start = pgstat_prepare_io_time(track_io_timing);
206 :
207 : /* And write... */
208 4430 : smgrwrite(reln,
209 4430 : BufTagGetForkNum(&bufHdr->tag),
210 : bufHdr->tag.blockNum,
211 : localpage,
212 : false);
213 :
214 : /* Temporary table I/O does not use Buffer Access Strategies */
215 4430 : pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
216 : IOOP_WRITE, io_start, 1, BLCKSZ);
217 :
218 : /* Mark not-dirty */
219 4430 : TerminateLocalBufferIO(bufHdr, true, 0, false);
220 :
221 4430 : pgBufferUsage.local_blks_written++;
222 4430 : }
223 :
224 : static Buffer
225 30327 : GetLocalVictimBuffer(void)
226 : {
227 : int victim_bufid;
228 : int trycounter;
229 : BufferDesc *bufHdr;
230 :
231 30327 : ResourceOwnerEnlarge(CurrentResourceOwner);
232 :
233 : /*
234 : * Need to get a new buffer. We use a clock-sweep algorithm (essentially
235 : * the same as what freelist.c does now...)
236 : */
237 30327 : trycounter = NLocBuffer;
238 : for (;;)
239 : {
240 167063 : victim_bufid = nextFreeLocalBufId;
241 :
242 167063 : if (++nextFreeLocalBufId >= NLocBuffer)
243 1448 : nextFreeLocalBufId = 0;
244 :
245 167063 : bufHdr = GetLocalBufferDescriptor(victim_bufid);
246 :
247 167063 : if (LocalRefCount[victim_bufid] == 0)
248 : {
249 54091 : uint64 buf_state = pg_atomic_read_u64(&bufHdr->state);
250 :
251 54091 : if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
252 : {
253 23772 : buf_state -= BUF_USAGECOUNT_ONE;
254 23772 : pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
255 23772 : trycounter = NLocBuffer;
256 : }
257 30319 : else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
258 : {
259 : /*
260 : * This can be reached if the backend initiated AIO for this
261 : * buffer and then errored out.
262 : */
263 : }
264 : else
265 : {
266 : /* Found a usable buffer */
267 30319 : PinLocalBuffer(bufHdr, false);
268 30319 : break;
269 : }
270 : }
271 112972 : else if (--trycounter == 0)
272 8 : ereport(ERROR,
273 : (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
274 : errmsg("no empty local buffer available")));
275 : }
276 :
277 : /*
278 : * lazy memory allocation: allocate space on first use of a buffer.
279 : */
280 30319 : if (LocalBufHdrGetBlock(bufHdr) == NULL)
281 : {
282 : /* Set pointer for use by BufferGetBlock() macro */
283 20951 : LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
284 : }
285 :
286 : /*
287 : * this buffer is not referenced but it might still be dirty. if that's
288 : * the case, write it out before reusing it!
289 : */
290 30319 : if (pg_atomic_read_u64(&bufHdr->state) & BM_DIRTY)
291 3964 : FlushLocalBuffer(bufHdr, NULL);
292 :
293 : /*
294 : * Remove the victim buffer from the hashtable and mark as invalid.
295 : */
296 30319 : if (pg_atomic_read_u64(&bufHdr->state) & BM_TAG_VALID)
297 : {
298 8100 : InvalidateLocalBuffer(bufHdr, false);
299 :
300 8100 : pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT, 1, 0);
301 : }
302 :
303 30319 : return BufferDescriptorGetBuffer(bufHdr);
304 : }
305 :
306 : /* see GetPinLimit() */
307 : uint32
308 42041 : GetLocalPinLimit(void)
309 : {
310 : /*
311 : * Every backend has its own temporary buffers, but we leave headroom for
312 : * concurrent pin-holders -- like multiple scans in the same query.
313 : */
314 42041 : return num_temp_buffers / 4;
315 : }
316 :
317 : /* see GetAdditionalPinLimit() */
318 : uint32
319 32758 : GetAdditionalLocalPinLimit(void)
320 : {
321 32758 : uint32 total = GetLocalPinLimit();
322 :
323 : Assert(NLocalPinnedBuffers <= num_temp_buffers);
324 :
325 32758 : if (NLocalPinnedBuffers >= total)
326 5216 : return 0;
327 27542 : return total - NLocalPinnedBuffers;
328 : }
329 :
330 : /* see LimitAdditionalPins() */
331 : void
332 14925 : LimitAdditionalLocalPins(uint32 *additional_pins)
333 : {
334 : uint32 max_pins;
335 :
336 14925 : if (*additional_pins <= 1)
337 14490 : return;
338 :
339 : /*
340 : * In contrast to LimitAdditionalPins() other backends don't play a role
341 : * here. We can allow up to NLocBuffer pins in total, but it might not be
342 : * initialized yet so read num_temp_buffers.
343 : */
344 435 : max_pins = (num_temp_buffers - NLocalPinnedBuffers);
345 :
346 435 : if (*additional_pins >= max_pins)
347 0 : *additional_pins = max_pins;
348 : }
349 :
350 : /*
351 : * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
352 : * temporary buffers.
353 : */
354 : BlockNumber
355 14925 : ExtendBufferedRelLocal(BufferManagerRelation bmr,
356 : ForkNumber fork,
357 : uint32 flags,
358 : uint32 extend_by,
359 : BlockNumber extend_upto,
360 : Buffer *buffers,
361 : uint32 *extended_by)
362 : {
363 : BlockNumber first_block;
364 : instr_time io_start;
365 :
366 : /* Initialize local buffers if first request in this session */
367 14925 : if (LocalBufHash == NULL)
368 337 : InitLocalBuffers();
369 :
370 14925 : LimitAdditionalLocalPins(&extend_by);
371 :
372 34219 : for (uint32 i = 0; i < extend_by; i++)
373 : {
374 : BufferDesc *buf_hdr;
375 : Block buf_block;
376 :
377 19294 : buffers[i] = GetLocalVictimBuffer();
378 19294 : buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
379 19294 : buf_block = LocalBufHdrGetBlock(buf_hdr);
380 :
381 : /* new buffers are zero-filled */
382 19294 : MemSet(buf_block, 0, BLCKSZ);
383 : }
384 :
385 14925 : first_block = smgrnblocks(BMR_GET_SMGR(bmr), fork);
386 :
387 : if (extend_upto != InvalidBlockNumber)
388 : {
389 : /*
390 : * In contrast to shared relations, nothing could change the relation
391 : * size concurrently. Thus we shouldn't end up finding that we don't
392 : * need to do anything.
393 : */
394 : Assert(first_block <= extend_upto);
395 :
396 : Assert((uint64) first_block + extend_by <= extend_upto);
397 : }
398 :
399 : /* Fail if relation is already at maximum possible length */
400 14925 : if ((uint64) first_block + extend_by >= MaxBlockNumber)
401 0 : ereport(ERROR,
402 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
403 : errmsg("cannot extend relation %s beyond %u blocks",
404 : relpath(BMR_GET_SMGR(bmr)->smgr_rlocator, fork).str,
405 : MaxBlockNumber)));
406 :
407 34219 : for (uint32 i = 0; i < extend_by; i++)
408 : {
409 : int victim_buf_id;
410 : BufferDesc *victim_buf_hdr;
411 : BufferTag tag;
412 : LocalBufferLookupEnt *hresult;
413 : bool found;
414 :
415 19294 : victim_buf_id = -buffers[i] - 1;
416 19294 : victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
417 :
418 : /* in case we need to pin an existing buffer below */
419 19294 : ResourceOwnerEnlarge(CurrentResourceOwner);
420 :
421 19294 : InitBufferTag(&tag, &BMR_GET_SMGR(bmr)->smgr_rlocator.locator, fork,
422 : first_block + i);
423 :
424 : hresult = (LocalBufferLookupEnt *)
425 19294 : hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
426 19294 : if (found)
427 : {
428 : BufferDesc *existing_hdr;
429 : uint64 buf_state;
430 :
431 0 : UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
432 :
433 0 : existing_hdr = GetLocalBufferDescriptor(hresult->id);
434 0 : PinLocalBuffer(existing_hdr, false);
435 0 : buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
436 :
437 : /*
438 : * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
439 : */
440 0 : buf_state = pg_atomic_read_u64(&existing_hdr->state);
441 : Assert(buf_state & BM_TAG_VALID);
442 : Assert(!(buf_state & BM_DIRTY));
443 0 : buf_state &= ~BM_VALID;
444 0 : pg_atomic_unlocked_write_u64(&existing_hdr->state, buf_state);
445 :
446 : /* no need to loop for local buffers */
447 0 : StartLocalBufferIO(existing_hdr, true, true, NULL);
448 : }
449 : else
450 : {
451 19294 : uint64 buf_state = pg_atomic_read_u64(&victim_buf_hdr->state);
452 :
453 : Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY)));
454 :
455 19294 : victim_buf_hdr->tag = tag;
456 :
457 19294 : buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
458 :
459 19294 : pg_atomic_unlocked_write_u64(&victim_buf_hdr->state, buf_state);
460 :
461 19294 : hresult->id = victim_buf_id;
462 :
463 19294 : StartLocalBufferIO(victim_buf_hdr, true, true, NULL);
464 : }
465 : }
466 :
467 14925 : io_start = pgstat_prepare_io_time(track_io_timing);
468 :
469 : /* actually extend relation */
470 14925 : smgrzeroextend(BMR_GET_SMGR(bmr), fork, first_block, extend_by, false);
471 :
472 14925 : pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
473 14925 : io_start, 1, extend_by * BLCKSZ);
474 :
475 34219 : for (uint32 i = 0; i < extend_by; i++)
476 : {
477 19294 : Buffer buf = buffers[i];
478 : BufferDesc *buf_hdr;
479 : uint64 buf_state;
480 :
481 19294 : buf_hdr = GetLocalBufferDescriptor(-buf - 1);
482 :
483 19294 : buf_state = pg_atomic_read_u64(&buf_hdr->state);
484 19294 : buf_state |= BM_VALID;
485 19294 : pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
486 : }
487 :
488 14925 : *extended_by = extend_by;
489 :
490 14925 : pgBufferUsage.local_blks_written += extend_by;
491 :
492 14925 : return first_block;
493 : }
494 :
495 : /*
496 : * MarkLocalBufferDirty -
497 : * mark a local buffer dirty
498 : */
499 : void
500 2405346 : MarkLocalBufferDirty(Buffer buffer)
501 : {
502 : int bufid;
503 : BufferDesc *bufHdr;
504 : uint64 buf_state;
505 :
506 : Assert(BufferIsLocal(buffer));
507 :
508 : #ifdef LBDEBUG
509 : fprintf(stderr, "LB DIRTY %d\n", buffer);
510 : #endif
511 :
512 2405346 : bufid = -buffer - 1;
513 :
514 : Assert(LocalRefCount[bufid] > 0);
515 :
516 2405346 : bufHdr = GetLocalBufferDescriptor(bufid);
517 :
518 2405346 : buf_state = pg_atomic_read_u64(&bufHdr->state);
519 :
520 2405346 : if (!(buf_state & BM_DIRTY))
521 19168 : pgBufferUsage.local_blks_dirtied++;
522 :
523 2405346 : buf_state |= BM_DIRTY;
524 :
525 2405346 : pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
526 2405346 : }
527 :
528 : /*
529 : * Like StartSharedBufferIO, but for local buffers
530 : */
531 : StartBufferIOResult
532 34819 : StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool wait, PgAioWaitRef *io_wref)
533 : {
534 : uint64 buf_state;
535 :
536 : /*
537 : * With AIO the buffer could have IO in progress, e.g. when there are two
538 : * scans of the same relation. Either wait for the other IO (if wait =
539 : * true and io_wref == NULL) or return BUFFER_IO_IN_PROGRESS;
540 : */
541 34819 : if (pgaio_wref_valid(&bufHdr->io_wref))
542 : {
543 0 : PgAioWaitRef buf_wref = bufHdr->io_wref;
544 :
545 0 : if (io_wref != NULL)
546 : {
547 : /* We've already asynchronously started this IO, so join it */
548 0 : *io_wref = buf_wref;
549 0 : return BUFFER_IO_IN_PROGRESS;
550 : }
551 :
552 : /*
553 : * For temp buffers we should never need to wait in
554 : * StartLocalBufferIO() when called with io_wref == NULL while there
555 : * are staged IOs, as it's not allowed to call code that is not aware
556 : * of AIO while in batch mode.
557 : */
558 : Assert(!pgaio_have_staged());
559 :
560 0 : if (!wait)
561 0 : return BUFFER_IO_IN_PROGRESS;
562 :
563 0 : pgaio_wref_wait(&buf_wref);
564 : }
565 :
566 : /* Once we get here, there is definitely no I/O active on this buffer */
567 :
568 : /* Check if someone else already did the I/O */
569 34819 : buf_state = pg_atomic_read_u64(&bufHdr->state);
570 34819 : if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
571 : {
572 4 : return BUFFER_IO_ALREADY_DONE;
573 : }
574 :
575 : /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
576 :
577 : /* local buffers don't track IO using resowners */
578 :
579 34815 : return BUFFER_IO_READY_FOR_IO;
580 : }
581 :
582 : /*
583 : * Like TerminateBufferIO, but for local buffers
584 : */
585 : void
586 15519 : TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint64 set_flag_bits,
587 : bool release_aio)
588 : {
589 : /* Only need to adjust flags */
590 15519 : uint64 buf_state = pg_atomic_read_u64(&bufHdr->state);
591 :
592 : /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
593 :
594 : /* Clear earlier errors, if this IO failed, it'll be marked again */
595 15519 : buf_state &= ~BM_IO_ERROR;
596 :
597 15519 : if (clear_dirty)
598 4430 : buf_state &= ~BM_DIRTY;
599 :
600 15519 : if (release_aio)
601 : {
602 : /* release pin held by IO subsystem, see also buffer_stage_common() */
603 : Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
604 11055 : buf_state -= BUF_REFCOUNT_ONE;
605 11055 : pgaio_wref_clear(&bufHdr->io_wref);
606 : }
607 :
608 15519 : buf_state |= set_flag_bits;
609 15519 : pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
610 :
611 : /* local buffers don't track IO using resowners */
612 :
613 : /* local buffers don't use the IO CV, as no other process can see buffer */
614 :
615 : /* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
616 15519 : }
617 :
618 : /*
619 : * InvalidateLocalBuffer -- mark a local buffer invalid.
620 : *
621 : * If check_unreferenced is true, error out if the buffer is still
622 : * pinned. Passing false is appropriate when calling InvalidateLocalBuffer()
623 : * as part of changing the identity of a buffer, instead of just dropping the
624 : * buffer.
625 : *
626 : * See also InvalidateBuffer().
627 : */
628 : void
629 30319 : InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
630 : {
631 30319 : Buffer buffer = BufferDescriptorGetBuffer(bufHdr);
632 30319 : int bufid = -buffer - 1;
633 : uint64 buf_state;
634 : LocalBufferLookupEnt *hresult;
635 :
636 : /*
637 : * It's possible that we started IO on this buffer before e.g. aborting
638 : * the transaction that created a table. We need to wait for that IO to
639 : * complete before removing / reusing the buffer.
640 : */
641 30319 : if (pgaio_wref_valid(&bufHdr->io_wref))
642 : {
643 0 : PgAioWaitRef iow = bufHdr->io_wref;
644 :
645 0 : pgaio_wref_wait(&iow);
646 : Assert(!pgaio_wref_valid(&bufHdr->io_wref));
647 : }
648 :
649 30319 : buf_state = pg_atomic_read_u64(&bufHdr->state);
650 :
651 : /*
652 : * We need to test not just LocalRefCount[bufid] but also the BufferDesc
653 : * itself, as the latter is used to represent a pin by the AIO subsystem.
654 : * This can happen if AIO is initiated and then the query errors out.
655 : */
656 30319 : if (check_unreferenced &&
657 22219 : (LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
658 0 : elog(ERROR, "block %u of %s is still referenced (local %d)",
659 : bufHdr->tag.blockNum,
660 : relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
661 : MyProcNumber,
662 : BufTagGetForkNum(&bufHdr->tag)).str,
663 : LocalRefCount[bufid]);
664 :
665 : /* Remove entry from hashtable */
666 : hresult = (LocalBufferLookupEnt *)
667 30319 : hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
668 30319 : if (!hresult) /* shouldn't happen */
669 0 : elog(ERROR, "local buffer hash table corrupted");
670 : /* Mark buffer invalid */
671 30319 : ClearBufferTag(&bufHdr->tag);
672 30319 : buf_state &= ~BUF_FLAG_MASK;
673 30319 : buf_state &= ~BUF_USAGECOUNT_MASK;
674 30319 : pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
675 30319 : }
676 :
677 : /*
678 : * DropRelationLocalBuffers
679 : * This function removes from the buffer pool all the pages of the
680 : * specified relation that have block numbers >= firstDelBlock.
681 : * (In particular, with firstDelBlock = 0, all pages are removed.)
682 : * Dirty pages are simply dropped, without bothering to write them
683 : * out first. Therefore, this is NOT rollback-able, and so should be
684 : * used only with extreme caution!
685 : *
686 : * See DropRelationBuffers in bufmgr.c for more notes.
687 : */
688 : void
689 498 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber *forkNum,
690 : int nforks, BlockNumber *firstDelBlock)
691 : {
692 : int i;
693 : int j;
694 :
695 412146 : for (i = 0; i < NLocBuffer; i++)
696 : {
697 411648 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
698 : uint64 buf_state;
699 :
700 411648 : buf_state = pg_atomic_read_u64(&bufHdr->state);
701 :
702 411648 : if (!(buf_state & BM_TAG_VALID) ||
703 38156 : !BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
704 410530 : continue;
705 :
706 1279 : for (j = 0; j < nforks; j++)
707 : {
708 1229 : if (BufTagGetForkNum(&bufHdr->tag) == forkNum[j] &&
709 1110 : bufHdr->tag.blockNum >= firstDelBlock[j])
710 : {
711 1068 : InvalidateLocalBuffer(bufHdr, true);
712 1068 : break;
713 : }
714 : }
715 : }
716 498 : }
717 :
718 : /*
719 : * DropRelationAllLocalBuffers
720 : * This function removes from the buffer pool all pages of all forks
721 : * of the specified relation.
722 : *
723 : * See DropRelationsAllBuffers in bufmgr.c for more notes.
724 : */
725 : void
726 4403 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
727 : {
728 : int i;
729 :
730 4138099 : for (i = 0; i < NLocBuffer; i++)
731 : {
732 4133696 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
733 : uint64 buf_state;
734 :
735 4133696 : buf_state = pg_atomic_read_u64(&bufHdr->state);
736 :
737 4448060 : if ((buf_state & BM_TAG_VALID) &&
738 314364 : BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
739 : {
740 20993 : InvalidateLocalBuffer(bufHdr, true);
741 : }
742 : }
743 4403 : }
744 :
745 : /*
746 : * InitLocalBuffers -
747 : * init the local buffer cache. Since most queries (esp. multi-user ones)
748 : * don't involve local buffers, we delay allocating actual memory for the
749 : * buffers until we need them; just make the buffer headers here.
750 : */
751 : static void
752 354 : InitLocalBuffers(void)
753 : {
754 354 : int nbufs = num_temp_buffers;
755 : HASHCTL info;
756 : int i;
757 :
758 : /*
759 : * Parallel workers can't access data in temporary tables, because they
760 : * have no visibility into the local buffers of their leader. This is a
761 : * convenient, low-cost place to provide a backstop check for that. Note
762 : * that we don't wish to prevent a parallel worker from accessing catalog
763 : * metadata about a temp table, so checks at higher levels would be
764 : * inappropriate.
765 : */
766 354 : if (IsParallelWorker())
767 0 : ereport(ERROR,
768 : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
769 : errmsg("cannot access temporary tables during a parallel operation")));
770 :
771 : /* Allocate and zero buffer headers and auxiliary arrays */
772 354 : LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
773 354 : LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
774 354 : LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
775 354 : if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
776 0 : ereport(FATAL,
777 : (errcode(ERRCODE_OUT_OF_MEMORY),
778 : errmsg("out of memory")));
779 :
780 354 : nextFreeLocalBufId = 0;
781 :
782 : /* initialize fields that need to start off nonzero */
783 342522 : for (i = 0; i < nbufs; i++)
784 : {
785 342168 : BufferDesc *buf = GetLocalBufferDescriptor(i);
786 :
787 : /*
788 : * negative to indicate local buffer. This is tricky: shared buffers
789 : * start with 0. We have to start with -2. (Note that the routine
790 : * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
791 : * is -1.)
792 : */
793 342168 : buf->buf_id = -i - 2;
794 :
795 342168 : pgaio_wref_clear(&buf->io_wref);
796 :
797 : /*
798 : * Intentionally do not initialize the buffer's atomic variable
799 : * (besides zeroing the underlying memory above). That way we get
800 : * errors on platforms without atomics, if somebody (re-)introduces
801 : * atomic operations for local buffers.
802 : */
803 : }
804 :
805 : /* Create the lookup hash table */
806 354 : info.keysize = sizeof(BufferTag);
807 354 : info.entrysize = sizeof(LocalBufferLookupEnt);
808 :
809 354 : LocalBufHash = hash_create("Local Buffer Lookup Table",
810 : nbufs,
811 : &info,
812 : HASH_ELEM | HASH_BLOBS);
813 :
814 354 : if (!LocalBufHash)
815 0 : elog(ERROR, "could not initialize local buffer hash table");
816 :
817 : /* Initialization done, mark buffers allocated */
818 354 : NLocBuffer = nbufs;
819 354 : }
820 :
821 : /*
822 : * XXX: We could have a slightly more efficient version of PinLocalBuffer()
823 : * that does not support adjusting the usagecount - but so far it does not
824 : * seem worth the trouble.
825 : *
826 : * Note that ResourceOwnerEnlarge() must have been done already.
827 : */
828 : bool
829 1668293 : PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
830 : {
831 : uint64 buf_state;
832 1668293 : Buffer buffer = BufferDescriptorGetBuffer(buf_hdr);
833 1668293 : int bufid = -buffer - 1;
834 :
835 1668293 : buf_state = pg_atomic_read_u64(&buf_hdr->state);
836 :
837 1668293 : if (LocalRefCount[bufid] == 0)
838 : {
839 1557963 : NLocalPinnedBuffers++;
840 1557963 : buf_state += BUF_REFCOUNT_ONE;
841 1557963 : if (adjust_usagecount &&
842 1527252 : BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
843 : {
844 84549 : buf_state += BUF_USAGECOUNT_ONE;
845 : }
846 1557963 : pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
847 :
848 : /*
849 : * See comment in PinBuffer().
850 : *
851 : * If the buffer isn't allocated yet, it'll be marked as defined in
852 : * GetLocalBufferStorage().
853 : */
854 1557963 : if (LocalBufHdrGetBlock(buf_hdr) != NULL)
855 : VALGRIND_MAKE_MEM_DEFINED(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
856 : }
857 1668293 : LocalRefCount[bufid]++;
858 1668293 : ResourceOwnerRememberBuffer(CurrentResourceOwner,
859 : BufferDescriptorGetBuffer(buf_hdr));
860 :
861 1668293 : return buf_state & BM_VALID;
862 : }
863 :
864 : void
865 2134032 : UnpinLocalBuffer(Buffer buffer)
866 : {
867 2134032 : UnpinLocalBufferNoOwner(buffer);
868 2134032 : ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
869 2134032 : }
870 :
871 : void
872 2138024 : UnpinLocalBufferNoOwner(Buffer buffer)
873 : {
874 2138024 : int buffid = -buffer - 1;
875 :
876 : Assert(BufferIsLocal(buffer));
877 : Assert(LocalRefCount[buffid] > 0);
878 : Assert(NLocalPinnedBuffers > 0);
879 :
880 2138024 : if (--LocalRefCount[buffid] == 0)
881 : {
882 1557963 : BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
883 : uint64 buf_state;
884 :
885 1557963 : NLocalPinnedBuffers--;
886 :
887 1557963 : buf_state = pg_atomic_read_u64(&buf_hdr->state);
888 : Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
889 1557963 : buf_state -= BUF_REFCOUNT_ONE;
890 1557963 : pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
891 :
892 : /* see comment in UnpinBufferNoOwner */
893 : VALGRIND_MAKE_MEM_NOACCESS(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
894 : }
895 2138024 : }
896 :
897 : /*
898 : * GUC check_hook for temp_buffers
899 : */
900 : bool
901 1296 : check_temp_buffers(int *newval, void **extra, GucSource source)
902 : {
903 : /*
904 : * Once local buffers have been initialized, it's too late to change this.
905 : * However, if this is only a test call, allow it.
906 : */
907 1296 : if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
908 : {
909 0 : GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
910 0 : return false;
911 : }
912 1296 : return true;
913 : }
914 :
915 : /*
916 : * GetLocalBufferStorage - allocate memory for a local buffer
917 : *
918 : * The idea of this function is to aggregate our requests for storage
919 : * so that the memory manager doesn't see a whole lot of relatively small
920 : * requests. Since we'll never give back a local buffer once it's created
921 : * within a particular process, no point in burdening memmgr with separately
922 : * managed chunks.
923 : */
924 : static Block
925 20951 : GetLocalBufferStorage(void)
926 : {
927 : static char *cur_block = NULL;
928 : static int next_buf_in_block = 0;
929 : static int num_bufs_in_block = 0;
930 : static int total_bufs_allocated = 0;
931 : static MemoryContext LocalBufferContext = NULL;
932 :
933 : char *this_buf;
934 :
935 : Assert(total_bufs_allocated < NLocBuffer);
936 :
937 20951 : if (next_buf_in_block >= num_bufs_in_block)
938 : {
939 : /* Need to make a new request to memmgr */
940 : int num_bufs;
941 :
942 : /*
943 : * We allocate local buffers in a context of their own, so that the
944 : * space eaten for them is easily recognizable in MemoryContextStats
945 : * output. Create the context on first use.
946 : */
947 571 : if (LocalBufferContext == NULL)
948 354 : LocalBufferContext =
949 354 : AllocSetContextCreate(TopMemoryContext,
950 : "LocalBufferContext",
951 : ALLOCSET_DEFAULT_SIZES);
952 :
953 : /* Start with a 16-buffer request; subsequent ones double each time */
954 571 : num_bufs = Max(num_bufs_in_block * 2, 16);
955 : /* But not more than what we need for all remaining local bufs */
956 571 : num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
957 : /* And don't overflow MaxAllocSize, either */
958 571 : num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
959 :
960 : /* Buffers should be I/O aligned. */
961 1142 : cur_block = MemoryContextAllocAligned(LocalBufferContext,
962 571 : num_bufs * BLCKSZ,
963 : PG_IO_ALIGN_SIZE,
964 : 0);
965 :
966 571 : next_buf_in_block = 0;
967 571 : num_bufs_in_block = num_bufs;
968 : }
969 :
970 : /* Allocate next buffer in current memory block */
971 20951 : this_buf = cur_block + next_buf_in_block * BLCKSZ;
972 20951 : next_buf_in_block++;
973 20951 : total_bufs_allocated++;
974 :
975 : /*
976 : * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it
977 : * here. The block is actually undefined, but we want consistency with
978 : * the regular case of not needing to allocate memory. This is
979 : * specifically needed when method_io_uring.c fills the block, because
980 : * Valgrind doesn't recognize io_uring reads causing undefined memory to
981 : * become defined.
982 : */
983 : VALGRIND_MAKE_MEM_DEFINED(this_buf, BLCKSZ);
984 :
985 20951 : return (Block) this_buf;
986 : }
987 :
/*
 * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
 *
 * This is just like CheckForBufferLeaks(), but for local buffers.
 * Assertion-build only; a no-op otherwise.
 */
static void
CheckForLocalBufferLeaks(void)
{
#ifdef USE_ASSERT_CHECKING
	if (LocalRefCount)
	{
		int			leaks = 0;

		for (int i = 0; i < NLocBuffer; i++)
		{
			char	   *msg;

			if (LocalRefCount[i] == 0)
				continue;

			/* complain about each still-pinned buffer before asserting */
			msg = DebugPrintBufferRefcount(-i - 1);
			elog(WARNING, "local buffer refcount leak: %s", msg);
			pfree(msg);

			leaks++;
		}
		Assert(leaks == 0);
	}
#endif
}
1020 :
1021 : /*
1022 : * AtEOXact_LocalBuffers - clean up at end of transaction.
1023 : *
1024 : * This is just like AtEOXact_Buffers, but for local buffers.
1025 : */
1026 : void
1027 629078 : AtEOXact_LocalBuffers(bool isCommit)
1028 : {
1029 629078 : CheckForLocalBufferLeaks();
1030 629078 : }
1031 :
/*
 * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
 *
 * This is just like AtProcExit_Buffers, but for local buffers.
 */
void
AtProcExit_LocalBuffers(void)
{
	/*
	 * We shouldn't be holding any remaining pins; if we are, and assertions
	 * aren't enabled, we'll fail later in DropRelationBuffers while trying
	 * to drop the temp rels.
	 */
	CheckForLocalBufferLeaks();
}
|