Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * localbuf.c
4 : * local buffer manager. Fast buffer manager for temporary tables,
5 : * which never need to be WAL-logged or checkpointed, etc.
6 : *
7 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994-5, Regents of the University of California
9 : *
10 : *
11 : * IDENTIFICATION
12 : * src/backend/storage/buffer/localbuf.c
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/parallel.h"
19 : #include "executor/instrument.h"
20 : #include "pgstat.h"
21 : #include "storage/aio.h"
22 : #include "storage/buf_internals.h"
23 : #include "storage/bufmgr.h"
24 : #include "storage/fd.h"
25 : #include "utils/guc_hooks.h"
26 : #include "utils/memdebug.h"
27 : #include "utils/memutils.h"
28 : #include "utils/rel.h"
29 : #include "utils/resowner.h"
30 :
31 :
32 : /*#define LBDEBUG*/
33 :
34 : /* entry for buffer lookup hashtable */
35 : typedef struct
36 : {
37 : BufferTag key; /* Tag of a disk page */
38 : int id; /* Associated local buffer's index */
39 : } LocalBufferLookupEnt;
40 :
/*
 * Yield the LocalBufferBlockPointers slot belonging to a local buffer
 * header.  Local headers carry buf_id = -(index + 2), which this macro
 * undoes; it must therefore never be applied to a shared buffer header.
 */
#define LocalBufHdrGetBlock(bufHdr) \
	LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
44 :
45 : int NLocBuffer = 0; /* until buffers are initialized */
46 :
47 : BufferDesc *LocalBufferDescriptors = NULL;
48 : Block *LocalBufferBlockPointers = NULL;
49 : int32 *LocalRefCount = NULL;
50 :
51 : static int nextFreeLocalBufId = 0;
52 :
53 : static HTAB *LocalBufHash = NULL;
54 :
55 : /* number of local buffers pinned at least once */
56 : static int NLocalPinnedBuffers = 0;
57 :
58 :
59 : static void InitLocalBuffers(void);
60 : static Block GetLocalBufferStorage(void);
61 : static Buffer GetLocalVictimBuffer(void);
62 :
63 :
64 : /*
65 : * PrefetchLocalBuffer -
66 : * initiate asynchronous read of a block of a relation
67 : *
68 : * Do PrefetchBuffer's work for temporary relations.
69 : * No-op if prefetching isn't compiled in.
70 : */
71 : PrefetchBufferResult
72 1566 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
73 : BlockNumber blockNum)
74 : {
75 1566 : PrefetchBufferResult result = {InvalidBuffer, false};
76 : BufferTag newTag; /* identity of requested block */
77 : LocalBufferLookupEnt *hresult;
78 :
79 1566 : InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
80 :
81 : /* Initialize local buffers if first request in this session */
82 1566 : if (LocalBufHash == NULL)
83 0 : InitLocalBuffers();
84 :
85 : /* See if the desired buffer already exists */
86 : hresult = (LocalBufferLookupEnt *)
87 1566 : hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
88 :
89 1566 : if (hresult)
90 : {
91 : /* Yes, so nothing to do */
92 1566 : result.recent_buffer = -hresult->id - 1;
93 : }
94 : else
95 : {
96 : #ifdef USE_PREFETCH
97 : /* Not in buffers, so initiate prefetch */
98 0 : if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
99 0 : smgrprefetch(smgr, forkNum, blockNum, 1))
100 : {
101 0 : result.initiated_io = true;
102 : }
103 : #endif /* USE_PREFETCH */
104 : }
105 :
106 1566 : return result;
107 : }
108 :
109 :
110 : /*
111 : * LocalBufferAlloc -
112 : * Find or create a local buffer for the given page of the given relation.
113 : *
114 : * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
115 : * any locking since this is all local. We support only default access
116 : * strategy (hence, usage_count is always advanced).
117 : */
118 : BufferDesc *
119 2554406 : LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
120 : bool *foundPtr)
121 : {
122 : BufferTag newTag; /* identity of requested block */
123 : LocalBufferLookupEnt *hresult;
124 : BufferDesc *bufHdr;
125 : Buffer victim_buffer;
126 : int bufid;
127 : bool found;
128 :
129 2554406 : InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
130 :
131 : /* Initialize local buffers if first request in this session */
132 2554406 : if (LocalBufHash == NULL)
133 26 : InitLocalBuffers();
134 :
135 2554406 : ResourceOwnerEnlarge(CurrentResourceOwner);
136 :
137 : /* See if the desired buffer already exists */
138 : hresult = (LocalBufferLookupEnt *)
139 2554406 : hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
140 :
141 2554406 : if (hresult)
142 : {
143 2537636 : bufid = hresult->id;
144 2537636 : bufHdr = GetLocalBufferDescriptor(bufid);
145 : Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
146 :
147 2537636 : *foundPtr = PinLocalBuffer(bufHdr, true);
148 : }
149 : else
150 : {
151 : uint32 buf_state;
152 :
153 16770 : victim_buffer = GetLocalVictimBuffer();
154 16758 : bufid = -victim_buffer - 1;
155 16758 : bufHdr = GetLocalBufferDescriptor(bufid);
156 :
157 : hresult = (LocalBufferLookupEnt *)
158 16758 : hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
159 16758 : if (found) /* shouldn't happen */
160 0 : elog(ERROR, "local buffer hash table corrupted");
161 16758 : hresult->id = bufid;
162 :
163 : /*
164 : * it's all ours now.
165 : */
166 16758 : bufHdr->tag = newTag;
167 :
168 16758 : buf_state = pg_atomic_read_u32(&bufHdr->state);
169 16758 : buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
170 16758 : buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
171 16758 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
172 :
173 16758 : *foundPtr = false;
174 : }
175 :
176 2554394 : return bufHdr;
177 : }
178 :
179 : /*
180 : * Like FlushBuffer(), just for local buffers.
181 : */
182 : void
183 7268 : FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
184 : {
185 : instr_time io_start;
186 7268 : Page localpage = (char *) LocalBufHdrGetBlock(bufHdr);
187 :
188 : Assert(LocalRefCount[-BufferDescriptorGetBuffer(bufHdr) - 1] > 0);
189 :
190 : /*
191 : * Try to start an I/O operation. There currently are no reasons for
192 : * StartLocalBufferIO to return false, so we raise an error in that case.
193 : */
194 7268 : if (!StartLocalBufferIO(bufHdr, false, false))
195 0 : elog(ERROR, "failed to start write IO on local buffer");
196 :
197 : /* Find smgr relation for buffer */
198 7268 : if (reln == NULL)
199 6668 : reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
200 : MyProcNumber);
201 :
202 7268 : PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
203 :
204 7268 : io_start = pgstat_prepare_io_time(track_io_timing);
205 :
206 : /* And write... */
207 7268 : smgrwrite(reln,
208 7268 : BufTagGetForkNum(&bufHdr->tag),
209 : bufHdr->tag.blockNum,
210 : localpage,
211 : false);
212 :
213 : /* Temporary table I/O does not use Buffer Access Strategies */
214 7268 : pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
215 : IOOP_WRITE, io_start, 1, BLCKSZ);
216 :
217 : /* Mark not-dirty */
218 7268 : TerminateLocalBufferIO(bufHdr, true, 0, false);
219 :
220 7268 : pgBufferUsage.local_blks_written++;
221 7268 : }
222 :
223 : static Buffer
224 46162 : GetLocalVictimBuffer(void)
225 : {
226 : int victim_bufid;
227 : int trycounter;
228 : BufferDesc *bufHdr;
229 :
230 46162 : ResourceOwnerEnlarge(CurrentResourceOwner);
231 :
232 : /*
233 : * Need to get a new buffer. We use a clock-sweep algorithm (essentially
234 : * the same as what freelist.c does now...)
235 : */
236 46162 : trycounter = NLocBuffer;
237 : for (;;)
238 : {
239 206236 : victim_bufid = nextFreeLocalBufId;
240 :
241 206236 : if (++nextFreeLocalBufId >= NLocBuffer)
242 1734 : nextFreeLocalBufId = 0;
243 :
244 206236 : bufHdr = GetLocalBufferDescriptor(victim_bufid);
245 :
246 206236 : if (LocalRefCount[victim_bufid] == 0)
247 : {
248 84904 : uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
249 :
250 84904 : if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
251 : {
252 38754 : buf_state -= BUF_USAGECOUNT_ONE;
253 38754 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
254 38754 : trycounter = NLocBuffer;
255 : }
256 46150 : else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
257 : {
258 : /*
259 : * This can be reached if the backend initiated AIO for this
260 : * buffer and then errored out.
261 : */
262 : }
263 : else
264 : {
265 : /* Found a usable buffer */
266 46150 : PinLocalBuffer(bufHdr, false);
267 46150 : break;
268 : }
269 : }
270 121332 : else if (--trycounter == 0)
271 12 : ereport(ERROR,
272 : (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
273 : errmsg("no empty local buffer available")));
274 : }
275 :
276 : /*
277 : * lazy memory allocation: allocate space on first use of a buffer.
278 : */
279 46150 : if (LocalBufHdrGetBlock(bufHdr) == NULL)
280 : {
281 : /* Set pointer for use by BufferGetBlock() macro */
282 31342 : LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
283 : }
284 :
285 : /*
286 : * this buffer is not referenced but it might still be dirty. if that's
287 : * the case, write it out before reusing it!
288 : */
289 46150 : if (pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY)
290 6624 : FlushLocalBuffer(bufHdr, NULL);
291 :
292 : /*
293 : * Remove the victim buffer from the hashtable and mark as invalid.
294 : */
295 46150 : if (pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID)
296 : {
297 12818 : InvalidateLocalBuffer(bufHdr, false);
298 :
299 12818 : pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT, 1, 0);
300 : }
301 :
302 46150 : return BufferDescriptorGetBuffer(bufHdr);
303 : }
304 :
305 : /* see GetPinLimit() */
306 : uint32
307 13762 : GetLocalPinLimit(void)
308 : {
309 : /* Every backend has its own temporary buffers, and can pin them all. */
310 13762 : return num_temp_buffers;
311 : }
312 :
313 : /* see GetAdditionalPinLimit() */
314 : uint32
315 47888 : GetAdditionalLocalPinLimit(void)
316 : {
317 : Assert(NLocalPinnedBuffers <= num_temp_buffers);
318 47888 : return num_temp_buffers - NLocalPinnedBuffers;
319 : }
320 :
321 : /* see LimitAdditionalPins() */
322 : void
323 22812 : LimitAdditionalLocalPins(uint32 *additional_pins)
324 : {
325 : uint32 max_pins;
326 :
327 22812 : if (*additional_pins <= 1)
328 22158 : return;
329 :
330 : /*
331 : * In contrast to LimitAdditionalPins() other backends don't play a role
332 : * here. We can allow up to NLocBuffer pins in total, but it might not be
333 : * initialized yet so read num_temp_buffers.
334 : */
335 654 : max_pins = (num_temp_buffers - NLocalPinnedBuffers);
336 :
337 654 : if (*additional_pins >= max_pins)
338 0 : *additional_pins = max_pins;
339 : }
340 :
341 : /*
342 : * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
343 : * temporary buffers.
344 : */
345 : BlockNumber
346 22812 : ExtendBufferedRelLocal(BufferManagerRelation bmr,
347 : ForkNumber fork,
348 : uint32 flags,
349 : uint32 extend_by,
350 : BlockNumber extend_upto,
351 : Buffer *buffers,
352 : uint32 *extended_by)
353 : {
354 : BlockNumber first_block;
355 : instr_time io_start;
356 :
357 : /* Initialize local buffers if first request in this session */
358 22812 : if (LocalBufHash == NULL)
359 504 : InitLocalBuffers();
360 :
361 22812 : LimitAdditionalLocalPins(&extend_by);
362 :
363 52204 : for (uint32 i = 0; i < extend_by; i++)
364 : {
365 : BufferDesc *buf_hdr;
366 : Block buf_block;
367 :
368 29392 : buffers[i] = GetLocalVictimBuffer();
369 29392 : buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
370 29392 : buf_block = LocalBufHdrGetBlock(buf_hdr);
371 :
372 : /* new buffers are zero-filled */
373 29392 : MemSet(buf_block, 0, BLCKSZ);
374 : }
375 :
376 22812 : first_block = smgrnblocks(BMR_GET_SMGR(bmr), fork);
377 :
378 : if (extend_upto != InvalidBlockNumber)
379 : {
380 : /*
381 : * In contrast to shared relations, nothing could change the relation
382 : * size concurrently. Thus we shouldn't end up finding that we don't
383 : * need to do anything.
384 : */
385 : Assert(first_block <= extend_upto);
386 :
387 : Assert((uint64) first_block + extend_by <= extend_upto);
388 : }
389 :
390 : /* Fail if relation is already at maximum possible length */
391 22812 : if ((uint64) first_block + extend_by >= MaxBlockNumber)
392 0 : ereport(ERROR,
393 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
394 : errmsg("cannot extend relation %s beyond %u blocks",
395 : relpath(BMR_GET_SMGR(bmr)->smgr_rlocator, fork).str,
396 : MaxBlockNumber)));
397 :
398 52204 : for (uint32 i = 0; i < extend_by; i++)
399 : {
400 : int victim_buf_id;
401 : BufferDesc *victim_buf_hdr;
402 : BufferTag tag;
403 : LocalBufferLookupEnt *hresult;
404 : bool found;
405 :
406 29392 : victim_buf_id = -buffers[i] - 1;
407 29392 : victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
408 :
409 : /* in case we need to pin an existing buffer below */
410 29392 : ResourceOwnerEnlarge(CurrentResourceOwner);
411 :
412 29392 : InitBufferTag(&tag, &BMR_GET_SMGR(bmr)->smgr_rlocator.locator, fork,
413 : first_block + i);
414 :
415 : hresult = (LocalBufferLookupEnt *)
416 29392 : hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
417 29392 : if (found)
418 : {
419 : BufferDesc *existing_hdr;
420 : uint32 buf_state;
421 :
422 0 : UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
423 :
424 0 : existing_hdr = GetLocalBufferDescriptor(hresult->id);
425 0 : PinLocalBuffer(existing_hdr, false);
426 0 : buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
427 :
428 : /*
429 : * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
430 : */
431 0 : buf_state = pg_atomic_read_u32(&existing_hdr->state);
432 : Assert(buf_state & BM_TAG_VALID);
433 : Assert(!(buf_state & BM_DIRTY));
434 0 : buf_state &= ~BM_VALID;
435 0 : pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
436 :
437 : /* no need to loop for local buffers */
438 0 : StartLocalBufferIO(existing_hdr, true, false);
439 : }
440 : else
441 : {
442 29392 : uint32 buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
443 :
444 : Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
445 :
446 29392 : victim_buf_hdr->tag = tag;
447 :
448 29392 : buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
449 :
450 29392 : pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
451 :
452 29392 : hresult->id = victim_buf_id;
453 :
454 29392 : StartLocalBufferIO(victim_buf_hdr, true, false);
455 : }
456 : }
457 :
458 22812 : io_start = pgstat_prepare_io_time(track_io_timing);
459 :
460 : /* actually extend relation */
461 22812 : smgrzeroextend(BMR_GET_SMGR(bmr), fork, first_block, extend_by, false);
462 :
463 22812 : pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
464 22812 : io_start, 1, extend_by * BLCKSZ);
465 :
466 52204 : for (uint32 i = 0; i < extend_by; i++)
467 : {
468 29392 : Buffer buf = buffers[i];
469 : BufferDesc *buf_hdr;
470 : uint32 buf_state;
471 :
472 29392 : buf_hdr = GetLocalBufferDescriptor(-buf - 1);
473 :
474 29392 : buf_state = pg_atomic_read_u32(&buf_hdr->state);
475 29392 : buf_state |= BM_VALID;
476 29392 : pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
477 : }
478 :
479 22812 : *extended_by = extend_by;
480 :
481 22812 : pgBufferUsage.local_blks_written += extend_by;
482 :
483 22812 : return first_block;
484 : }
485 :
486 : /*
487 : * MarkLocalBufferDirty -
488 : * mark a local buffer dirty
489 : */
490 : void
491 3714804 : MarkLocalBufferDirty(Buffer buffer)
492 : {
493 : int bufid;
494 : BufferDesc *bufHdr;
495 : uint32 buf_state;
496 :
497 : Assert(BufferIsLocal(buffer));
498 :
499 : #ifdef LBDEBUG
500 : fprintf(stderr, "LB DIRTY %d\n", buffer);
501 : #endif
502 :
503 3714804 : bufid = -buffer - 1;
504 :
505 : Assert(LocalRefCount[bufid] > 0);
506 :
507 3714804 : bufHdr = GetLocalBufferDescriptor(bufid);
508 :
509 3714804 : buf_state = pg_atomic_read_u32(&bufHdr->state);
510 :
511 3714804 : if (!(buf_state & BM_DIRTY))
512 29726 : pgBufferUsage.local_blks_dirtied++;
513 :
514 3714804 : buf_state |= BM_DIRTY;
515 :
516 3714804 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
517 3714804 : }
518 :
519 : /*
520 : * Like StartBufferIO, but for local buffers
521 : */
522 : bool
523 53542 : StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
524 : {
525 : uint32 buf_state;
526 :
527 : /*
528 : * With AIO the buffer could have IO in progress, e.g. when there are two
529 : * scans of the same relation. Either wait for the other IO or return
530 : * false.
531 : */
532 53542 : if (pgaio_wref_valid(&bufHdr->io_wref))
533 : {
534 0 : PgAioWaitRef iow = bufHdr->io_wref;
535 :
536 0 : if (nowait)
537 0 : return false;
538 :
539 0 : pgaio_wref_wait(&iow);
540 : }
541 :
542 : /* Once we get here, there is definitely no I/O active on this buffer */
543 :
544 : /* Check if someone else already did the I/O */
545 53542 : buf_state = pg_atomic_read_u32(&bufHdr->state);
546 53542 : if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
547 : {
548 4 : return false;
549 : }
550 :
551 : /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
552 :
553 : /* local buffers don't track IO using resowners */
554 :
555 53538 : return true;
556 : }
557 :
558 : /*
559 : * Like TerminateBufferIO, but for local buffers
560 : */
561 : void
562 24142 : TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits,
563 : bool release_aio)
564 : {
565 : /* Only need to adjust flags */
566 24142 : uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
567 :
568 : /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
569 :
570 : /* Clear earlier errors, if this IO failed, it'll be marked again */
571 24142 : buf_state &= ~BM_IO_ERROR;
572 :
573 24142 : if (clear_dirty)
574 7268 : buf_state &= ~BM_DIRTY;
575 :
576 24142 : if (release_aio)
577 : {
578 : /* release pin held by IO subsystem, see also buffer_stage_common() */
579 : Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
580 16818 : buf_state -= BUF_REFCOUNT_ONE;
581 16818 : pgaio_wref_clear(&bufHdr->io_wref);
582 : }
583 :
584 24142 : buf_state |= set_flag_bits;
585 24142 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
586 :
587 : /* local buffers don't track IO using resowners */
588 :
589 : /* local buffers don't use the IO CV, as no other process can see buffer */
590 :
591 : /* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
592 24142 : }
593 :
594 : /*
595 : * InvalidateLocalBuffer -- mark a local buffer invalid.
596 : *
597 : * If check_unreferenced is true, error out if the buffer is still
598 : * pinned. Passing false is appropriate when calling InvalidateLocalBuffer()
599 : * as part of changing the identity of a buffer, instead of just dropping the
600 : * buffer.
601 : *
602 : * See also InvalidateBuffer().
603 : */
604 : void
605 46150 : InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
606 : {
607 46150 : Buffer buffer = BufferDescriptorGetBuffer(bufHdr);
608 46150 : int bufid = -buffer - 1;
609 : uint32 buf_state;
610 : LocalBufferLookupEnt *hresult;
611 :
612 : /*
613 : * It's possible that we started IO on this buffer before e.g. aborting
614 : * the transaction that created a table. We need to wait for that IO to
615 : * complete before removing / reusing the buffer.
616 : */
617 46150 : if (pgaio_wref_valid(&bufHdr->io_wref))
618 : {
619 0 : PgAioWaitRef iow = bufHdr->io_wref;
620 :
621 0 : pgaio_wref_wait(&iow);
622 : Assert(!pgaio_wref_valid(&bufHdr->io_wref));
623 : }
624 :
625 46150 : buf_state = pg_atomic_read_u32(&bufHdr->state);
626 :
627 : /*
628 : * We need to test not just LocalRefCount[bufid] but also the BufferDesc
629 : * itself, as the latter is used to represent a pin by the AIO subsystem.
630 : * This can happen if AIO is initiated and then the query errors out.
631 : */
632 46150 : if (check_unreferenced &&
633 33332 : (LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
634 0 : elog(ERROR, "block %u of %s is still referenced (local %d)",
635 : bufHdr->tag.blockNum,
636 : relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
637 : MyProcNumber,
638 : BufTagGetForkNum(&bufHdr->tag)).str,
639 : LocalRefCount[bufid]);
640 :
641 : /* Remove entry from hashtable */
642 : hresult = (LocalBufferLookupEnt *)
643 46150 : hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
644 46150 : if (!hresult) /* shouldn't happen */
645 0 : elog(ERROR, "local buffer hash table corrupted");
646 : /* Mark buffer invalid */
647 46150 : ClearBufferTag(&bufHdr->tag);
648 46150 : buf_state &= ~BUF_FLAG_MASK;
649 46150 : buf_state &= ~BUF_USAGECOUNT_MASK;
650 46150 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
651 46150 : }
652 :
653 : /*
654 : * DropRelationLocalBuffers
655 : * This function removes from the buffer pool all the pages of the
656 : * specified relation that have block numbers >= firstDelBlock.
657 : * (In particular, with firstDelBlock = 0, all pages are removed.)
658 : * Dirty pages are simply dropped, without bothering to write them
659 : * out first. Therefore, this is NOT rollback-able, and so should be
660 : * used only with extreme caution!
661 : *
662 : * See DropRelationBuffers in bufmgr.c for more notes.
663 : */
664 : void
665 748 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber *forkNum,
666 : int nforks, BlockNumber *firstDelBlock)
667 : {
668 : int i;
669 : int j;
670 :
671 617196 : for (i = 0; i < NLocBuffer; i++)
672 : {
673 616448 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
674 : uint32 buf_state;
675 :
676 616448 : buf_state = pg_atomic_read_u32(&bufHdr->state);
677 :
678 616448 : if (!(buf_state & BM_TAG_VALID) ||
679 56606 : !BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
680 614776 : continue;
681 :
682 1912 : for (j = 0; j < nforks; j++)
683 : {
684 1842 : if (BufTagGetForkNum(&bufHdr->tag) == forkNum[j] &&
685 1666 : bufHdr->tag.blockNum >= firstDelBlock[j])
686 : {
687 1602 : InvalidateLocalBuffer(bufHdr, true);
688 1602 : break;
689 : }
690 : }
691 : }
692 748 : }
693 :
694 : /*
695 : * DropRelationAllLocalBuffers
696 : * This function removes from the buffer pool all pages of all forks
697 : * of the specified relation.
698 : *
699 : * See DropRelationsAllBuffers in bufmgr.c for more notes.
700 : */
701 : void
702 6348 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
703 : {
704 : int i;
705 :
706 6031676 : for (i = 0; i < NLocBuffer; i++)
707 : {
708 6025328 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
709 : uint32 buf_state;
710 :
711 6025328 : buf_state = pg_atomic_read_u32(&bufHdr->state);
712 :
713 6481398 : if ((buf_state & BM_TAG_VALID) &&
714 456070 : BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
715 : {
716 31630 : InvalidateLocalBuffer(bufHdr, true);
717 : }
718 : }
719 6348 : }
720 :
721 : /*
722 : * InitLocalBuffers -
723 : * init the local buffer cache. Since most queries (esp. multi-user ones)
724 : * don't involve local buffers, we delay allocating actual memory for the
725 : * buffers until we need them; just make the buffer headers here.
726 : */
727 : static void
728 530 : InitLocalBuffers(void)
729 : {
730 530 : int nbufs = num_temp_buffers;
731 : HASHCTL info;
732 : int i;
733 :
734 : /*
735 : * Parallel workers can't access data in temporary tables, because they
736 : * have no visibility into the local buffers of their leader. This is a
737 : * convenient, low-cost place to provide a backstop check for that. Note
738 : * that we don't wish to prevent a parallel worker from accessing catalog
739 : * metadata about a temp table, so checks at higher levels would be
740 : * inappropriate.
741 : */
742 530 : if (IsParallelWorker())
743 0 : ereport(ERROR,
744 : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
745 : errmsg("cannot access temporary tables during a parallel operation")));
746 :
747 : /* Allocate and zero buffer headers and auxiliary arrays */
748 530 : LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
749 530 : LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
750 530 : LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
751 530 : if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
752 0 : ereport(FATAL,
753 : (errcode(ERRCODE_OUT_OF_MEMORY),
754 : errmsg("out of memory")));
755 :
756 530 : nextFreeLocalBufId = 0;
757 :
758 : /* initialize fields that need to start off nonzero */
759 513682 : for (i = 0; i < nbufs; i++)
760 : {
761 513152 : BufferDesc *buf = GetLocalBufferDescriptor(i);
762 :
763 : /*
764 : * negative to indicate local buffer. This is tricky: shared buffers
765 : * start with 0. We have to start with -2. (Note that the routine
766 : * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
767 : * is -1.)
768 : */
769 513152 : buf->buf_id = -i - 2;
770 :
771 513152 : pgaio_wref_clear(&buf->io_wref);
772 :
773 : /*
774 : * Intentionally do not initialize the buffer's atomic variable
775 : * (besides zeroing the underlying memory above). That way we get
776 : * errors on platforms without atomics, if somebody (re-)introduces
777 : * atomic operations for local buffers.
778 : */
779 : }
780 :
781 : /* Create the lookup hash table */
782 530 : info.keysize = sizeof(BufferTag);
783 530 : info.entrysize = sizeof(LocalBufferLookupEnt);
784 :
785 530 : LocalBufHash = hash_create("Local Buffer Lookup Table",
786 : nbufs,
787 : &info,
788 : HASH_ELEM | HASH_BLOBS);
789 :
790 530 : if (!LocalBufHash)
791 0 : elog(ERROR, "could not initialize local buffer hash table");
792 :
793 : /* Initialization done, mark buffers allocated */
794 530 : NLocBuffer = nbufs;
795 530 : }
796 :
797 : /*
798 : * XXX: We could have a slightly more efficient version of PinLocalBuffer()
799 : * that does not support adjusting the usagecount - but so far it does not
800 : * seem worth the trouble.
801 : *
802 : * Note that ResourceOwnerEnlarge() must have been done already.
803 : */
804 : bool
805 2584450 : PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
806 : {
807 : uint32 buf_state;
808 2584450 : Buffer buffer = BufferDescriptorGetBuffer(buf_hdr);
809 2584450 : int bufid = -buffer - 1;
810 :
811 2584450 : buf_state = pg_atomic_read_u32(&buf_hdr->state);
812 :
813 2584450 : if (LocalRefCount[bufid] == 0)
814 : {
815 2408380 : NLocalPinnedBuffers++;
816 2408380 : buf_state += BUF_REFCOUNT_ONE;
817 2408380 : if (adjust_usagecount &&
818 2361630 : BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
819 : {
820 129102 : buf_state += BUF_USAGECOUNT_ONE;
821 : }
822 2408380 : pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
823 :
824 : /*
825 : * See comment in PinBuffer().
826 : *
827 : * If the buffer isn't allocated yet, it'll be marked as defined in
828 : * GetLocalBufferStorage().
829 : */
830 2408380 : if (LocalBufHdrGetBlock(buf_hdr) != NULL)
831 : VALGRIND_MAKE_MEM_DEFINED(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
832 : }
833 2584450 : LocalRefCount[bufid]++;
834 2584450 : ResourceOwnerRememberBuffer(CurrentResourceOwner,
835 : BufferDescriptorGetBuffer(buf_hdr));
836 :
837 2584450 : return buf_state & BM_VALID;
838 : }
839 :
840 : void
841 3287756 : UnpinLocalBuffer(Buffer buffer)
842 : {
843 3287756 : UnpinLocalBufferNoOwner(buffer);
844 3287756 : ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
845 3287756 : }
846 :
847 : void
848 3293822 : UnpinLocalBufferNoOwner(Buffer buffer)
849 : {
850 3293822 : int buffid = -buffer - 1;
851 :
852 : Assert(BufferIsLocal(buffer));
853 : Assert(LocalRefCount[buffid] > 0);
854 : Assert(NLocalPinnedBuffers > 0);
855 :
856 3293822 : if (--LocalRefCount[buffid] == 0)
857 : {
858 2408380 : BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
859 : uint32 buf_state;
860 :
861 2408380 : NLocalPinnedBuffers--;
862 :
863 2408380 : buf_state = pg_atomic_read_u32(&buf_hdr->state);
864 : Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
865 2408380 : buf_state -= BUF_REFCOUNT_ONE;
866 2408380 : pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
867 :
868 : /* see comment in UnpinBufferNoOwner */
869 : VALGRIND_MAKE_MEM_NOACCESS(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
870 : }
871 3293822 : }
872 :
873 : /*
874 : * GUC check_hook for temp_buffers
875 : */
876 : bool
877 2292 : check_temp_buffers(int *newval, void **extra, GucSource source)
878 : {
879 : /*
880 : * Once local buffers have been initialized, it's too late to change this.
881 : * However, if this is only a test call, allow it.
882 : */
883 2292 : if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
884 : {
885 0 : GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
886 0 : return false;
887 : }
888 2292 : return true;
889 : }
890 :
891 : /*
892 : * GetLocalBufferStorage - allocate memory for a local buffer
893 : *
894 : * The idea of this function is to aggregate our requests for storage
895 : * so that the memory manager doesn't see a whole lot of relatively small
896 : * requests. Since we'll never give back a local buffer once it's created
897 : * within a particular process, no point in burdening memmgr with separately
898 : * managed chunks.
899 : */
900 : static Block
901 31342 : GetLocalBufferStorage(void)
902 : {
903 : static char *cur_block = NULL;
904 : static int next_buf_in_block = 0;
905 : static int num_bufs_in_block = 0;
906 : static int total_bufs_allocated = 0;
907 : static MemoryContext LocalBufferContext = NULL;
908 :
909 : char *this_buf;
910 :
911 : Assert(total_bufs_allocated < NLocBuffer);
912 :
913 31342 : if (next_buf_in_block >= num_bufs_in_block)
914 : {
915 : /* Need to make a new request to memmgr */
916 : int num_bufs;
917 :
918 : /*
919 : * We allocate local buffers in a context of their own, so that the
920 : * space eaten for them is easily recognizable in MemoryContextStats
921 : * output. Create the context on first use.
922 : */
923 854 : if (LocalBufferContext == NULL)
924 530 : LocalBufferContext =
925 530 : AllocSetContextCreate(TopMemoryContext,
926 : "LocalBufferContext",
927 : ALLOCSET_DEFAULT_SIZES);
928 :
929 : /* Start with a 16-buffer request; subsequent ones double each time */
930 854 : num_bufs = Max(num_bufs_in_block * 2, 16);
931 : /* But not more than what we need for all remaining local bufs */
932 854 : num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
933 : /* And don't overflow MaxAllocSize, either */
934 854 : num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
935 :
936 : /* Buffers should be I/O aligned. */
937 1708 : cur_block = MemoryContextAllocAligned(LocalBufferContext,
938 854 : num_bufs * BLCKSZ,
939 : PG_IO_ALIGN_SIZE,
940 : 0);
941 :
942 854 : next_buf_in_block = 0;
943 854 : num_bufs_in_block = num_bufs;
944 : }
945 :
946 : /* Allocate next buffer in current memory block */
947 31342 : this_buf = cur_block + next_buf_in_block * BLCKSZ;
948 31342 : next_buf_in_block++;
949 31342 : total_bufs_allocated++;
950 :
951 : /*
952 : * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it
953 : * here. The block is actually undefined, but we want consistency with
954 : * the regular case of not needing to allocate memory. This is
955 : * specifically needed when method_io_uring.c fills the block, because
956 : * Valgrind doesn't recognize io_uring reads causing undefined memory to
957 : * become defined.
958 : */
959 : VALGRIND_MAKE_MEM_DEFINED(this_buf, BLCKSZ);
960 :
961 31342 : return (Block) this_buf;
962 : }
963 :
/*
 * CheckForLocalBufferLeaks
 *		Verify that this backend holds no local buffer pins.
 *
 * In assert-enabled builds, emits a WARNING for every local buffer with a
 * nonzero pin count and then asserts that there were none.  Compiles to
 * nothing otherwise.  This is just like CheckForBufferLeaks(), but for
 * local buffers.
 */
static void
CheckForLocalBufferLeaks(void)
{
#ifdef USE_ASSERT_CHECKING
	int			leaks = 0;

	/* Nothing to check if local buffers were never initialized */
	if (LocalRefCount == NULL)
		return;

	for (int idx = 0; idx < NLocBuffer; idx++)
	{
		Buffer		b;
		char	   *desc;

		if (LocalRefCount[idx] == 0)
			continue;

		b = -idx - 1;
		desc = DebugPrintBufferRefcount(b);
		elog(WARNING, "local buffer refcount leak: %s", desc);
		pfree(desc);

		leaks++;
	}
	Assert(leaks == 0);
#endif
}
996 :
997 : /*
998 : * AtEOXact_LocalBuffers - clean up at end of transaction.
999 : *
1000 : * This is just like AtEOXact_Buffers, but for local buffers.
1001 : */
1002 : void
1003 1096068 : AtEOXact_LocalBuffers(bool isCommit)
1004 : {
1005 1096068 : CheckForLocalBufferLeaks();
1006 1096068 : }
1007 :
/*
 * AtProcExit_LocalBuffers
 *		Make sure all local buffer pins have been dropped at backend exit.
 *
 * This is just like AtProcExit_Buffers, but for local buffers.
 */
void
AtProcExit_LocalBuffers(void)
{
	/*
	 * No pins should remain at this point.  Should some remain in a build
	 * without assertions, the failure shows up later, in
	 * DropRelationBuffers, when the temp rels are dropped.
	 */
	CheckForLocalBufferLeaks();
}
|