Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * localbuf.c
4 : * local buffer manager. Fast buffer manager for temporary tables,
5 : * which never need to be WAL-logged or checkpointed, etc.
6 : *
7 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994-5, Regents of the University of California
9 : *
10 : *
11 : * IDENTIFICATION
12 : * src/backend/storage/buffer/localbuf.c
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/parallel.h"
19 : #include "executor/instrument.h"
20 : #include "pgstat.h"
21 : #include "storage/aio.h"
22 : #include "storage/buf_internals.h"
23 : #include "storage/bufmgr.h"
24 : #include "storage/fd.h"
25 : #include "utils/guc_hooks.h"
26 : #include "utils/memutils.h"
27 : #include "utils/resowner.h"
28 :
29 :
30 : /*#define LBDEBUG*/
31 :
32 : /* entry for buffer lookup hashtable */
/*
 * Entry for the backend-local buffer lookup hashtable, which maps a page's
 * identity (BufferTag) to the index of the local buffer holding that page.
 */
typedef struct
{
	BufferTag	key;			/* Tag of a disk page */
	int			id;				/* Associated local buffer's index */
} LocalBufferLookupEnt;
38 :
/* Note: this macro only works on local buffers, not shared ones! */
#define LocalBufHdrGetBlock(bufHdr) \
	LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]

int			NLocBuffer = 0;		/* until buffers are initialized */

/* arrays indexed by local buffer id (i.e. -buffer - 1), sized NLocBuffer */
BufferDesc *LocalBufferDescriptors = NULL;
Block	   *LocalBufferBlockPointers = NULL;
int32	   *LocalRefCount = NULL;

/* clock-sweep hand for victim selection, see GetLocalVictimBuffer() */
static int	nextFreeLocalBufId = 0;

/* maps BufferTag -> local buffer index; see LocalBufferLookupEnt */
static HTAB *LocalBufHash = NULL;

/* number of local buffers pinned at least once */
static int	NLocalPinnedBuffers = 0;
55 :
56 :
57 : static void InitLocalBuffers(void);
58 : static Block GetLocalBufferStorage(void);
59 : static Buffer GetLocalVictimBuffer(void);
60 : static void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced);
61 :
62 :
/*
 * PrefetchLocalBuffer -
 *	  initiate asynchronous read of a block of a relation
 *
 * Do PrefetchBuffer's work for temporary relations.
 * No-op if prefetching isn't compiled in.
 *
 * If the block is already present in a local buffer, returns it in
 * result.recent_buffer instead of prefetching.
 */
PrefetchBufferResult
PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
					BlockNumber blockNum)
{
	PrefetchBufferResult result = {InvalidBuffer, false};
	BufferTag	newTag;			/* identity of requested block */
	LocalBufferLookupEnt *hresult;

	InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* See if the desired buffer already exists */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);

	if (hresult)
	{
		/* Yes, so nothing to do; local buffer numbers are negative */
		result.recent_buffer = -hresult->id - 1;
	}
	else
	{
#ifdef USE_PREFETCH
		/*
		 * Not in buffers, so initiate prefetch.  Skipped under direct I/O,
		 * where kernel readahead would be useless.
		 */
		if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
			smgrprefetch(smgr, forkNum, blockNum, 1))
		{
			result.initiated_io = true;
		}
#endif							/* USE_PREFETCH */
	}

	return result;
}
107 :
108 :
/*
 * LocalBufferAlloc -
 *	  Find or create a local buffer for the given page of the given relation.
 *
 * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
 * any locking since this is all local.  We support only default access
 * strategy (hence, usage_count is always advanced).
 *
 * Returns the buffer descriptor, pinned.  *foundPtr is set to true if the
 * page was already present and valid in a local buffer; false means the
 * caller must read the page in.
 */
BufferDesc *
LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
				 bool *foundPtr)
{
	BufferTag	newTag;			/* identity of requested block */
	LocalBufferLookupEnt *hresult;
	BufferDesc *bufHdr;
	Buffer		victim_buffer;
	int			bufid;
	bool		found;

	InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* ensure the pin below cannot fail partway through */
	ResourceOwnerEnlarge(CurrentResourceOwner);

	/* See if the desired buffer already exists */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);

	if (hresult)
	{
		bufid = hresult->id;
		bufHdr = GetLocalBufferDescriptor(bufid);
		Assert(BufferTagsEqual(&bufHdr->tag, &newTag));

		/* PinLocalBuffer() reports whether the buffer contents are valid */
		*foundPtr = PinLocalBuffer(bufHdr, true);
	}
	else
	{
		uint32		buf_state;

		/* evict some other page; returns the victim already pinned */
		victim_buffer = GetLocalVictimBuffer();
		bufid = -victim_buffer - 1;
		bufHdr = GetLocalBufferDescriptor(bufid);

		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
		if (found)				/* shouldn't happen */
			elog(ERROR, "local buffer hash table corrupted");
		hresult->id = bufid;

		/*
		 * it's all ours now.
		 */
		bufHdr->tag = newTag;

		/* reset flags/usagecount, then mark the new tag valid */
		buf_state = pg_atomic_read_u32(&bufHdr->state);
		buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
		buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);

		*foundPtr = false;
	}

	return bufHdr;
}
177 :
/*
 * Like FlushBuffer(), just for local buffers.
 *
 * Writes the buffer's contents out via smgr and clears its dirty bit.
 * If reln is NULL, the smgr relation is looked up from the buffer tag.
 */
void
FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
{
	instr_time	io_start;
	Page		localpage = (char *) LocalBufHdrGetBlock(bufHdr);

	/*
	 * Try to start an I/O operation.  There currently are no reasons for
	 * StartLocalBufferIO to return false, so we raise an error in that case.
	 */
	if (!StartLocalBufferIO(bufHdr, false, false))
		elog(ERROR, "failed to start write IO on local buffer");

	/* Find smgr relation for buffer */
	if (reln == NULL)
		reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
						MyProcNumber);

	PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);

	io_start = pgstat_prepare_io_time(track_io_timing);

	/* And write... */
	smgrwrite(reln,
			  BufTagGetForkNum(&bufHdr->tag),
			  bufHdr->tag.blockNum,
			  localpage,
			  false);

	/* Temporary table I/O does not use Buffer Access Strategies */
	pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
							IOOP_WRITE, io_start, 1, BLCKSZ);

	/* Mark not-dirty */
	TerminateLocalBufferIO(bufHdr, true, 0, false);

	pgBufferUsage.local_blks_written++;
}
219 :
/*
 * GetLocalVictimBuffer -
 *	  run the clock sweep over the local buffer ring to find a reusable
 *	  buffer.
 *
 * Returns the victim buffer, pinned.  If the victim was dirty it is flushed
 * first; if it had a valid tag its old mapping is removed.  Errors out if
 * every local buffer is pinned.
 */
static Buffer
GetLocalVictimBuffer(void)
{
	int			victim_bufid;
	int			trycounter;
	BufferDesc *bufHdr;

	ResourceOwnerEnlarge(CurrentResourceOwner);

	/*
	 * Need to get a new buffer.  We use a clock sweep algorithm (essentially
	 * the same as what freelist.c does now...)
	 */
	trycounter = NLocBuffer;
	for (;;)
	{
		victim_bufid = nextFreeLocalBufId;

		/* advance the clock hand, wrapping around the ring */
		if (++nextFreeLocalBufId >= NLocBuffer)
			nextFreeLocalBufId = 0;

		bufHdr = GetLocalBufferDescriptor(victim_bufid);

		if (LocalRefCount[victim_bufid] == 0)
		{
			uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);

			if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
			{
				/* recently used: age it and restart the countdown */
				buf_state -= BUF_USAGECOUNT_ONE;
				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
				trycounter = NLocBuffer;
			}
			else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
			{
				/*
				 * This can be reached if the backend initiated AIO for this
				 * buffer and then errored out.
				 */
			}
			else
			{
				/* Found a usable buffer */
				PinLocalBuffer(bufHdr, false);
				break;
			}
		}
		else if (--trycounter == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
					 errmsg("no empty local buffer available")));
	}

	/*
	 * lazy memory allocation: allocate space on first use of a buffer.
	 */
	if (LocalBufHdrGetBlock(bufHdr) == NULL)
	{
		/* Set pointer for use by BufferGetBlock() macro */
		LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
	}

	/*
	 * this buffer is not referenced but it might still be dirty. if that's
	 * the case, write it out before reusing it!
	 */
	if (pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY)
		FlushLocalBuffer(bufHdr, NULL);

	/*
	 * Remove the victim buffer from the hashtable and mark as invalid.
	 */
	if (pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID)
	{
		InvalidateLocalBuffer(bufHdr, false);

		pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT, 1, 0);
	}

	return BufferDescriptorGetBuffer(bufHdr);
}
301 :
/* see GetPinLimit() */
uint32
GetLocalPinLimit(void)
{
	/* Every backend has its own temporary buffers, and can pin them all. */
	return num_temp_buffers;
}
309 :
310 : /* see GetAdditionalPinLimit() */
311 : uint32
312 47100 : GetAdditionalLocalPinLimit(void)
313 : {
314 : Assert(NLocalPinnedBuffers <= num_temp_buffers);
315 47100 : return num_temp_buffers - NLocalPinnedBuffers;
316 : }
317 :
318 : /* see LimitAdditionalPins() */
319 : void
320 19700 : LimitAdditionalLocalPins(uint32 *additional_pins)
321 : {
322 : uint32 max_pins;
323 :
324 19700 : if (*additional_pins <= 1)
325 19078 : return;
326 :
327 : /*
328 : * In contrast to LimitAdditionalPins() other backends don't play a role
329 : * here. We can allow up to NLocBuffer pins in total, but it might not be
330 : * initialized yet so read num_temp_buffers.
331 : */
332 622 : max_pins = (num_temp_buffers - NLocalPinnedBuffers);
333 :
334 622 : if (*additional_pins >= max_pins)
335 0 : *additional_pins = max_pins;
336 : }
337 :
/*
 * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
 * temporary buffers.
 *
 * Acquires extend_by victim buffers (possibly clamped by the pin limit),
 * assigns them the newly-added block numbers, zero-extends the relation on
 * disk, and marks the buffers valid.  Returns the first new block number;
 * *extended_by reports how many blocks were actually added.
 */
BlockNumber
ExtendBufferedRelLocal(BufferManagerRelation bmr,
					   ForkNumber fork,
					   uint32 flags,
					   uint32 extend_by,
					   BlockNumber extend_upto,
					   Buffer *buffers,
					   uint32 *extended_by)
{
	BlockNumber first_block;
	instr_time	io_start;

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* clamp the request so we never exceed the local pin limit */
	LimitAdditionalLocalPins(&extend_by);

	for (uint32 i = 0; i < extend_by; i++)
	{
		BufferDesc *buf_hdr;
		Block		buf_block;

		buffers[i] = GetLocalVictimBuffer();
		buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
		buf_block = LocalBufHdrGetBlock(buf_hdr);

		/* new buffers are zero-filled */
		MemSet(buf_block, 0, BLCKSZ);
	}

	first_block = smgrnblocks(bmr.smgr, fork);

	if (extend_upto != InvalidBlockNumber)
	{
		/*
		 * In contrast to shared relations, nothing could change the relation
		 * size concurrently. Thus we shouldn't end up finding that we don't
		 * need to do anything.
		 */
		Assert(first_block <= extend_upto);

		Assert((uint64) first_block + extend_by <= extend_upto);
	}

	/* Fail if relation is already at maximum possible length */
	if ((uint64) first_block + extend_by >= MaxBlockNumber)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("cannot extend relation %s beyond %u blocks",
						relpath(bmr.smgr->smgr_rlocator, fork).str,
						MaxBlockNumber)));

	for (uint32 i = 0; i < extend_by; i++)
	{
		int			victim_buf_id;
		BufferDesc *victim_buf_hdr;
		BufferTag	tag;
		LocalBufferLookupEnt *hresult;
		bool		found;

		victim_buf_id = -buffers[i] - 1;
		victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);

		/* in case we need to pin an existing buffer below */
		ResourceOwnerEnlarge(CurrentResourceOwner);

		InitBufferTag(&tag, &bmr.smgr->smgr_rlocator.locator, fork, first_block + i);

		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
		if (found)
		{
			/*
			 * A buffer for this to-be-extended block already exists (e.g.
			 * left over from a previous truncation); reuse it instead of the
			 * victim we prepared.
			 */
			BufferDesc *existing_hdr;
			uint32		buf_state;

			UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));

			existing_hdr = GetLocalBufferDescriptor(hresult->id);
			PinLocalBuffer(existing_hdr, false);
			buffers[i] = BufferDescriptorGetBuffer(existing_hdr);

			/*
			 * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
			 */
			buf_state = pg_atomic_read_u32(&existing_hdr->state);
			Assert(buf_state & BM_TAG_VALID);
			Assert(!(buf_state & BM_DIRTY));
			buf_state &= ~BM_VALID;
			pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);

			/* no need to loop for local buffers */
			StartLocalBufferIO(existing_hdr, true, false);
		}
		else
		{
			uint32		buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);

			Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));

			victim_buf_hdr->tag = tag;

			buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;

			pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);

			hresult->id = victim_buf_id;

			StartLocalBufferIO(victim_buf_hdr, true, false);
		}
	}

	io_start = pgstat_prepare_io_time(track_io_timing);

	/* actually extend relation */
	smgrzeroextend(bmr.smgr, fork, first_block, extend_by, false);

	pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
							io_start, 1, extend_by * BLCKSZ);

	/* the on-disk blocks now exist; mark all new buffers valid */
	for (uint32 i = 0; i < extend_by; i++)
	{
		Buffer		buf = buffers[i];
		BufferDesc *buf_hdr;
		uint32		buf_state;

		buf_hdr = GetLocalBufferDescriptor(-buf - 1);

		buf_state = pg_atomic_read_u32(&buf_hdr->state);
		buf_state |= BM_VALID;
		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
	}

	*extended_by = extend_by;

	pgBufferUsage.local_blks_written += extend_by;

	return first_block;
}
481 :
482 : /*
483 : * MarkLocalBufferDirty -
484 : * mark a local buffer dirty
485 : */
486 : void
487 3439154 : MarkLocalBufferDirty(Buffer buffer)
488 : {
489 : int bufid;
490 : BufferDesc *bufHdr;
491 : uint32 buf_state;
492 :
493 : Assert(BufferIsLocal(buffer));
494 :
495 : #ifdef LBDEBUG
496 : fprintf(stderr, "LB DIRTY %d\n", buffer);
497 : #endif
498 :
499 3439154 : bufid = -buffer - 1;
500 :
501 : Assert(LocalRefCount[bufid] > 0);
502 :
503 3439154 : bufHdr = GetLocalBufferDescriptor(bufid);
504 :
505 3439154 : buf_state = pg_atomic_read_u32(&bufHdr->state);
506 :
507 3439154 : if (!(buf_state & BM_DIRTY))
508 26584 : pgBufferUsage.local_blks_dirtied++;
509 :
510 3439154 : buf_state |= BM_DIRTY;
511 :
512 3439154 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
513 3439154 : }
514 :
/*
 * Like StartBufferIO, but for local buffers
 *
 * Returns true if the caller should proceed with the I/O; false if the I/O
 * turns out to be unnecessary (or, with nowait, if in-flight AIO would have
 * to be waited for first).
 */
bool
StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
{
	uint32		buf_state;

	/*
	 * With AIO the buffer could have IO in progress, e.g. when there are two
	 * scans of the same relation. Either wait for the other IO or return
	 * false.
	 */
	if (pgaio_wref_valid(&bufHdr->io_wref))
	{
		/* copy the wait ref, as completion may clear bufHdr->io_wref */
		PgAioWaitRef iow = bufHdr->io_wref;

		if (nowait)
			return false;

		pgaio_wref_wait(&iow);
	}

	/* Once we get here, there is definitely no I/O active on this buffer */

	/* Check if someone else already did the I/O */
	buf_state = pg_atomic_read_u32(&bufHdr->state);
	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
	{
		return false;
	}

	/* BM_IO_IN_PROGRESS isn't currently used for local buffers */

	/* local buffers don't track IO using resowners */

	return true;
}
553 :
/*
 * Like TerminateBufferIO, but for local buffers
 *
 * clear_dirty removes BM_DIRTY (after a successful write); set_flag_bits are
 * OR'd into the state (e.g. BM_VALID after a read); release_aio drops the
 * pin the AIO subsystem held on the buffer.
 */
void
TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits,
					   bool release_aio)
{
	/* Only need to adjust flags */
	uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);

	/* BM_IO_IN_PROGRESS isn't currently used for local buffers */

	/* Clear earlier errors, if this IO failed, it'll be marked again */
	buf_state &= ~BM_IO_ERROR;

	if (clear_dirty)
		buf_state &= ~BM_DIRTY;

	if (release_aio)
	{
		/* release pin held by IO subsystem, see also buffer_stage_common() */
		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
		buf_state -= BUF_REFCOUNT_ONE;
		pgaio_wref_clear(&bufHdr->io_wref);
	}

	buf_state |= set_flag_bits;
	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);

	/* local buffers don't track IO using resowners */

	/* local buffers don't use the IO CV, as no other process can see buffer */

	/* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
}
589 :
/*
 * InvalidateLocalBuffer -- mark a local buffer invalid.
 *
 * If check_unreferenced is true, error out if the buffer is still
 * pinned. Passing false is appropriate when calling InvalidateLocalBuffer()
 * as part of changing the identity of a buffer, instead of just dropping the
 * buffer.
 *
 * See also InvalidateBuffer().
 */
static void
InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
{
	Buffer		buffer = BufferDescriptorGetBuffer(bufHdr);
	int			bufid = -buffer - 1;
	uint32		buf_state;
	LocalBufferLookupEnt *hresult;

	/*
	 * It's possible that we started IO on this buffer before e.g. aborting
	 * the transaction that created a table. We need to wait for that IO to
	 * complete before removing / reusing the buffer.
	 */
	if (pgaio_wref_valid(&bufHdr->io_wref))
	{
		/* copy the wait ref, as completion may clear bufHdr->io_wref */
		PgAioWaitRef iow = bufHdr->io_wref;

		pgaio_wref_wait(&iow);
		Assert(!pgaio_wref_valid(&bufHdr->io_wref));
	}

	buf_state = pg_atomic_read_u32(&bufHdr->state);

	/*
	 * We need to test not just LocalRefCount[bufid] but also the BufferDesc
	 * itself, as the latter is used to represent a pin by the AIO subsystem.
	 * This can happen if AIO is initiated and then the query errors out.
	 */
	if (check_unreferenced &&
		(LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
		elog(ERROR, "block %u of %s is still referenced (local %u)",
			 bufHdr->tag.blockNum,
			 relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
							MyProcNumber,
							BufTagGetForkNum(&bufHdr->tag)).str,
			 LocalRefCount[bufid]);

	/* Remove entry from hashtable */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
	if (!hresult)				/* shouldn't happen */
		elog(ERROR, "local buffer hash table corrupted");
	/* Mark buffer invalid */
	ClearBufferTag(&bufHdr->tag);
	buf_state &= ~BUF_FLAG_MASK;
	buf_state &= ~BUF_USAGECOUNT_MASK;
	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
}
648 :
649 : /*
650 : * DropRelationLocalBuffers
651 : * This function removes from the buffer pool all the pages of the
652 : * specified relation that have block numbers >= firstDelBlock.
653 : * (In particular, with firstDelBlock = 0, all pages are removed.)
654 : * Dirty pages are simply dropped, without bothering to write them
655 : * out first. Therefore, this is NOT rollback-able, and so should be
656 : * used only with extreme caution!
657 : *
658 : * See DropRelationBuffers in bufmgr.c for more notes.
659 : */
660 : void
661 710 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum,
662 : BlockNumber firstDelBlock)
663 : {
664 : int i;
665 :
666 664262 : for (i = 0; i < NLocBuffer; i++)
667 : {
668 663552 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
669 : uint32 buf_state;
670 :
671 663552 : buf_state = pg_atomic_read_u32(&bufHdr->state);
672 :
673 723942 : if ((buf_state & BM_TAG_VALID) &&
674 62232 : BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
675 1842 : BufTagGetForkNum(&bufHdr->tag) == forkNum &&
676 1666 : bufHdr->tag.blockNum >= firstDelBlock)
677 : {
678 1602 : InvalidateLocalBuffer(bufHdr, true);
679 : }
680 : }
681 710 : }
682 :
683 : /*
684 : * DropRelationAllLocalBuffers
685 : * This function removes from the buffer pool all pages of all forks
686 : * of the specified relation.
687 : *
688 : * See DropRelationsAllBuffers in bufmgr.c for more notes.
689 : */
690 : void
691 6128 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
692 : {
693 : int i;
694 :
695 5888192 : for (i = 0; i < NLocBuffer; i++)
696 : {
697 5882064 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
698 : uint32 buf_state;
699 :
700 5882064 : buf_state = pg_atomic_read_u32(&bufHdr->state);
701 :
702 6304864 : if ((buf_state & BM_TAG_VALID) &&
703 422800 : BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
704 : {
705 30002 : InvalidateLocalBuffer(bufHdr, true);
706 : }
707 : }
708 6128 : }
709 :
/*
 * InitLocalBuffers -
 *	  init the local buffer cache. Since most queries (esp. multi-user ones)
 *	  don't involve local buffers, we delay allocating actual memory for the
 *	  buffers until we need them; just make the buffer headers here.
 */
static void
InitLocalBuffers(void)
{
	int			nbufs = num_temp_buffers;
	HASHCTL		info;
	int			i;

	/*
	 * Parallel workers can't access data in temporary tables, because they
	 * have no visibility into the local buffers of their leader.  This is a
	 * convenient, low-cost place to provide a backstop check for that.  Note
	 * that we don't wish to prevent a parallel worker from accessing catalog
	 * metadata about a temp table, so checks at higher levels would be
	 * inappropriate.
	 */
	if (IsParallelWorker())
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
				 errmsg("cannot access temporary tables during a parallel operation")));

	/* Allocate and zero buffer headers and auxiliary arrays */
	LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
	LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
	LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
	if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
		ereport(FATAL,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory")));

	nextFreeLocalBufId = 0;

	/* initialize fields that need to start off nonzero */
	for (i = 0; i < nbufs; i++)
	{
		BufferDesc *buf = GetLocalBufferDescriptor(i);

		/*
		 * negative to indicate local buffer. This is tricky: shared buffers
		 * start with 0. We have to start with -2. (Note that the routine
		 * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
		 * is -1.)
		 */
		buf->buf_id = -i - 2;

		/* start with no AIO in flight */
		pgaio_wref_clear(&buf->io_wref);

		/*
		 * Intentionally do not initialize the buffer's atomic variable
		 * (besides zeroing the underlying memory above). That way we get
		 * errors on platforms without atomics, if somebody (re-)introduces
		 * atomic operations for local buffers.
		 */
	}

	/* Create the lookup hash table */
	info.keysize = sizeof(BufferTag);
	info.entrysize = sizeof(LocalBufferLookupEnt);

	LocalBufHash = hash_create("Local Buffer Lookup Table",
							   nbufs,
							   &info,
							   HASH_ELEM | HASH_BLOBS);

	if (!LocalBufHash)
		elog(ERROR, "could not initialize local buffer hash table");

	/* Initialization done, mark buffers allocated */
	NLocBuffer = nbufs;
}
785 :
/*
 * XXX: We could have a slightly more efficient version of PinLocalBuffer()
 * that does not support adjusting the usagecount - but so far it does not
 * seem worth the trouble.
 *
 * Note that ResourceOwnerEnlarge() must have been done already.
 *
 * Returns whether the buffer's contents are valid (BM_VALID set).
 */
bool
PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
{
	uint32		buf_state;
	Buffer		buffer = BufferDescriptorGetBuffer(buf_hdr);
	int			bufid = -buffer - 1;

	buf_state = pg_atomic_read_u32(&buf_hdr->state);

	if (LocalRefCount[bufid] == 0)
	{
		/* first pin by this backend: reflect it in the descriptor too */
		NLocalPinnedBuffers++;
		buf_state += BUF_REFCOUNT_ONE;
		if (adjust_usagecount &&
			BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
		{
			buf_state += BUF_USAGECOUNT_ONE;
		}
		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
	}
	LocalRefCount[bufid]++;
	ResourceOwnerRememberBuffer(CurrentResourceOwner,
								BufferDescriptorGetBuffer(buf_hdr));

	return buf_state & BM_VALID;
}
819 :
/*
 * UnpinLocalBuffer -
 *	  release a pin on a local buffer and forget it in the current resource
 *	  owner.
 */
void
UnpinLocalBuffer(Buffer buffer)
{
	UnpinLocalBufferNoOwner(buffer);
	ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
}
826 :
827 : void
828 3054204 : UnpinLocalBufferNoOwner(Buffer buffer)
829 : {
830 3054204 : int buffid = -buffer - 1;
831 :
832 : Assert(BufferIsLocal(buffer));
833 : Assert(LocalRefCount[buffid] > 0);
834 : Assert(NLocalPinnedBuffers > 0);
835 :
836 3054204 : if (--LocalRefCount[buffid] == 0)
837 : {
838 2173396 : BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
839 : uint32 buf_state;
840 :
841 2173396 : NLocalPinnedBuffers--;
842 :
843 2173396 : buf_state = pg_atomic_read_u32(&buf_hdr->state);
844 : Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
845 2173396 : buf_state -= BUF_REFCOUNT_ONE;
846 2173396 : pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
847 : }
848 3054204 : }
849 :
850 : /*
851 : * GUC check_hook for temp_buffers
852 : */
853 : bool
854 2112 : check_temp_buffers(int *newval, void **extra, GucSource source)
855 : {
856 : /*
857 : * Once local buffers have been initialized, it's too late to change this.
858 : * However, if this is only a test call, allow it.
859 : */
860 2112 : if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
861 : {
862 0 : GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
863 0 : return false;
864 : }
865 2112 : return true;
866 : }
867 :
/*
 * GetLocalBufferStorage - allocate memory for a local buffer
 *
 * The idea of this function is to aggregate our requests for storage
 * so that the memory manager doesn't see a whole lot of relatively small
 * requests.  Since we'll never give back a local buffer once it's created
 * within a particular process, no point in burdening memmgr with separately
 * managed chunks.
 */
static Block
GetLocalBufferStorage(void)
{
	/* state of the current bulk allocation, persists across calls */
	static char *cur_block = NULL;
	static int	next_buf_in_block = 0;
	static int	num_bufs_in_block = 0;
	static int	total_bufs_allocated = 0;
	static MemoryContext LocalBufferContext = NULL;

	char	   *this_buf;

	Assert(total_bufs_allocated < NLocBuffer);

	if (next_buf_in_block >= num_bufs_in_block)
	{
		/* Need to make a new request to memmgr */
		int			num_bufs;

		/*
		 * We allocate local buffers in a context of their own, so that the
		 * space eaten for them is easily recognizable in MemoryContextStats
		 * output.  Create the context on first use.
		 */
		if (LocalBufferContext == NULL)
			LocalBufferContext =
				AllocSetContextCreate(TopMemoryContext,
									  "LocalBufferContext",
									  ALLOCSET_DEFAULT_SIZES);

		/* Start with a 16-buffer request; subsequent ones double each time */
		num_bufs = Max(num_bufs_in_block * 2, 16);
		/* But not more than what we need for all remaining local bufs */
		num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
		/* And don't overflow MaxAllocSize, either */
		num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);

		/* Buffers should be I/O aligned. */
		cur_block = (char *)
			TYPEALIGN(PG_IO_ALIGN_SIZE,
					  MemoryContextAlloc(LocalBufferContext,
										 num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE));
		next_buf_in_block = 0;
		num_bufs_in_block = num_bufs;
	}

	/* Allocate next buffer in current memory block */
	this_buf = cur_block + next_buf_in_block * BLCKSZ;
	next_buf_in_block++;
	total_bufs_allocated++;

	return (Block) this_buf;
}
929 :
/*
 * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
 *
 * This is just like CheckForBufferLeaks(), but for local buffers.
 * No-op in non-assert builds.
 */
static void
CheckForLocalBufferLeaks(void)
{
#ifdef USE_ASSERT_CHECKING
	if (LocalRefCount)
	{
		int			RefCountErrors = 0;
		int			i;

		for (i = 0; i < NLocBuffer; i++)
		{
			if (LocalRefCount[i] != 0)
			{
				Buffer		b = -i - 1;
				char	   *s;

				s = DebugPrintBufferRefcount(b);
				elog(WARNING, "local buffer refcount leak: %s", s);
				pfree(s);

				RefCountErrors++;
			}
		}
		Assert(RefCountErrors == 0);
	}
#endif
}
962 :
/*
 * AtEOXact_LocalBuffers - clean up at end of transaction.
 *
 * This is just like AtEOXact_Buffers, but for local buffers.
 */
void
AtEOXact_LocalBuffers(bool isCommit)
{
	CheckForLocalBufferLeaks();
}
973 :
/*
 * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
 *
 * This is just like AtProcExit_Buffers, but for local buffers.
 */
void
AtProcExit_LocalBuffers(void)
{
	/*
	 * We shouldn't be holding any remaining pins; if we are, and assertions
	 * aren't enabled, we'll fail later in DropRelationBuffers while trying to
	 * drop the temp rels.
	 */
	CheckForLocalBufferLeaks();
}
|