Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * localbuf.c
4 : * local buffer manager. Fast buffer manager for temporary tables,
5 : * which never need to be WAL-logged or checkpointed, etc.
6 : *
7 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994-5, Regents of the University of California
9 : *
10 : *
11 : * IDENTIFICATION
12 : * src/backend/storage/buffer/localbuf.c
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/parallel.h"
19 : #include "executor/instrument.h"
20 : #include "pgstat.h"
21 : #include "storage/aio.h"
22 : #include "storage/buf_internals.h"
23 : #include "storage/bufmgr.h"
24 : #include "storage/fd.h"
25 : #include "utils/guc_hooks.h"
26 : #include "utils/memdebug.h"
27 : #include "utils/memutils.h"
28 : #include "utils/rel.h"
29 : #include "utils/resowner.h"
30 :
31 :
/*#define LBDEBUG*/

/* entry for buffer lookup hashtable */
typedef struct
{
	BufferTag	key;			/* Tag of a disk page */
	int			id;				/* Associated local buffer's index */
} LocalBufferLookupEnt;

/*
 * Note: this macro only works on local buffers, not shared ones!
 *
 * Local buffers have negative buf_id starting at -2 (see InitLocalBuffers),
 * so this maps buf_id back to a 0-based index into the block-pointer array.
 */
#define LocalBufHdrGetBlock(bufHdr) \
	LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]

int			NLocBuffer = 0;		/* until buffers are initialized */

/* buffer headers for this backend's local (temporary-table) buffers */
BufferDesc *LocalBufferDescriptors = NULL;
/* per-buffer pointers to page storage; allocated lazily on first use */
Block	   *LocalBufferBlockPointers = NULL;
/* per-buffer pin counts held by this backend */
int32	   *LocalRefCount = NULL;

/* clock-sweep hand used by GetLocalVictimBuffer() */
static int	nextFreeLocalBufId = 0;

/* maps BufferTag -> index into LocalBufferDescriptors */
static HTAB *LocalBufHash = NULL;

/* number of local buffers pinned at least once */
static int	NLocalPinnedBuffers = 0;


static void InitLocalBuffers(void);
static Block GetLocalBufferStorage(void);
static Buffer GetLocalVictimBuffer(void);
62 :
63 :
/*
 * PrefetchLocalBuffer -
 *	  initiate asynchronous read of a block of a relation
 *
 * Do PrefetchBuffer's work for temporary relations.
 * No-op if prefetching isn't compiled in.
 *
 * If the block is already present in a local buffer, result.recent_buffer is
 * set to that buffer and no IO is started; otherwise, if prefetching is
 * available and direct IO is not in use for data, a prefetch request is
 * issued and result.initiated_io reports whether it was accepted.
 */
PrefetchBufferResult
PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
					BlockNumber blockNum)
{
	PrefetchBufferResult result = {InvalidBuffer, false};
	BufferTag	newTag;			/* identity of requested block */
	LocalBufferLookupEnt *hresult;

	InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* See if the desired buffer already exists */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);

	if (hresult)
	{
		/* Yes, so nothing to do; report the (negative) local buffer id */
		result.recent_buffer = -hresult->id - 1;
	}
	else
	{
#ifdef USE_PREFETCH
		/* Not in buffers, so initiate prefetch (useless under direct IO) */
		if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
			smgrprefetch(smgr, forkNum, blockNum, 1))
		{
			result.initiated_io = true;
		}
#endif							/* USE_PREFETCH */
	}

	return result;
}
108 :
109 :
/*
 * LocalBufferAlloc -
 *	  Find or create a local buffer for the given page of the given relation.
 *
 * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
 * any locking since this is all local.  We support only default access
 * strategy (hence, usage_count is always advanced).
 *
 * On return the buffer is pinned.  *foundPtr is true if the block was
 * already present and valid; false means the caller got a freshly-claimed
 * victim buffer and must read the page in (or otherwise initialize it).
 */
BufferDesc *
LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
				 bool *foundPtr)
{
	BufferTag	newTag;			/* identity of requested block */
	LocalBufferLookupEnt *hresult;
	BufferDesc *bufHdr;
	Buffer		victim_buffer;
	int			bufid;
	bool		found;

	InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* make sure the pin below can be remembered by the resource owner */
	ResourceOwnerEnlarge(CurrentResourceOwner);

	/* See if the desired buffer already exists */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);

	if (hresult)
	{
		bufid = hresult->id;
		bufHdr = GetLocalBufferDescriptor(bufid);
		Assert(BufferTagsEqual(&bufHdr->tag, &newTag));

		/* PinLocalBuffer() reports whether the buffer contents are valid */
		*foundPtr = PinLocalBuffer(bufHdr, true);
	}
	else
	{
		uint64		buf_state;

		/* evict some other buffer; comes back pinned */
		victim_buffer = GetLocalVictimBuffer();
		bufid = -victim_buffer - 1;
		bufHdr = GetLocalBufferDescriptor(bufid);

		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
		if (found)				/* shouldn't happen */
			elog(ERROR, "local buffer hash table corrupted");
		hresult->id = bufid;

		/*
		 * it's all ours now.
		 */
		bufHdr->tag = newTag;

		/* reset flags and usage count; not yet valid, just tagged */
		buf_state = pg_atomic_read_u64(&bufHdr->state);
		buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
		buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
		pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);

		*foundPtr = false;
	}

	return bufHdr;
}
178 :
/*
 * Like FlushBuffer(), just for local buffers.
 *
 * Writes the buffer's page to disk via smgr and clears its dirty flag.
 * The caller must hold at least one pin on the buffer.  If reln is NULL,
 * the relation is looked up from the buffer tag.
 */
void
FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
{
	instr_time	io_start;
	Page		localpage = (char *) LocalBufHdrGetBlock(bufHdr);

	/* caller must hold a pin */
	Assert(LocalRefCount[-BufferDescriptorGetBuffer(bufHdr) - 1] > 0);

	/*
	 * Try to start an I/O operation.  There currently are no reasons for
	 * StartLocalBufferIO to return anything other than
	 * BUFFER_IO_READY_FOR_IO, so we raise an error in that case.
	 */
	if (StartLocalBufferIO(bufHdr, false, true, NULL) != BUFFER_IO_READY_FOR_IO)
		elog(ERROR, "failed to start write IO on local buffer");

	/* Find smgr relation for buffer */
	if (reln == NULL)
		reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
						MyProcNumber);

	PageSetChecksum(localpage, bufHdr->tag.blockNum);

	io_start = pgstat_prepare_io_time(track_io_timing);

	/* And write... */
	smgrwrite(reln,
			  BufTagGetForkNum(&bufHdr->tag),
			  bufHdr->tag.blockNum,
			  localpage,
			  false);

	/* Temporary table I/O does not use Buffer Access Strategies */
	pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
							IOOP_WRITE, io_start, 1, BLCKSZ);

	/* Mark not-dirty */
	TerminateLocalBufferIO(bufHdr, true, 0, false);

	pgBufferUsage.local_blks_written++;
}
223 :
/*
 * GetLocalVictimBuffer
 *		Select a reusable local buffer via clock sweep, pin it, flush it if
 *		dirty, and detach it from its old identity (if any).
 *
 * Returns the (negative) Buffer number of the pinned victim.  Errors out if
 * every local buffer is pinned.
 */
static Buffer
GetLocalVictimBuffer(void)
{
	int			victim_bufid;
	int			trycounter;
	BufferDesc *bufHdr;

	ResourceOwnerEnlarge(CurrentResourceOwner);

	/*
	 * Need to get a new buffer.  We use a clock-sweep algorithm (essentially
	 * the same as what freelist.c does now...)
	 */
	trycounter = NLocBuffer;
	for (;;)
	{
		victim_bufid = nextFreeLocalBufId;

		/* advance the clock hand, wrapping at the end of the array */
		if (++nextFreeLocalBufId >= NLocBuffer)
			nextFreeLocalBufId = 0;

		bufHdr = GetLocalBufferDescriptor(victim_bufid);

		if (LocalRefCount[victim_bufid] == 0)
		{
			uint64		buf_state = pg_atomic_read_u64(&bufHdr->state);

			if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
			{
				/* recently used: decay the usage count and keep sweeping */
				buf_state -= BUF_USAGECOUNT_ONE;
				pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
				trycounter = NLocBuffer;
			}
			else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
			{
				/*
				 * This can be reached if the backend initiated AIO for this
				 * buffer and then errored out.
				 */
			}
			else
			{
				/* Found a usable buffer */
				PinLocalBuffer(bufHdr, false);
				break;
			}
		}
		else if (--trycounter == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
					 errmsg("no empty local buffer available")));
	}

	/*
	 * lazy memory allocation: allocate space on first use of a buffer.
	 */
	if (LocalBufHdrGetBlock(bufHdr) == NULL)
	{
		/* Set pointer for use by BufferGetBlock() macro */
		LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
	}

	/*
	 * this buffer is not referenced but it might still be dirty. if that's
	 * the case, write it out before reusing it!
	 */
	if (pg_atomic_read_u64(&bufHdr->state) & BM_DIRTY)
		FlushLocalBuffer(bufHdr, NULL);

	/*
	 * Remove the victim buffer from the hashtable and mark as invalid.
	 */
	if (pg_atomic_read_u64(&bufHdr->state) & BM_TAG_VALID)
	{
		InvalidateLocalBuffer(bufHdr, false);

		pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT, 1, 0);
	}

	return BufferDescriptorGetBuffer(bufHdr);
}
305 :
/* see GetPinLimit() */
uint32
GetLocalPinLimit(void)
{
	/* Every backend has its own temporary buffers, and can pin them all. */
	return num_temp_buffers;
}
313 :
/* see GetAdditionalPinLimit() */
uint32
GetAdditionalLocalPinLimit(void)
{
	Assert(NLocalPinnedBuffers <= num_temp_buffers);
	/* how many more local buffers this backend could still pin */
	return num_temp_buffers - NLocalPinnedBuffers;
}
321 :
322 : /* see LimitAdditionalPins() */
323 : void
324 14912 : LimitAdditionalLocalPins(uint32 *additional_pins)
325 : {
326 : uint32 max_pins;
327 :
328 14912 : if (*additional_pins <= 1)
329 14479 : return;
330 :
331 : /*
332 : * In contrast to LimitAdditionalPins() other backends don't play a role
333 : * here. We can allow up to NLocBuffer pins in total, but it might not be
334 : * initialized yet so read num_temp_buffers.
335 : */
336 433 : max_pins = (num_temp_buffers - NLocalPinnedBuffers);
337 :
338 433 : if (*additional_pins >= max_pins)
339 0 : *additional_pins = max_pins;
340 : }
341 :
/*
 * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
 * temporary buffers.
 *
 * Claims up to extend_by victim buffers (possibly limited by the pin
 * budget), extends the relation on disk with zeroed pages, and returns the
 * first newly-added block number.  buffers[] receives the pinned buffers and
 * *extended_by the number actually added.
 */
BlockNumber
ExtendBufferedRelLocal(BufferManagerRelation bmr,
					   ForkNumber fork,
					   uint32 flags,
					   uint32 extend_by,
					   BlockNumber extend_upto,
					   Buffer *buffers,
					   uint32 *extended_by)
{
	BlockNumber first_block;
	instr_time	io_start;

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* may shrink extend_by to respect the pin limit */
	LimitAdditionalLocalPins(&extend_by);

	for (uint32 i = 0; i < extend_by; i++)
	{
		BufferDesc *buf_hdr;
		Block		buf_block;

		buffers[i] = GetLocalVictimBuffer();
		buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
		buf_block = LocalBufHdrGetBlock(buf_hdr);

		/* new buffers are zero-filled */
		MemSet(buf_block, 0, BLCKSZ);
	}

	first_block = smgrnblocks(BMR_GET_SMGR(bmr), fork);

	if (extend_upto != InvalidBlockNumber)
	{
		/*
		 * In contrast to shared relations, nothing could change the relation
		 * size concurrently.  Thus we shouldn't end up finding that we don't
		 * need to do anything.
		 */
		Assert(first_block <= extend_upto);

		Assert((uint64) first_block + extend_by <= extend_upto);
	}

	/* Fail if relation is already at maximum possible length */
	if ((uint64) first_block + extend_by >= MaxBlockNumber)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("cannot extend relation %s beyond %u blocks",
						relpath(BMR_GET_SMGR(bmr)->smgr_rlocator, fork).str,
						MaxBlockNumber)));

	/* assign the new block identities to the victim buffers */
	for (uint32 i = 0; i < extend_by; i++)
	{
		int			victim_buf_id;
		BufferDesc *victim_buf_hdr;
		BufferTag	tag;
		LocalBufferLookupEnt *hresult;
		bool		found;

		victim_buf_id = -buffers[i] - 1;
		victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);

		/* in case we need to pin an existing buffer below */
		ResourceOwnerEnlarge(CurrentResourceOwner);

		InitBufferTag(&tag, &BMR_GET_SMGR(bmr)->smgr_rlocator.locator, fork,
					  first_block + i);

		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
		if (found)
		{
			/* a buffer for this to-be-extended block already exists */
			BufferDesc *existing_hdr;
			uint64		buf_state;

			/* release the victim, use the existing buffer instead */
			UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));

			existing_hdr = GetLocalBufferDescriptor(hresult->id);
			PinLocalBuffer(existing_hdr, false);
			buffers[i] = BufferDescriptorGetBuffer(existing_hdr);

			/*
			 * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
			 */
			buf_state = pg_atomic_read_u64(&existing_hdr->state);
			Assert(buf_state & BM_TAG_VALID);
			Assert(!(buf_state & BM_DIRTY));
			buf_state &= ~BM_VALID;
			pg_atomic_unlocked_write_u64(&existing_hdr->state, buf_state);

			/* no need to loop for local buffers */
			StartLocalBufferIO(existing_hdr, true, true, NULL);
		}
		else
		{
			uint64		buf_state = pg_atomic_read_u64(&victim_buf_hdr->state);

			Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY)));

			victim_buf_hdr->tag = tag;

			buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;

			pg_atomic_unlocked_write_u64(&victim_buf_hdr->state, buf_state);

			hresult->id = victim_buf_id;

			StartLocalBufferIO(victim_buf_hdr, true, true, NULL);
		}
	}

	io_start = pgstat_prepare_io_time(track_io_timing);

	/* actually extend relation */
	smgrzeroextend(BMR_GET_SMGR(bmr), fork, first_block, extend_by, false);

	pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
							io_start, 1, extend_by * BLCKSZ);

	/* the on-disk pages now exist, so mark all the buffers valid */
	for (uint32 i = 0; i < extend_by; i++)
	{
		Buffer		buf = buffers[i];
		BufferDesc *buf_hdr;
		uint64		buf_state;

		buf_hdr = GetLocalBufferDescriptor(-buf - 1);

		buf_state = pg_atomic_read_u64(&buf_hdr->state);
		buf_state |= BM_VALID;
		pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
	}

	*extended_by = extend_by;

	pgBufferUsage.local_blks_written += extend_by;

	return first_block;
}
486 :
487 : /*
488 : * MarkLocalBufferDirty -
489 : * mark a local buffer dirty
490 : */
491 : void
492 2404835 : MarkLocalBufferDirty(Buffer buffer)
493 : {
494 : int bufid;
495 : BufferDesc *bufHdr;
496 : uint64 buf_state;
497 :
498 : Assert(BufferIsLocal(buffer));
499 :
500 : #ifdef LBDEBUG
501 : fprintf(stderr, "LB DIRTY %d\n", buffer);
502 : #endif
503 :
504 2404835 : bufid = -buffer - 1;
505 :
506 : Assert(LocalRefCount[bufid] > 0);
507 :
508 2404835 : bufHdr = GetLocalBufferDescriptor(bufid);
509 :
510 2404835 : buf_state = pg_atomic_read_u64(&bufHdr->state);
511 :
512 2404835 : if (!(buf_state & BM_DIRTY))
513 19155 : pgBufferUsage.local_blks_dirtied++;
514 :
515 2404835 : buf_state |= BM_DIRTY;
516 :
517 2404835 : pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
518 2404835 : }
519 :
/*
 * Like StartSharedBufferIO, but for local buffers
 *
 * Returns BUFFER_IO_READY_FOR_IO if the caller should perform the IO,
 * BUFFER_IO_ALREADY_DONE if there is nothing left to do (page already valid
 * for reads, already clean for writes), or BUFFER_IO_IN_PROGRESS when an
 * asynchronous IO is underway and we either joined it (io_wref != NULL) or
 * were asked not to wait.
 */
StartBufferIOResult
StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool wait, PgAioWaitRef *io_wref)
{
	uint64		buf_state;

	/*
	 * With AIO the buffer could have IO in progress, e.g. when there are two
	 * scans of the same relation. Either wait for the other IO (if wait =
	 * true and io_wref == NULL) or return BUFFER_IO_IN_PROGRESS;
	 */
	if (pgaio_wref_valid(&bufHdr->io_wref))
	{
		PgAioWaitRef buf_wref = bufHdr->io_wref;

		if (io_wref != NULL)
		{
			/* We've already asynchronously started this IO, so join it */
			*io_wref = buf_wref;
			return BUFFER_IO_IN_PROGRESS;
		}

		/*
		 * For temp buffers we should never need to wait in
		 * StartLocalBufferIO() when called with io_wref == NULL while there
		 * are staged IOs, as it's not allowed to call code that is not aware
		 * of AIO while in batch mode.
		 */
		Assert(!pgaio_have_staged());

		if (!wait)
			return BUFFER_IO_IN_PROGRESS;

		/* wait for the in-flight IO to complete before proceeding */
		pgaio_wref_wait(&buf_wref);
	}

	/* Once we get here, there is definitely no I/O active on this buffer */

	/* Check if someone else already did the I/O */
	buf_state = pg_atomic_read_u64(&bufHdr->state);
	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
	{
		return BUFFER_IO_ALREADY_DONE;
	}

	/* BM_IO_IN_PROGRESS isn't currently used for local buffers */

	/* local buffers don't track IO using resowners */

	return BUFFER_IO_READY_FOR_IO;
}
573 :
/*
 * Like TerminateBufferIO, but for local buffers
 *
 * clear_dirty: the IO was a successful write, so clear BM_DIRTY.
 * set_flag_bits: additional flag bits to OR into the buffer state.
 * release_aio: drop the pin held by the AIO subsystem and clear io_wref.
 */
void
TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint64 set_flag_bits,
					   bool release_aio)
{
	/* Only need to adjust flags */
	uint64		buf_state = pg_atomic_read_u64(&bufHdr->state);

	/* BM_IO_IN_PROGRESS isn't currently used for local buffers */

	/* Clear earlier errors, if this IO failed, it'll be marked again */
	buf_state &= ~BM_IO_ERROR;

	if (clear_dirty)
		buf_state &= ~BM_DIRTY;

	if (release_aio)
	{
		/* release pin held by IO subsystem, see also buffer_stage_common() */
		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
		buf_state -= BUF_REFCOUNT_ONE;
		pgaio_wref_clear(&bufHdr->io_wref);
	}

	buf_state |= set_flag_bits;
	pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);

	/* local buffers don't track IO using resowners */

	/* local buffers don't use the IO CV, as no other process can see buffer */

	/* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
}
609 :
/*
 * InvalidateLocalBuffer -- mark a local buffer invalid.
 *
 * If check_unreferenced is true, error out if the buffer is still
 * pinned. Passing false is appropriate when calling InvalidateLocalBuffer()
 * as part of changing the identity of a buffer, instead of just dropping the
 * buffer.
 *
 * See also InvalidateBuffer().
 */
void
InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
{
	Buffer		buffer = BufferDescriptorGetBuffer(bufHdr);
	int			bufid = -buffer - 1;
	uint64		buf_state;
	LocalBufferLookupEnt *hresult;

	/*
	 * It's possible that we started IO on this buffer before e.g. aborting
	 * the transaction that created a table. We need to wait for that IO to
	 * complete before removing / reusing the buffer.
	 */
	if (pgaio_wref_valid(&bufHdr->io_wref))
	{
		PgAioWaitRef iow = bufHdr->io_wref;

		pgaio_wref_wait(&iow);
		Assert(!pgaio_wref_valid(&bufHdr->io_wref));
	}

	buf_state = pg_atomic_read_u64(&bufHdr->state);

	/*
	 * We need to test not just LocalRefCount[bufid] but also the BufferDesc
	 * itself, as the latter is used to represent a pin by the AIO subsystem.
	 * This can happen if AIO is initiated and then the query errors out.
	 */
	if (check_unreferenced &&
		(LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
		elog(ERROR, "block %u of %s is still referenced (local %d)",
			 bufHdr->tag.blockNum,
			 relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
							MyProcNumber,
							BufTagGetForkNum(&bufHdr->tag)).str,
			 LocalRefCount[bufid]);

	/* Remove entry from hashtable */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
	if (!hresult)				/* shouldn't happen */
		elog(ERROR, "local buffer hash table corrupted");
	/* Mark buffer invalid: clear the tag, all flags, and the usage count */
	ClearBufferTag(&bufHdr->tag);
	buf_state &= ~BUF_FLAG_MASK;
	buf_state &= ~BUF_USAGECOUNT_MASK;
	pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
}
668 :
/*
 * DropRelationLocalBuffers
 *		This function removes from the buffer pool all the pages of the
 *		specified relation that have block numbers >= firstDelBlock.
 *		(In particular, with firstDelBlock = 0, all pages are removed.)
 *		Dirty pages are simply dropped, without bothering to write them
 *		out first.  Therefore, this is NOT rollback-able, and so should be
 *		used only with extreme caution!
 *
 *		forkNum and firstDelBlock are parallel arrays of nforks entries,
 *		one per fork to truncate.
 *
 * See DropRelationBuffers in bufmgr.c for more notes.
 */
void
DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber *forkNum,
						 int nforks, BlockNumber *firstDelBlock)
{
	int			i;
	int			j;

	/* full scan of all local buffers; there is no per-relation index */
	for (i = 0; i < NLocBuffer; i++)
	{
		BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
		uint64		buf_state;

		buf_state = pg_atomic_read_u64(&bufHdr->state);

		if (!(buf_state & BM_TAG_VALID) ||
			!BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
			continue;

		for (j = 0; j < nforks; j++)
		{
			if (BufTagGetForkNum(&bufHdr->tag) == forkNum[j] &&
				bufHdr->tag.blockNum >= firstDelBlock[j])
			{
				/* a buffer can match at most one fork, so stop looking */
				InvalidateLocalBuffer(bufHdr, true);
				break;
			}
		}
	}
}
709 :
710 : /*
711 : * DropRelationAllLocalBuffers
712 : * This function removes from the buffer pool all pages of all forks
713 : * of the specified relation.
714 : *
715 : * See DropRelationsAllBuffers in bufmgr.c for more notes.
716 : */
717 : void
718 4374 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
719 : {
720 : int i;
721 :
722 4108374 : for (i = 0; i < NLocBuffer; i++)
723 : {
724 4104000 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
725 : uint64 buf_state;
726 :
727 4104000 : buf_state = pg_atomic_read_u64(&bufHdr->state);
728 :
729 4418176 : if ((buf_state & BM_TAG_VALID) &&
730 314176 : BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
731 : {
732 20973 : InvalidateLocalBuffer(bufHdr, true);
733 : }
734 : }
735 4374 : }
736 :
/*
 * InitLocalBuffers -
 *	  init the local buffer cache. Since most queries (esp. multi-user ones)
 *	  don't involve local buffers, we delay allocating actual memory for the
 *	  buffers until we need them; just make the buffer headers here.
 */
static void
InitLocalBuffers(void)
{
	int			nbufs = num_temp_buffers;
	HASHCTL		info;
	int			i;

	/*
	 * Parallel workers can't access data in temporary tables, because they
	 * have no visibility into the local buffers of their leader.  This is a
	 * convenient, low-cost place to provide a backstop check for that.  Note
	 * that we don't wish to prevent a parallel worker from accessing catalog
	 * metadata about a temp table, so checks at higher levels would be
	 * inappropriate.
	 */
	if (IsParallelWorker())
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
				 errmsg("cannot access temporary tables during a parallel operation")));

	/* Allocate and zero buffer headers and auxiliary arrays */
	LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
	LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
	LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
	if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
		ereport(FATAL,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory")));

	nextFreeLocalBufId = 0;

	/* initialize fields that need to start off nonzero */
	for (i = 0; i < nbufs; i++)
	{
		BufferDesc *buf = GetLocalBufferDescriptor(i);

		/*
		 * negative to indicate local buffer. This is tricky: shared buffers
		 * start with 0. We have to start with -2. (Note that the routine
		 * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
		 * is -1.)
		 */
		buf->buf_id = -i - 2;

		pgaio_wref_clear(&buf->io_wref);

		/*
		 * Intentionally do not initialize the buffer's atomic variable
		 * (besides zeroing the underlying memory above). That way we get
		 * errors on platforms without atomics, if somebody (re-)introduces
		 * atomic operations for local buffers.
		 */
	}

	/* Create the lookup hash table */
	info.keysize = sizeof(BufferTag);
	info.entrysize = sizeof(LocalBufferLookupEnt);

	LocalBufHash = hash_create("Local Buffer Lookup Table",
							   nbufs,
							   &info,
							   HASH_ELEM | HASH_BLOBS);

	if (!LocalBufHash)
		elog(ERROR, "could not initialize local buffer hash table");

	/* Initialization done, mark buffers allocated */
	NLocBuffer = nbufs;
}
812 :
/*
 * PinLocalBuffer -
 *	  pin a local buffer, optionally bumping its usage count.
 *
 * Returns true if the buffer's contents are valid (BM_VALID set).
 *
 * XXX: We could have a slightly more efficient version of PinLocalBuffer()
 * that does not support adjusting the usagecount - but so far it does not
 * seem worth the trouble.
 *
 * Note that ResourceOwnerEnlarge() must have been done already.
 */
bool
PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
{
	uint64		buf_state;
	Buffer		buffer = BufferDescriptorGetBuffer(buf_hdr);
	int			bufid = -buffer - 1;

	buf_state = pg_atomic_read_u64(&buf_hdr->state);

	/* only the first pin by this backend touches the shared state */
	if (LocalRefCount[bufid] == 0)
	{
		NLocalPinnedBuffers++;
		buf_state += BUF_REFCOUNT_ONE;
		if (adjust_usagecount &&
			BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
		{
			buf_state += BUF_USAGECOUNT_ONE;
		}
		pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);

		/*
		 * See comment in PinBuffer().
		 *
		 * If the buffer isn't allocated yet, it'll be marked as defined in
		 * GetLocalBufferStorage().
		 */
		if (LocalBufHdrGetBlock(buf_hdr) != NULL)
			VALGRIND_MAKE_MEM_DEFINED(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
	}
	LocalRefCount[bufid]++;
	ResourceOwnerRememberBuffer(CurrentResourceOwner,
								BufferDescriptorGetBuffer(buf_hdr));

	return buf_state & BM_VALID;
}
855 :
/*
 * UnpinLocalBuffer -
 *	  release a pin on a local buffer and forget it in the resource owner.
 */
void
UnpinLocalBuffer(Buffer buffer)
{
	UnpinLocalBufferNoOwner(buffer);
	ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
}
862 :
/*
 * UnpinLocalBufferNoOwner -
 *	  release a pin on a local buffer without touching the resource owner;
 *	  the shared refcount in the header is dropped only when this backend's
 *	  last pin goes away.
 */
void
UnpinLocalBufferNoOwner(Buffer buffer)
{
	int			buffid = -buffer - 1;

	Assert(BufferIsLocal(buffer));
	Assert(LocalRefCount[buffid] > 0);
	Assert(NLocalPinnedBuffers > 0);

	if (--LocalRefCount[buffid] == 0)
	{
		BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
		uint64		buf_state;

		NLocalPinnedBuffers--;

		buf_state = pg_atomic_read_u64(&buf_hdr->state);
		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
		buf_state -= BUF_REFCOUNT_ONE;
		pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);

		/* see comment in UnpinBufferNoOwner */
		VALGRIND_MAKE_MEM_NOACCESS(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
	}
}
888 :
889 : /*
890 : * GUC check_hook for temp_buffers
891 : */
892 : bool
893 1291 : check_temp_buffers(int *newval, void **extra, GucSource source)
894 : {
895 : /*
896 : * Once local buffers have been initialized, it's too late to change this.
897 : * However, if this is only a test call, allow it.
898 : */
899 1291 : if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
900 : {
901 0 : GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
902 0 : return false;
903 : }
904 1291 : return true;
905 : }
906 :
/*
 * GetLocalBufferStorage - allocate memory for a local buffer
 *
 * The idea of this function is to aggregate our requests for storage
 * so that the memory manager doesn't see a whole lot of relatively small
 * requests.  Since we'll never give back a local buffer once it's created
 * within a particular process, no point in burdening memmgr with separately
 * managed chunks.
 */
static Block
GetLocalBufferStorage(void)
{
	/* remaining space in the most recent multi-buffer allocation */
	static char *cur_block = NULL;
	static int	next_buf_in_block = 0;
	static int	num_bufs_in_block = 0;
	static int	total_bufs_allocated = 0;
	static MemoryContext LocalBufferContext = NULL;

	char	   *this_buf;

	Assert(total_bufs_allocated < NLocBuffer);

	if (next_buf_in_block >= num_bufs_in_block)
	{
		/* Need to make a new request to memmgr */
		int			num_bufs;

		/*
		 * We allocate local buffers in a context of their own, so that the
		 * space eaten for them is easily recognizable in MemoryContextStats
		 * output.  Create the context on first use.
		 */
		if (LocalBufferContext == NULL)
			LocalBufferContext =
				AllocSetContextCreate(TopMemoryContext,
									  "LocalBufferContext",
									  ALLOCSET_DEFAULT_SIZES);

		/* Start with a 16-buffer request; subsequent ones double each time */
		num_bufs = Max(num_bufs_in_block * 2, 16);
		/* But not more than what we need for all remaining local bufs */
		num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
		/* And don't overflow MaxAllocSize, either */
		num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);

		/* Buffers should be I/O aligned. */
		cur_block = MemoryContextAllocAligned(LocalBufferContext,
											  num_bufs * BLCKSZ,
											  PG_IO_ALIGN_SIZE,
											  0);

		next_buf_in_block = 0;
		num_bufs_in_block = num_bufs;
	}

	/* Allocate next buffer in current memory block */
	this_buf = cur_block + next_buf_in_block * BLCKSZ;
	next_buf_in_block++;
	total_bufs_allocated++;

	/*
	 * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it
	 * here.  The block is actually undefined, but we want consistency with
	 * the regular case of not needing to allocate memory.  This is
	 * specifically needed when method_io_uring.c fills the block, because
	 * Valgrind doesn't recognize io_uring reads causing undefined memory to
	 * become defined.
	 */
	VALGRIND_MAKE_MEM_DEFINED(this_buf, BLCKSZ);

	return (Block) this_buf;
}
979 :
/*
 * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
 *
 * This is just like CheckForBufferLeaks(), but for local buffers.
 * Only active in assert-enabled builds; each leaked pin is reported as a
 * WARNING before the final assertion fires.
 */
static void
CheckForLocalBufferLeaks(void)
{
#ifdef USE_ASSERT_CHECKING
	if (LocalRefCount)
	{
		int			RefCountErrors = 0;
		int			i;

		for (i = 0; i < NLocBuffer; i++)
		{
			if (LocalRefCount[i] != 0)
			{
				Buffer		b = -i - 1;
				char	   *s;

				s = DebugPrintBufferRefcount(b);
				elog(WARNING, "local buffer refcount leak: %s", s);
				pfree(s);

				RefCountErrors++;
			}
		}
		Assert(RefCountErrors == 0);
	}
#endif
}
1012 :
/*
 * AtEOXact_LocalBuffers - clean up at end of transaction.
 *
 * This is just like AtEOXact_Buffers, but for local buffers.
 * isCommit is currently unused; commit and abort get the same leak check.
 */
void
AtEOXact_LocalBuffers(bool isCommit)
{
	CheckForLocalBufferLeaks();
}
1023 :
/*
 * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
 *
 * This is just like AtProcExit_Buffers, but for local buffers.
 */
void
AtProcExit_LocalBuffers(void)
{
	/*
	 * We shouldn't be holding any remaining pins; if we are, and assertions
	 * aren't enabled, we'll fail later in DropRelationBuffers while trying to
	 * drop the temp rels.
	 */
	CheckForLocalBufferLeaks();
}
|