/*-------------------------------------------------------------------------
 *
 * localbuf.c
 *	  local buffer manager. Fast buffer manager for temporary tables,
 *	  which never need to be WAL-logged or checkpointed, etc.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994-5, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/buffer/localbuf.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/parallel.h"
#include "executor/instrument.h"
#include "pgstat.h"
#include "storage/aio.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "utils/guc_hooks.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/resowner.h"


/*#define LBDEBUG*/

/* entry for buffer lookup hashtable */
typedef struct
{
	BufferTag	key;			/* Tag of a disk page */
	int			id;				/* Associated local buffer's index */
} LocalBufferLookupEnt;

/* Note: this macro only works on local buffers, not shared ones! */
#define LocalBufHdrGetBlock(bufHdr) \
	LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
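
/*
 * Worked example of the id arithmetic (values illustrative): for the local
 * buffer at array index i, InitLocalBuffers() stores buf_id = -i - 2, and
 * BufferDescriptorGetBuffer() adds 1, so:
 *
 *		i = 0  =>  buf_id = -2,  Buffer = -1,  block = LocalBufferBlockPointers[0]
 *		i = 7  =>  buf_id = -9,  Buffer = -8,  block = LocalBufferBlockPointers[7]
 *
 * The macro above recovers i as -((bufHdr)->buf_id + 2).
 */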

int			NLocBuffer = 0;		/* until buffers are initialized */

BufferDesc *LocalBufferDescriptors = NULL;
Block	   *LocalBufferBlockPointers = NULL;
int32	   *LocalRefCount = NULL;

static int	nextFreeLocalBufId = 0;

static HTAB *LocalBufHash = NULL;

/* number of local buffers currently pinned (i.e., with pin count >= 1) */
static int	NLocalPinnedBuffers = 0;


static void InitLocalBuffers(void);
static Block GetLocalBufferStorage(void);
static Buffer GetLocalVictimBuffer(void);


/*
 * PrefetchLocalBuffer -
 *	  initiate asynchronous read of a block of a relation
 *
 * Do PrefetchBuffer's work for temporary relations.
 * No-op if prefetching isn't compiled in.
 */
PrefetchBufferResult
PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
					BlockNumber blockNum)
{
	PrefetchBufferResult result = {InvalidBuffer, false};
	BufferTag	newTag;			/* identity of requested block */
	LocalBufferLookupEnt *hresult;

	InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* See if the desired buffer already exists */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);

	if (hresult)
	{
		/* Yes, so nothing to do */
		result.recent_buffer = -hresult->id - 1;
	}
	else
	{
#ifdef USE_PREFETCH
		/* Not in buffers, so initiate prefetch */
		if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
			smgrprefetch(smgr, forkNum, blockNum, 1))
		{
			result.initiated_io = true;
		}
#endif							/* USE_PREFETCH */
	}

	return result;
}


/*
 * LocalBufferAlloc -
 *	  Find or create a local buffer for the given page of the given relation.
 *
 * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
 * any locking since this is all local.  We support only default access
 * strategy (hence, usage_count is always advanced).
 */
BufferDesc *
LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
				 bool *foundPtr)
{
	BufferTag	newTag;			/* identity of requested block */
	LocalBufferLookupEnt *hresult;
	BufferDesc *bufHdr;
	Buffer		victim_buffer;
	int			bufid;
	bool		found;

	InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	ResourceOwnerEnlarge(CurrentResourceOwner);

	/* See if the desired buffer already exists */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);

	if (hresult)
	{
		bufid = hresult->id;
		bufHdr = GetLocalBufferDescriptor(bufid);
		Assert(BufferTagsEqual(&bufHdr->tag, &newTag));

		*foundPtr = PinLocalBuffer(bufHdr, true);
	}
	else
	{
		uint32		buf_state;

		victim_buffer = GetLocalVictimBuffer();
		bufid = -victim_buffer - 1;
		bufHdr = GetLocalBufferDescriptor(bufid);

		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
		if (found)				/* shouldn't happen */
			elog(ERROR, "local buffer hash table corrupted");
		hresult->id = bufid;

		/*
		 * it's all ours now.
		 */
		bufHdr->tag = newTag;

		buf_state = pg_atomic_read_u32(&bufHdr->state);
		buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
		buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);

		*foundPtr = false;
	}

	return bufHdr;
}
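
/*
 * Minimal caller sketch for LocalBufferAlloc() (hedged; it mirrors how
 * bufmgr.c drives this API, with the actual smgr read step elided).  On a
 * miss, the returned buffer is pinned but not valid, so the caller must
 * read the page and then mark it valid:
 *
 *		bool		found;
 *		BufferDesc *hdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
 *
 *		if (!found && StartLocalBufferIO(hdr, true, false))
 *		{
 *			... read the page into LocalBufHdrGetBlock(hdr) ...
 *			TerminateLocalBufferIO(hdr, false, BM_VALID, false);
 *		}
 */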

/*
 * Like FlushBuffer(), just for local buffers.
 */
void
FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
{
	instr_time	io_start;
	Page		localpage = (char *) LocalBufHdrGetBlock(bufHdr);

	Assert(LocalRefCount[-BufferDescriptorGetBuffer(bufHdr) - 1] > 0);

	/*
	 * Try to start an I/O operation.  There currently is no legitimate
	 * reason for StartLocalBufferIO() to return false here, so treat a
	 * false return as an error.
	 */
	if (!StartLocalBufferIO(bufHdr, false, false))
		elog(ERROR, "failed to start write IO on local buffer");

	/* Find smgr relation for buffer */
	if (reln == NULL)
		reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
						MyProcNumber);

	PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);

	io_start = pgstat_prepare_io_time(track_io_timing);

	/* And write... */
	smgrwrite(reln,
			  BufTagGetForkNum(&bufHdr->tag),
			  bufHdr->tag.blockNum,
			  localpage,
			  false);

	/* Temporary table I/O does not use Buffer Access Strategies */
	pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
							IOOP_WRITE, io_start, 1, BLCKSZ);

	/* Mark not-dirty */
	TerminateLocalBufferIO(bufHdr, true, 0, false);

	pgBufferUsage.local_blks_written++;
}

static Buffer
GetLocalVictimBuffer(void)
{
	int			victim_bufid;
	int			trycounter;
	BufferDesc *bufHdr;

	ResourceOwnerEnlarge(CurrentResourceOwner);

	/*
	 * Need to get a new buffer.  We use a clock sweep algorithm (essentially
	 * the same as what freelist.c does now...)
	 */
	trycounter = NLocBuffer;
	for (;;)
	{
		victim_bufid = nextFreeLocalBufId;

		if (++nextFreeLocalBufId >= NLocBuffer)
			nextFreeLocalBufId = 0;

		bufHdr = GetLocalBufferDescriptor(victim_bufid);

		if (LocalRefCount[victim_bufid] == 0)
		{
			uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);

			if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
			{
				buf_state -= BUF_USAGECOUNT_ONE;
				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
				trycounter = NLocBuffer;
			}
			else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
			{
				/*
				 * This can be reached if the backend initiated AIO for this
				 * buffer and then errored out.
				 */
			}
			else
			{
				/* Found a usable buffer */
				PinLocalBuffer(bufHdr, false);
				break;
			}
		}
		else if (--trycounter == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
					 errmsg("no empty local buffer available")));
	}

	/*
	 * lazy memory allocation: allocate space on first use of a buffer.
	 */
	if (LocalBufHdrGetBlock(bufHdr) == NULL)
	{
		/* Set pointer for use by BufferGetBlock() macro */
		LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
	}

	/*
	 * This buffer is not referenced, but it might still be dirty.  If so,
	 * write it out before reusing it!
	 */
	if (pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY)
		FlushLocalBuffer(bufHdr, NULL);

	/*
	 * Remove the victim buffer from the hashtable and mark as invalid.
	 */
	if (pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID)
	{
		InvalidateLocalBuffer(bufHdr, false);

		pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT, 1, 0);
	}

	return BufferDescriptorGetBuffer(bufHdr);
}
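
/*
 * Worked example of the clock sweep above: an unpinned buffer that reached
 * BM_MAX_USAGE_COUNT (5) survives five full passes of the hand, each pass
 * decrementing its usage count by one, before it becomes a victim.  The
 * trycounter bounds the search: it is only decremented on pinned buffers
 * and is reset whenever a usage count decays, so it can reach zero only
 * after NLocBuffer consecutive pinned buffers, i.e. when no victim can
 * exist.
 */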

/* see GetPinLimit() */
uint32
GetLocalPinLimit(void)
{
	/* Every backend has its own temporary buffers, and can pin them all. */
	return num_temp_buffers;
}

/* see GetAdditionalPinLimit() */
uint32
GetAdditionalLocalPinLimit(void)
{
	Assert(NLocalPinnedBuffers <= num_temp_buffers);
	return num_temp_buffers - NLocalPinnedBuffers;
}

/* see LimitAdditionalPins() */
void
LimitAdditionalLocalPins(uint32 *additional_pins)
{
	uint32		max_pins;

	if (*additional_pins <= 1)
		return;

	/*
	 * In contrast to LimitAdditionalPins(), other backends don't play a role
	 * here.  We can allow up to NLocBuffer pins in total, but it might not
	 * be initialized yet, so read num_temp_buffers.
	 */
	max_pins = (num_temp_buffers - NLocalPinnedBuffers);

	if (*additional_pins >= max_pins)
		*additional_pins = max_pins;
}
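
/*
 * Worked example (values assumed): with num_temp_buffers = 1024 and
 * NLocalPinnedBuffers = 1000, a caller requesting 64 additional pins has
 * *additional_pins clamped to 24.  A request of 1 bypasses the check
 * entirely, so a single pin is always allowed and progress is possible.
 */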

/*
 * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
 * temporary buffers.
 */
BlockNumber
ExtendBufferedRelLocal(BufferManagerRelation bmr,
					   ForkNumber fork,
					   uint32 flags,
					   uint32 extend_by,
					   BlockNumber extend_upto,
					   Buffer *buffers,
					   uint32 *extended_by)
{
	BlockNumber first_block;
	instr_time	io_start;

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	LimitAdditionalLocalPins(&extend_by);

	for (uint32 i = 0; i < extend_by; i++)
	{
		BufferDesc *buf_hdr;
		Block		buf_block;

		buffers[i] = GetLocalVictimBuffer();
		buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
		buf_block = LocalBufHdrGetBlock(buf_hdr);

		/* new buffers are zero-filled */
		MemSet(buf_block, 0, BLCKSZ);
	}

	first_block = smgrnblocks(bmr.smgr, fork);

	if (extend_upto != InvalidBlockNumber)
	{
		/*
		 * In contrast to shared relations, nothing could change the relation
		 * size concurrently.  Thus we shouldn't end up finding that we don't
		 * need to do anything.
		 */
		Assert(first_block <= extend_upto);

		Assert((uint64) first_block + extend_by <= extend_upto);
	}

	/* Fail if relation is already at maximum possible length */
	if ((uint64) first_block + extend_by >= MaxBlockNumber)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("cannot extend relation %s beyond %u blocks",
						relpath(bmr.smgr->smgr_rlocator, fork).str,
						MaxBlockNumber)));

	for (uint32 i = 0; i < extend_by; i++)
	{
		int			victim_buf_id;
		BufferDesc *victim_buf_hdr;
		BufferTag	tag;
		LocalBufferLookupEnt *hresult;
		bool		found;

		victim_buf_id = -buffers[i] - 1;
		victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);

		/* in case we need to pin an existing buffer below */
		ResourceOwnerEnlarge(CurrentResourceOwner);

		InitBufferTag(&tag, &bmr.smgr->smgr_rlocator.locator, fork, first_block + i);

		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
		if (found)
		{
			BufferDesc *existing_hdr;
			uint32		buf_state;

			UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));

			existing_hdr = GetLocalBufferDescriptor(hresult->id);
			PinLocalBuffer(existing_hdr, false);
			buffers[i] = BufferDescriptorGetBuffer(existing_hdr);

			/*
			 * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
			 */
			buf_state = pg_atomic_read_u32(&existing_hdr->state);
			Assert(buf_state & BM_TAG_VALID);
			Assert(!(buf_state & BM_DIRTY));
			buf_state &= ~BM_VALID;
			pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);

			/* no need to loop for local buffers */
			StartLocalBufferIO(existing_hdr, true, false);
		}
		else
		{
			uint32		buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);

			Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));

			victim_buf_hdr->tag = tag;

			buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;

			pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);

			hresult->id = victim_buf_id;

			StartLocalBufferIO(victim_buf_hdr, true, false);
		}
	}

	io_start = pgstat_prepare_io_time(track_io_timing);

	/* actually extend relation */
	smgrzeroextend(bmr.smgr, fork, first_block, extend_by, false);

	pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
							io_start, 1, extend_by * BLCKSZ);

	for (uint32 i = 0; i < extend_by; i++)
	{
		Buffer		buf = buffers[i];
		BufferDesc *buf_hdr;
		uint32		buf_state;

		buf_hdr = GetLocalBufferDescriptor(-buf - 1);

		buf_state = pg_atomic_read_u32(&buf_hdr->state);
		buf_state |= BM_VALID;
		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
	}

	*extended_by = extend_by;

	pgBufferUsage.local_blks_written += extend_by;

	return first_block;
}
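
/*
 * Shape of ExtendBufferedRelLocal(), for orientation: victims are claimed
 * and zero-filled first, each is then tagged with its block number
 * (first_block + i) and entered into the hash table, a single
 * smgrzeroextend() grows the file, and only afterwards are the buffers
 * marked BM_VALID.  The found-in-hashtable branch handles a buffer that
 * already maps a to-be-created block (presumably left behind by an earlier
 * failed extension); such a page is known to be empty, which is why the
 * code may simply clear BM_VALID and redo the IO.
 */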

/*
 * MarkLocalBufferDirty -
 *	  mark a local buffer dirty
 */
void
MarkLocalBufferDirty(Buffer buffer)
{
	int			bufid;
	BufferDesc *bufHdr;
	uint32		buf_state;

	Assert(BufferIsLocal(buffer));

#ifdef LBDEBUG
	fprintf(stderr, "LB DIRTY %d\n", buffer);
#endif

	bufid = -buffer - 1;

	Assert(LocalRefCount[bufid] > 0);

	bufHdr = GetLocalBufferDescriptor(bufid);

	buf_state = pg_atomic_read_u32(&bufHdr->state);

	if (!(buf_state & BM_DIRTY))
		pgBufferUsage.local_blks_dirtied++;

	buf_state |= BM_DIRTY;

	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
}

/*
 * Like StartBufferIO, but for local buffers
 */
bool
StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
{
	uint32		buf_state;

	/*
	 * With AIO the buffer could have IO in progress, e.g. when there are two
	 * scans of the same relation.  Either wait for the other IO or return
	 * false.
	 */
	if (pgaio_wref_valid(&bufHdr->io_wref))
	{
		PgAioWaitRef iow = bufHdr->io_wref;

		if (nowait)
			return false;

		pgaio_wref_wait(&iow);
	}

	/* Once we get here, there is definitely no I/O active on this buffer */

	/* Check if someone else already did the I/O */
	buf_state = pg_atomic_read_u32(&bufHdr->state);
	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
	{
		return false;
	}

	/* BM_IO_IN_PROGRESS isn't currently used for local buffers */

	/* local buffers don't track IO using resowners */

	return true;
}

/*
 * Like TerminateBufferIO, but for local buffers
 */
void
TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits,
					   bool release_aio)
{
	/* Only need to adjust flags */
	uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);

	/* BM_IO_IN_PROGRESS isn't currently used for local buffers */

	/* Clear earlier errors, if this IO failed, it'll be marked again */
	buf_state &= ~BM_IO_ERROR;

	if (clear_dirty)
		buf_state &= ~BM_DIRTY;

	if (release_aio)
	{
		/* release pin held by IO subsystem, see also buffer_stage_common() */
		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
		buf_state -= BUF_REFCOUNT_ONE;
		pgaio_wref_clear(&bufHdr->io_wref);
	}

	buf_state |= set_flag_bits;
	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);

	/* local buffers don't track IO using resowners */

	/* local buffers don't use the IO CV, as no other process can see buffer */

	/* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
}

/*
 * InvalidateLocalBuffer -- mark a local buffer invalid.
 *
 * If check_unreferenced is true, error out if the buffer is still
 * pinned.  Passing false is appropriate when calling InvalidateLocalBuffer()
 * as part of changing the identity of a buffer, instead of just dropping the
 * buffer.
 *
 * See also InvalidateBuffer().
 */
void
InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
{
	Buffer		buffer = BufferDescriptorGetBuffer(bufHdr);
	int			bufid = -buffer - 1;
	uint32		buf_state;
	LocalBufferLookupEnt *hresult;

	/*
	 * It's possible that we started IO on this buffer before e.g. aborting
	 * the transaction that created a table.  We need to wait for that IO to
	 * complete before removing / reusing the buffer.
	 */
	if (pgaio_wref_valid(&bufHdr->io_wref))
	{
		PgAioWaitRef iow = bufHdr->io_wref;

		pgaio_wref_wait(&iow);
		Assert(!pgaio_wref_valid(&bufHdr->io_wref));
	}

	buf_state = pg_atomic_read_u32(&bufHdr->state);

	/*
	 * We need to test not just LocalRefCount[bufid] but also the BufferDesc
	 * itself, as the latter is used to represent a pin by the AIO subsystem.
	 * This can happen if AIO is initiated and then the query errors out.
	 */
	if (check_unreferenced &&
		(LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
		elog(ERROR, "block %u of %s is still referenced (local %u)",
			 bufHdr->tag.blockNum,
			 relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
							MyProcNumber,
							BufTagGetForkNum(&bufHdr->tag)).str,
			 LocalRefCount[bufid]);

	/* Remove entry from hashtable */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
	if (!hresult)				/* shouldn't happen */
		elog(ERROR, "local buffer hash table corrupted");
	/* Mark buffer invalid */
	ClearBufferTag(&bufHdr->tag);
	buf_state &= ~BUF_FLAG_MASK;
	buf_state &= ~BUF_USAGECOUNT_MASK;
	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
}

/*
 * DropRelationLocalBuffers
 *		This function removes from the buffer pool all the pages of the
 *		specified relation that have block numbers >= firstDelBlock.
 *		(In particular, with firstDelBlock = 0, all pages are removed.)
 *		Dirty pages are simply dropped, without bothering to write them
 *		out first.  Therefore, this is NOT rollback-able, and so should be
 *		used only with extreme caution!
 *
 * See DropRelationBuffers in bufmgr.c for more notes.
 */
void
DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum,
						 BlockNumber firstDelBlock)
{
	int			i;

	for (i = 0; i < NLocBuffer; i++)
	{
		BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
		uint32		buf_state;

		buf_state = pg_atomic_read_u32(&bufHdr->state);

		if ((buf_state & BM_TAG_VALID) &&
			BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
			BufTagGetForkNum(&bufHdr->tag) == forkNum &&
			bufHdr->tag.blockNum >= firstDelBlock)
		{
			InvalidateLocalBuffer(bufHdr, true);
		}
	}
}

/*
 * DropRelationAllLocalBuffers
 *		This function removes from the buffer pool all pages of all forks
 *		of the specified relation.
 *
 * See DropRelationsAllBuffers in bufmgr.c for more notes.
 */
void
DropRelationAllLocalBuffers(RelFileLocator rlocator)
{
	int			i;

	for (i = 0; i < NLocBuffer; i++)
	{
		BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
		uint32		buf_state;

		buf_state = pg_atomic_read_u32(&bufHdr->state);

		if ((buf_state & BM_TAG_VALID) &&
			BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
		{
			InvalidateLocalBuffer(bufHdr, true);
		}
	}
}

/*
 * InitLocalBuffers -
 *	  init the local buffer cache.  Since most queries (esp. multi-user ones)
 *	  don't involve local buffers, we delay allocating actual memory for the
 *	  buffers until we need them; just make the buffer headers here.
 */
static void
InitLocalBuffers(void)
{
	int			nbufs = num_temp_buffers;
	HASHCTL		info;
	int			i;

	/*
	 * Parallel workers can't access data in temporary tables, because they
	 * have no visibility into the local buffers of their leader.  This is a
	 * convenient, low-cost place to provide a backstop check for that.  Note
	 * that we don't wish to prevent a parallel worker from accessing catalog
	 * metadata about a temp table, so checks at higher levels would be
	 * inappropriate.
	 */
	if (IsParallelWorker())
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
				 errmsg("cannot access temporary tables during a parallel operation")));

	/* Allocate and zero buffer headers and auxiliary arrays */
	LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
	LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
	LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
	if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
		ereport(FATAL,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory")));

	nextFreeLocalBufId = 0;

	/* initialize fields that need to start off nonzero */
	for (i = 0; i < nbufs; i++)
	{
		BufferDesc *buf = GetLocalBufferDescriptor(i);

		/*
		 * negative to indicate local buffer.  This is tricky: shared buffers
		 * start with 0.  We have to start with -2.  (Note that the routine
		 * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
		 * is -1.)
		 */
		buf->buf_id = -i - 2;

		pgaio_wref_clear(&buf->io_wref);

		/*
		 * Intentionally do not initialize the buffer's atomic variable
		 * (besides zeroing the underlying memory above).  That way we get
		 * errors on platforms without atomics, if somebody (re-)introduces
		 * atomic operations for local buffers.
		 */
	}

	/* Create the lookup hash table */
	info.keysize = sizeof(BufferTag);
	info.entrysize = sizeof(LocalBufferLookupEnt);

	LocalBufHash = hash_create("Local Buffer Lookup Table",
							   nbufs,
							   &info,
							   HASH_ELEM | HASH_BLOBS);

	if (!LocalBufHash)
		elog(ERROR, "could not initialize local buffer hash table");

	/* Initialization done, mark buffers allocated */
	NLocBuffer = nbufs;
}
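
/*
 * Worked sizing example (assuming the default num_temp_buffers = 1024 and
 * BLCKSZ = 8192): InitLocalBuffers() allocates only the bookkeeping arrays
 * (descriptors, block pointers, refcounts) up front; the up-to-8MB of page
 * storage is deferred to GetLocalBufferStorage(), one victim at a time.
 */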

/*
 * XXX: We could have a slightly more efficient version of PinLocalBuffer()
 * that does not support adjusting the usagecount - but so far it does not
 * seem worth the trouble.
 *
 * Note that ResourceOwnerEnlarge() must have been done already.
 */
bool
PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
{
	uint32		buf_state;
	Buffer		buffer = BufferDescriptorGetBuffer(buf_hdr);
	int			bufid = -buffer - 1;

	buf_state = pg_atomic_read_u32(&buf_hdr->state);

	if (LocalRefCount[bufid] == 0)
	{
		NLocalPinnedBuffers++;
		buf_state += BUF_REFCOUNT_ONE;
		if (adjust_usagecount &&
			BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
		{
			buf_state += BUF_USAGECOUNT_ONE;
		}
		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);

		/*
		 * See comment in PinBuffer().
		 *
		 * If the buffer isn't allocated yet, it'll be marked as defined in
		 * GetLocalBufferStorage().
		 */
		if (LocalBufHdrGetBlock(buf_hdr) != NULL)
			VALGRIND_MAKE_MEM_DEFINED(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
	}
	LocalRefCount[bufid]++;
	ResourceOwnerRememberBuffer(CurrentResourceOwner,
								BufferDescriptorGetBuffer(buf_hdr));

	return buf_state & BM_VALID;
}
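
/*
 * Pairing sketch (illustrative): every successful PinLocalBuffer() must be
 * matched by an UnpinLocalBuffer() on the same Buffer.  Note the return
 * value reports the buffer's validity, not success or failure:
 *
 *		if (!PinLocalBuffer(bufHdr, true))
 *			... pinned, but the page contents must still be read in ...
 *		...
 *		UnpinLocalBuffer(BufferDescriptorGetBuffer(bufHdr));
 */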

void
UnpinLocalBuffer(Buffer buffer)
{
	UnpinLocalBufferNoOwner(buffer);
	ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
}

void
UnpinLocalBufferNoOwner(Buffer buffer)
{
	int			buffid = -buffer - 1;

	Assert(BufferIsLocal(buffer));
	Assert(LocalRefCount[buffid] > 0);
	Assert(NLocalPinnedBuffers > 0);

	if (--LocalRefCount[buffid] == 0)
	{
		BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
		uint32		buf_state;

		NLocalPinnedBuffers--;

		buf_state = pg_atomic_read_u32(&buf_hdr->state);
		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
		buf_state -= BUF_REFCOUNT_ONE;
		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);

		/* see comment in UnpinBufferNoOwner */
		VALGRIND_MAKE_MEM_NOACCESS(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
	}
}

/*
 * GUC check_hook for temp_buffers
 */
bool
check_temp_buffers(int *newval, void **extra, GucSource source)
{
	/*
	 * Once local buffers have been initialized, it's too late to change this.
	 * However, if this is only a test call, allow it.
	 */
	if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
	{
		GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
		return false;
	}
	return true;
}
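
/*
 * Example of the rule above: a session may SET temp_buffers freely until it
 * first touches a temporary table; after that, only a SET to the value
 * already in effect (NLocBuffer == *newval) or a PGC_S_TEST probe passes
 * this check.
 */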

/*
 * GetLocalBufferStorage - allocate memory for a local buffer
 *
 * The idea of this function is to aggregate our requests for storage
 * so that the memory manager doesn't see a whole lot of relatively small
 * requests.  Since we'll never give back a local buffer once it's created
 * within a particular process, no point in burdening memmgr with separately
 * managed chunks.
 */
static Block
GetLocalBufferStorage(void)
{
	static char *cur_block = NULL;
	static int	next_buf_in_block = 0;
	static int	num_bufs_in_block = 0;
	static int	total_bufs_allocated = 0;
	static MemoryContext LocalBufferContext = NULL;

	char	   *this_buf;

	Assert(total_bufs_allocated < NLocBuffer);

	if (next_buf_in_block >= num_bufs_in_block)
	{
		/* Need to make a new request to memmgr */
		int			num_bufs;

		/*
		 * We allocate local buffers in a context of their own, so that the
		 * space eaten for them is easily recognizable in MemoryContextStats
		 * output.  Create the context on first use.
		 */
		if (LocalBufferContext == NULL)
			LocalBufferContext =
				AllocSetContextCreate(TopMemoryContext,
									  "LocalBufferContext",
									  ALLOCSET_DEFAULT_SIZES);

		/* Start with a 16-buffer request; subsequent ones double each time */
		num_bufs = Max(num_bufs_in_block * 2, 16);
		/* But not more than what we need for all remaining local bufs */
		num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
		/* And don't overflow MaxAllocSize, either */
		num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);

		/* Buffers should be I/O aligned. */
		cur_block = (char *)
			TYPEALIGN(PG_IO_ALIGN_SIZE,
					  MemoryContextAlloc(LocalBufferContext,
										 num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE));
		next_buf_in_block = 0;
		num_bufs_in_block = num_bufs;
	}

	/* Allocate next buffer in current memory block */
	this_buf = cur_block + next_buf_in_block * BLCKSZ;
	next_buf_in_block++;
	total_bufs_allocated++;

	/*
	 * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it
	 * here.  The block is actually undefined, but we want consistency with
	 * the regular case of not needing to allocate memory.  This is
	 * specifically needed when method_io_uring.c fills the block, because
	 * Valgrind doesn't recognize io_uring reads causing undefined memory to
	 * become defined.
	 */
	VALGRIND_MAKE_MEM_DEFINED(this_buf, BLCKSZ);

	return (Block) this_buf;
}
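
/*
 * Worked allocation schedule (assuming num_temp_buffers = 1024): successive
 * requests are for 16, 32, 64, 128, 256, and 512 buffers, then a final 16
 * for the remainder; seven allocations cover the whole pool, and the Min()
 * caps ensure we never request more than is still needed.
 */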

/*
 * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
 *
 * This is just like CheckForBufferLeaks(), but for local buffers.
 */
static void
CheckForLocalBufferLeaks(void)
{
#ifdef USE_ASSERT_CHECKING
	if (LocalRefCount)
	{
		int			RefCountErrors = 0;
		int			i;

		for (i = 0; i < NLocBuffer; i++)
		{
			if (LocalRefCount[i] != 0)
			{
				Buffer		b = -i - 1;
				char	   *s;

				s = DebugPrintBufferRefcount(b);
				elog(WARNING, "local buffer refcount leak: %s", s);
				pfree(s);

				RefCountErrors++;
			}
		}
		Assert(RefCountErrors == 0);
	}
#endif
}

/*
 * AtEOXact_LocalBuffers - clean up at end of transaction.
 *
 * This is just like AtEOXact_Buffers, but for local buffers.
 */
void
AtEOXact_LocalBuffers(bool isCommit)
{
	CheckForLocalBufferLeaks();
}

/*
 * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
 *
 * This is just like AtProcExit_Buffers, but for local buffers.
 */
void
AtProcExit_LocalBuffers(void)
{
	/*
	 * We shouldn't be holding any remaining pins; if we are, and assertions
	 * aren't enabled, we'll fail later in DropRelationBuffers while trying
	 * to drop the temp rels.
	 */
	CheckForLocalBufferLeaks();
}
|