Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * localbuf.c
4 : * local buffer manager. Fast buffer manager for temporary tables,
5 : * which never need to be WAL-logged or checkpointed, etc.
6 : *
7 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994-5, Regents of the University of California
9 : *
10 : *
11 : * IDENTIFICATION
12 : * src/backend/storage/buffer/localbuf.c
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/parallel.h"
19 : #include "executor/instrument.h"
20 : #include "pgstat.h"
21 : #include "storage/buf_internals.h"
22 : #include "storage/bufmgr.h"
23 : #include "storage/fd.h"
24 : #include "utils/guc_hooks.h"
25 : #include "utils/memutils.h"
26 : #include "utils/resowner.h"
27 :
28 :
29 : /*#define LBDEBUG*/
30 :
31 : /* entry for buffer lookup hashtable */
32 : typedef struct
33 : {
34 : BufferTag key; /* Tag of a disk page */
35 : int id; /* Associated local buffer's index */
36 : } LocalBufferLookupEnt;
37 :
38 : /* Note: this macro only works on local buffers, not shared ones! */
39 : #define LocalBufHdrGetBlock(bufHdr) \
40 : LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
41 :
42 : int NLocBuffer = 0; /* until buffers are initialized */
43 :
44 : BufferDesc *LocalBufferDescriptors = NULL;
45 : Block *LocalBufferBlockPointers = NULL;
46 : int32 *LocalRefCount = NULL;
47 :
48 : static int nextFreeLocalBufId = 0;
49 :
50 : static HTAB *LocalBufHash = NULL;
51 :
52 : /* number of local buffers pinned at least once */
53 : static int NLocalPinnedBuffers = 0;
54 :
55 :
56 : static void InitLocalBuffers(void);
57 : static Block GetLocalBufferStorage(void);
58 : static Buffer GetLocalVictimBuffer(void);
59 :
60 :
61 : /*
62 : * PrefetchLocalBuffer -
63 : * initiate asynchronous read of a block of a relation
64 : *
65 : * Do PrefetchBuffer's work for temporary relations.
66 : * No-op if prefetching isn't compiled in.
67 : */
68 : PrefetchBufferResult
69 6224 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
70 : BlockNumber blockNum)
71 : {
72 6224 : PrefetchBufferResult result = {InvalidBuffer, false};
73 : BufferTag newTag; /* identity of requested block */
74 : LocalBufferLookupEnt *hresult;
75 :
76 6224 : InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
77 :
78 : /* Initialize local buffers if first request in this session */
79 6224 : if (LocalBufHash == NULL)
80 0 : InitLocalBuffers();
81 :
82 : /* See if the desired buffer already exists */
83 : hresult = (LocalBufferLookupEnt *)
84 6224 : hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
85 :
86 6224 : if (hresult)
87 : {
88 : /* Yes, so nothing to do */
89 6224 : result.recent_buffer = -hresult->id - 1;
90 : }
91 : else
92 : {
93 : #ifdef USE_PREFETCH
94 : /* Not in buffers, so initiate prefetch */
95 0 : if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
96 0 : smgrprefetch(smgr, forkNum, blockNum, 1))
97 : {
98 0 : result.initiated_io = true;
99 : }
100 : #endif /* USE_PREFETCH */
101 : }
102 :
103 6224 : return result;
104 : }
105 :
106 :
107 : /*
108 : * LocalBufferAlloc -
109 : * Find or create a local buffer for the given page of the given relation.
110 : *
111 : * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
112 : * any locking since this is all local. We support only default access
113 : * strategy (hence, usage_count is always advanced).
114 : */
115 : BufferDesc *
116 2124928 : LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
117 : bool *foundPtr)
118 : {
119 : BufferTag newTag; /* identity of requested block */
120 : LocalBufferLookupEnt *hresult;
121 : BufferDesc *bufHdr;
122 : Buffer victim_buffer;
123 : int bufid;
124 : bool found;
125 :
126 2124928 : InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
127 :
128 : /* Initialize local buffers if first request in this session */
129 2124928 : if (LocalBufHash == NULL)
130 26 : InitLocalBuffers();
131 :
132 2124928 : ResourceOwnerEnlarge(CurrentResourceOwner);
133 :
134 : /* See if the desired buffer already exists */
135 : hresult = (LocalBufferLookupEnt *)
136 2124928 : hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
137 :
138 2124928 : if (hresult)
139 : {
140 2117230 : bufid = hresult->id;
141 2117230 : bufHdr = GetLocalBufferDescriptor(bufid);
142 : Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
143 :
144 2117230 : *foundPtr = PinLocalBuffer(bufHdr, true);
145 : }
146 : else
147 : {
148 : uint32 buf_state;
149 :
150 7698 : victim_buffer = GetLocalVictimBuffer();
151 7698 : bufid = -victim_buffer - 1;
152 7698 : bufHdr = GetLocalBufferDescriptor(bufid);
153 :
154 : hresult = (LocalBufferLookupEnt *)
155 7698 : hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
156 7698 : if (found) /* shouldn't happen */
157 0 : elog(ERROR, "local buffer hash table corrupted");
158 7698 : hresult->id = bufid;
159 :
160 : /*
161 : * it's all ours now.
162 : */
163 7698 : bufHdr->tag = newTag;
164 :
165 7698 : buf_state = pg_atomic_read_u32(&bufHdr->state);
166 7698 : buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
167 7698 : buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
168 7698 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
169 :
170 7698 : *foundPtr = false;
171 : }
172 :
173 2124928 : return bufHdr;
174 : }
175 :
176 : static Buffer
177 31872 : GetLocalVictimBuffer(void)
178 : {
179 : int victim_bufid;
180 : int trycounter;
181 : uint32 buf_state;
182 : BufferDesc *bufHdr;
183 :
184 31872 : ResourceOwnerEnlarge(CurrentResourceOwner);
185 :
186 : /*
187 : * Need to get a new buffer. We use a clock sweep algorithm (essentially
188 : * the same as what freelist.c does now...)
189 : */
190 31872 : trycounter = NLocBuffer;
191 : for (;;)
192 : {
193 38286 : victim_bufid = nextFreeLocalBufId;
194 :
195 38286 : if (++nextFreeLocalBufId >= NLocBuffer)
196 66 : nextFreeLocalBufId = 0;
197 :
198 38286 : bufHdr = GetLocalBufferDescriptor(victim_bufid);
199 :
200 38286 : if (LocalRefCount[victim_bufid] == 0)
201 : {
202 38172 : buf_state = pg_atomic_read_u32(&bufHdr->state);
203 :
204 38172 : if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
205 : {
206 6300 : buf_state -= BUF_USAGECOUNT_ONE;
207 6300 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
208 6300 : trycounter = NLocBuffer;
209 : }
210 : else
211 : {
212 : /* Found a usable buffer */
213 31872 : PinLocalBuffer(bufHdr, false);
214 31872 : break;
215 : }
216 : }
217 114 : else if (--trycounter == 0)
218 0 : ereport(ERROR,
219 : (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
220 : errmsg("no empty local buffer available")));
221 : }
222 :
223 : /*
224 : * lazy memory allocation: allocate space on first use of a buffer.
225 : */
226 31872 : if (LocalBufHdrGetBlock(bufHdr) == NULL)
227 : {
228 : /* Set pointer for use by BufferGetBlock() macro */
229 29382 : LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
230 : }
231 :
232 : /*
233 : * this buffer is not referenced but it might still be dirty. if that's
234 : * the case, write it out before reusing it!
235 : */
236 31872 : if (buf_state & BM_DIRTY)
237 : {
238 : instr_time io_start;
239 : SMgrRelation oreln;
240 894 : Page localpage = (char *) LocalBufHdrGetBlock(bufHdr);
241 :
242 : /* Find smgr relation for buffer */
243 894 : oreln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag), MyProcNumber);
244 :
245 894 : PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
246 :
247 894 : io_start = pgstat_prepare_io_time(track_io_timing);
248 :
249 : /* And write... */
250 894 : smgrwrite(oreln,
251 894 : BufTagGetForkNum(&bufHdr->tag),
252 : bufHdr->tag.blockNum,
253 : localpage,
254 : false);
255 :
256 : /* Temporary table I/O does not use Buffer Access Strategies */
257 894 : pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
258 : IOOP_WRITE, io_start, 1, BLCKSZ);
259 :
260 : /* Mark not-dirty now in case we error out below */
261 894 : buf_state &= ~BM_DIRTY;
262 894 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
263 :
264 894 : pgBufferUsage.local_blks_written++;
265 : }
266 :
267 : /*
268 : * Remove the victim buffer from the hashtable and mark as invalid.
269 : */
270 31872 : if (buf_state & BM_TAG_VALID)
271 : {
272 : LocalBufferLookupEnt *hresult;
273 :
274 : hresult = (LocalBufferLookupEnt *)
275 900 : hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
276 900 : if (!hresult) /* shouldn't happen */
277 0 : elog(ERROR, "local buffer hash table corrupted");
278 : /* mark buffer invalid just in case hash insert fails */
279 900 : ClearBufferTag(&bufHdr->tag);
280 900 : buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
281 900 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
282 :
283 900 : pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT, 1, 0);
284 : }
285 :
286 31872 : return BufferDescriptorGetBuffer(bufHdr);
287 : }
288 :
289 : /* see LimitAdditionalPins() */
290 : void
291 28720 : LimitAdditionalLocalPins(uint32 *additional_pins)
292 : {
293 : uint32 max_pins;
294 :
295 28720 : if (*additional_pins <= 1)
296 17072 : return;
297 :
298 : /*
299 : * In contrast to LimitAdditionalPins() other backends don't play a role
300 : * here. We can allow up to NLocBuffer pins in total, but it might not be
301 : * initialized yet so read num_temp_buffers.
302 : */
303 11648 : max_pins = (num_temp_buffers - NLocalPinnedBuffers);
304 :
305 11648 : if (*additional_pins >= max_pins)
306 0 : *additional_pins = max_pins;
307 : }
308 :
309 : /*
310 : * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
311 : * temporary buffers.
312 : */
313 : BlockNumber
314 17682 : ExtendBufferedRelLocal(BufferManagerRelation bmr,
315 : ForkNumber fork,
316 : uint32 flags,
317 : uint32 extend_by,
318 : BlockNumber extend_upto,
319 : Buffer *buffers,
320 : uint32 *extended_by)
321 : {
322 : BlockNumber first_block;
323 : instr_time io_start;
324 :
325 : /* Initialize local buffers if first request in this session */
326 17682 : if (LocalBufHash == NULL)
327 476 : InitLocalBuffers();
328 :
329 17682 : LimitAdditionalLocalPins(&extend_by);
330 :
331 41856 : for (uint32 i = 0; i < extend_by; i++)
332 : {
333 : BufferDesc *buf_hdr;
334 : Block buf_block;
335 :
336 24174 : buffers[i] = GetLocalVictimBuffer();
337 24174 : buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
338 24174 : buf_block = LocalBufHdrGetBlock(buf_hdr);
339 :
340 : /* new buffers are zero-filled */
341 24174 : MemSet((char *) buf_block, 0, BLCKSZ);
342 : }
343 :
344 17682 : first_block = smgrnblocks(bmr.smgr, fork);
345 :
346 : if (extend_upto != InvalidBlockNumber)
347 : {
348 : /*
349 : * In contrast to shared relations, nothing could change the relation
350 : * size concurrently. Thus we shouldn't end up finding that we don't
351 : * need to do anything.
352 : */
353 : Assert(first_block <= extend_upto);
354 :
355 : Assert((uint64) first_block + extend_by <= extend_upto);
356 : }
357 :
358 : /* Fail if relation is already at maximum possible length */
359 17682 : if ((uint64) first_block + extend_by >= MaxBlockNumber)
360 0 : ereport(ERROR,
361 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
362 : errmsg("cannot extend relation %s beyond %u blocks",
363 : relpath(bmr.smgr->smgr_rlocator, fork),
364 : MaxBlockNumber)));
365 :
366 41856 : for (uint32 i = 0; i < extend_by; i++)
367 : {
368 : int victim_buf_id;
369 : BufferDesc *victim_buf_hdr;
370 : BufferTag tag;
371 : LocalBufferLookupEnt *hresult;
372 : bool found;
373 :
374 24174 : victim_buf_id = -buffers[i] - 1;
375 24174 : victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
376 :
377 : /* in case we need to pin an existing buffer below */
378 24174 : ResourceOwnerEnlarge(CurrentResourceOwner);
379 :
380 24174 : InitBufferTag(&tag, &bmr.smgr->smgr_rlocator.locator, fork, first_block + i);
381 :
382 : hresult = (LocalBufferLookupEnt *)
383 24174 : hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
384 24174 : if (found)
385 : {
386 : BufferDesc *existing_hdr;
387 : uint32 buf_state;
388 :
389 0 : UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
390 :
391 0 : existing_hdr = GetLocalBufferDescriptor(hresult->id);
392 0 : PinLocalBuffer(existing_hdr, false);
393 0 : buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
394 :
395 0 : buf_state = pg_atomic_read_u32(&existing_hdr->state);
396 : Assert(buf_state & BM_TAG_VALID);
397 : Assert(!(buf_state & BM_DIRTY));
398 0 : buf_state &= ~BM_VALID;
399 0 : pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
400 : }
401 : else
402 : {
403 24174 : uint32 buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
404 :
405 : Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
406 :
407 24174 : victim_buf_hdr->tag = tag;
408 :
409 24174 : buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
410 :
411 24174 : pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
412 :
413 24174 : hresult->id = victim_buf_id;
414 : }
415 : }
416 :
417 17682 : io_start = pgstat_prepare_io_time(track_io_timing);
418 :
419 : /* actually extend relation */
420 17682 : smgrzeroextend(bmr.smgr, fork, first_block, extend_by, false);
421 :
422 17682 : pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
423 17682 : io_start, 1, extend_by * BLCKSZ);
424 :
425 41856 : for (uint32 i = 0; i < extend_by; i++)
426 : {
427 24174 : Buffer buf = buffers[i];
428 : BufferDesc *buf_hdr;
429 : uint32 buf_state;
430 :
431 24174 : buf_hdr = GetLocalBufferDescriptor(-buf - 1);
432 :
433 24174 : buf_state = pg_atomic_read_u32(&buf_hdr->state);
434 24174 : buf_state |= BM_VALID;
435 24174 : pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
436 : }
437 :
438 17682 : *extended_by = extend_by;
439 :
440 17682 : pgBufferUsage.local_blks_written += extend_by;
441 :
442 17682 : return first_block;
443 : }
444 :
445 : /*
446 : * MarkLocalBufferDirty -
447 : * mark a local buffer dirty
448 : */
449 : void
450 3254734 : MarkLocalBufferDirty(Buffer buffer)
451 : {
452 : int bufid;
453 : BufferDesc *bufHdr;
454 : uint32 buf_state;
455 :
456 : Assert(BufferIsLocal(buffer));
457 :
458 : #ifdef LBDEBUG
459 : fprintf(stderr, "LB DIRTY %d\n", buffer);
460 : #endif
461 :
462 3254734 : bufid = -buffer - 1;
463 :
464 : Assert(LocalRefCount[bufid] > 0);
465 :
466 3254734 : bufHdr = GetLocalBufferDescriptor(bufid);
467 :
468 3254734 : buf_state = pg_atomic_read_u32(&bufHdr->state);
469 :
470 3254734 : if (!(buf_state & BM_DIRTY))
471 22400 : pgBufferUsage.local_blks_dirtied++;
472 :
473 3254734 : buf_state |= BM_DIRTY;
474 :
475 3254734 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
476 3254734 : }
477 :
478 : /*
479 : * DropRelationLocalBuffers
480 : * This function removes from the buffer pool all the pages of the
481 : * specified relation that have block numbers >= firstDelBlock.
482 : * (In particular, with firstDelBlock = 0, all pages are removed.)
483 : * Dirty pages are simply dropped, without bothering to write them
484 : * out first. Therefore, this is NOT rollback-able, and so should be
485 : * used only with extreme caution!
486 : *
487 : * See DropRelationBuffers in bufmgr.c for more notes.
488 : */
489 : void
490 692 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum,
491 : BlockNumber firstDelBlock)
492 : {
493 : int i;
494 :
495 645812 : for (i = 0; i < NLocBuffer; i++)
496 : {
497 645120 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
498 : LocalBufferLookupEnt *hresult;
499 : uint32 buf_state;
500 :
501 645120 : buf_state = pg_atomic_read_u32(&bufHdr->state);
502 :
503 705420 : if ((buf_state & BM_TAG_VALID) &&
504 62070 : BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
505 1770 : BufTagGetForkNum(&bufHdr->tag) == forkNum &&
506 1636 : bufHdr->tag.blockNum >= firstDelBlock)
507 : {
508 1584 : if (LocalRefCount[i] != 0)
509 0 : elog(ERROR, "block %u of %s is still referenced (local %u)",
510 : bufHdr->tag.blockNum,
511 : relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
512 : MyProcNumber,
513 : BufTagGetForkNum(&bufHdr->tag)),
514 : LocalRefCount[i]);
515 :
516 : /* Remove entry from hashtable */
517 : hresult = (LocalBufferLookupEnt *)
518 1584 : hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
519 1584 : if (!hresult) /* shouldn't happen */
520 0 : elog(ERROR, "local buffer hash table corrupted");
521 : /* Mark buffer invalid */
522 1584 : ClearBufferTag(&bufHdr->tag);
523 1584 : buf_state &= ~BUF_FLAG_MASK;
524 1584 : buf_state &= ~BUF_USAGECOUNT_MASK;
525 1584 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
526 : }
527 : }
528 692 : }
529 :
530 : /*
531 : * DropRelationAllLocalBuffers
532 : * This function removes from the buffer pool all pages of all forks
533 : * of the specified relation.
534 : *
535 : * See DropRelationsAllBuffers in bufmgr.c for more notes.
536 : */
537 : void
538 6064 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
539 : {
540 : int i;
541 :
542 5883328 : for (i = 0; i < NLocBuffer; i++)
543 : {
544 5877264 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
545 : LocalBufferLookupEnt *hresult;
546 : uint32 buf_state;
547 :
548 5877264 : buf_state = pg_atomic_read_u32(&bufHdr->state);
549 :
550 6297048 : if ((buf_state & BM_TAG_VALID) &&
551 419784 : BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
552 : {
553 29388 : if (LocalRefCount[i] != 0)
554 0 : elog(ERROR, "block %u of %s is still referenced (local %u)",
555 : bufHdr->tag.blockNum,
556 : relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
557 : MyProcNumber,
558 : BufTagGetForkNum(&bufHdr->tag)),
559 : LocalRefCount[i]);
560 : /* Remove entry from hashtable */
561 : hresult = (LocalBufferLookupEnt *)
562 29388 : hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
563 29388 : if (!hresult) /* shouldn't happen */
564 0 : elog(ERROR, "local buffer hash table corrupted");
565 : /* Mark buffer invalid */
566 29388 : ClearBufferTag(&bufHdr->tag);
567 29388 : buf_state &= ~BUF_FLAG_MASK;
568 29388 : buf_state &= ~BUF_USAGECOUNT_MASK;
569 29388 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
570 : }
571 : }
572 6064 : }
573 :
574 : /*
575 : * InitLocalBuffers -
576 : * init the local buffer cache. Since most queries (esp. multi-user ones)
577 : * don't involve local buffers, we delay allocating actual memory for the
578 : * buffers until we need them; just make the buffer headers here.
579 : */
580 : static void
581 502 : InitLocalBuffers(void)
582 : {
583 502 : int nbufs = num_temp_buffers;
584 : HASHCTL info;
585 : int i;
586 :
587 : /*
588 : * Parallel workers can't access data in temporary tables, because they
589 : * have no visibility into the local buffers of their leader. This is a
590 : * convenient, low-cost place to provide a backstop check for that. Note
591 : * that we don't wish to prevent a parallel worker from accessing catalog
592 : * metadata about a temp table, so checks at higher levels would be
593 : * inappropriate.
594 : */
595 502 : if (IsParallelWorker())
596 0 : ereport(ERROR,
597 : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
598 : errmsg("cannot access temporary tables during a parallel operation")));
599 :
600 : /* Allocate and zero buffer headers and auxiliary arrays */
601 502 : LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
602 502 : LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
603 502 : LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
604 502 : if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
605 0 : ereport(FATAL,
606 : (errcode(ERRCODE_OUT_OF_MEMORY),
607 : errmsg("out of memory")));
608 :
609 502 : nextFreeLocalBufId = 0;
610 :
611 : /* initialize fields that need to start off nonzero */
612 509006 : for (i = 0; i < nbufs; i++)
613 : {
614 508504 : BufferDesc *buf = GetLocalBufferDescriptor(i);
615 :
616 : /*
617 : * negative to indicate local buffer. This is tricky: shared buffers
618 : * start with 0. We have to start with -2. (Note that the routine
619 : * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
620 : * is -1.)
621 : */
622 508504 : buf->buf_id = -i - 2;
623 :
624 : /*
625 : * Intentionally do not initialize the buffer's atomic variable
626 : * (besides zeroing the underlying memory above). That way we get
627 : * errors on platforms without atomics, if somebody (re-)introduces
628 : * atomic operations for local buffers.
629 : */
630 : }
631 :
632 : /* Create the lookup hash table */
633 502 : info.keysize = sizeof(BufferTag);
634 502 : info.entrysize = sizeof(LocalBufferLookupEnt);
635 :
636 502 : LocalBufHash = hash_create("Local Buffer Lookup Table",
637 : nbufs,
638 : &info,
639 : HASH_ELEM | HASH_BLOBS);
640 :
641 502 : if (!LocalBufHash)
642 0 : elog(ERROR, "could not initialize local buffer hash table");
643 :
644 : /* Initialization done, mark buffers allocated */
645 502 : NLocBuffer = nbufs;
646 502 : }
647 :
648 : /*
649 : * XXX: We could have a slightly more efficient version of PinLocalBuffer()
650 : * that does not support adjusting the usagecount - but so far it does not
651 : * seem worth the trouble.
652 : *
653 : * Note that ResourceOwnerEnlarge() must have been done already.
654 : */
655 : bool
656 2149102 : PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
657 : {
658 : uint32 buf_state;
659 2149102 : Buffer buffer = BufferDescriptorGetBuffer(buf_hdr);
660 2149102 : int bufid = -buffer - 1;
661 :
662 2149102 : buf_state = pg_atomic_read_u32(&buf_hdr->state);
663 :
664 2149102 : if (LocalRefCount[bufid] == 0)
665 : {
666 1978958 : NLocalPinnedBuffers++;
667 1978958 : if (adjust_usagecount &&
668 1947086 : BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
669 : {
670 106356 : buf_state += BUF_USAGECOUNT_ONE;
671 106356 : pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
672 : }
673 : }
674 2149102 : LocalRefCount[bufid]++;
675 2149102 : ResourceOwnerRememberBuffer(CurrentResourceOwner,
676 : BufferDescriptorGetBuffer(buf_hdr));
677 :
678 2149102 : return buf_state & BM_VALID;
679 : }
680 :
681 : void
682 2846290 : UnpinLocalBuffer(Buffer buffer)
683 : {
684 2846290 : UnpinLocalBufferNoOwner(buffer);
685 2846290 : ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
686 2846290 : }
687 :
688 : void
689 2847056 : UnpinLocalBufferNoOwner(Buffer buffer)
690 : {
691 2847056 : int buffid = -buffer - 1;
692 :
693 : Assert(BufferIsLocal(buffer));
694 : Assert(LocalRefCount[buffid] > 0);
695 : Assert(NLocalPinnedBuffers > 0);
696 :
697 2847056 : if (--LocalRefCount[buffid] == 0)
698 1978958 : NLocalPinnedBuffers--;
699 2847056 : }
700 :
701 : /*
702 : * GUC check_hook for temp_buffers
703 : */
704 : bool
705 1990 : check_temp_buffers(int *newval, void **extra, GucSource source)
706 : {
707 : /*
708 : * Once local buffers have been initialized, it's too late to change this.
709 : * However, if this is only a test call, allow it.
710 : */
711 1990 : if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
712 : {
713 0 : GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
714 0 : return false;
715 : }
716 1990 : return true;
717 : }
718 :
719 : /*
720 : * GetLocalBufferStorage - allocate memory for a local buffer
721 : *
722 : * The idea of this function is to aggregate our requests for storage
723 : * so that the memory manager doesn't see a whole lot of relatively small
724 : * requests. Since we'll never give back a local buffer once it's created
725 : * within a particular process, no point in burdening memmgr with separately
726 : * managed chunks.
727 : */
728 : static Block
729 29382 : GetLocalBufferStorage(void)
730 : {
731 : static char *cur_block = NULL;
732 : static int next_buf_in_block = 0;
733 : static int num_bufs_in_block = 0;
734 : static int total_bufs_allocated = 0;
735 : static MemoryContext LocalBufferContext = NULL;
736 :
737 : char *this_buf;
738 :
739 : Assert(total_bufs_allocated < NLocBuffer);
740 :
741 29382 : if (next_buf_in_block >= num_bufs_in_block)
742 : {
743 : /* Need to make a new request to memmgr */
744 : int num_bufs;
745 :
746 : /*
747 : * We allocate local buffers in a context of their own, so that the
748 : * space eaten for them is easily recognizable in MemoryContextStats
749 : * output. Create the context on first use.
750 : */
751 778 : if (LocalBufferContext == NULL)
752 502 : LocalBufferContext =
753 502 : AllocSetContextCreate(TopMemoryContext,
754 : "LocalBufferContext",
755 : ALLOCSET_DEFAULT_SIZES);
756 :
757 : /* Start with a 16-buffer request; subsequent ones double each time */
758 778 : num_bufs = Max(num_bufs_in_block * 2, 16);
759 : /* But not more than what we need for all remaining local bufs */
760 778 : num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
761 : /* And don't overflow MaxAllocSize, either */
762 778 : num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
763 :
764 : /* Buffers should be I/O aligned. */
765 778 : cur_block = (char *)
766 778 : TYPEALIGN(PG_IO_ALIGN_SIZE,
767 : MemoryContextAlloc(LocalBufferContext,
768 : num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE));
769 778 : next_buf_in_block = 0;
770 778 : num_bufs_in_block = num_bufs;
771 : }
772 :
773 : /* Allocate next buffer in current memory block */
774 29382 : this_buf = cur_block + next_buf_in_block * BLCKSZ;
775 29382 : next_buf_in_block++;
776 29382 : total_bufs_allocated++;
777 :
778 29382 : return (Block) this_buf;
779 : }
780 :
781 : /*
782 : * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
783 : *
784 : * This is just like CheckForBufferLeaks(), but for local buffers.
785 : */
786 : static void
787 828396 : CheckForLocalBufferLeaks(void)
788 : {
789 : #ifdef USE_ASSERT_CHECKING
790 : if (LocalRefCount)
791 : {
792 : int RefCountErrors = 0;
793 : int i;
794 :
795 : for (i = 0; i < NLocBuffer; i++)
796 : {
797 : if (LocalRefCount[i] != 0)
798 : {
799 : Buffer b = -i - 1;
800 : char *s;
801 :
802 : s = DebugPrintBufferRefcount(b);
803 : elog(WARNING, "local buffer refcount leak: %s", s);
804 : pfree(s);
805 :
806 : RefCountErrors++;
807 : }
808 : }
809 : Assert(RefCountErrors == 0);
810 : }
811 : #endif
812 828396 : }
813 :
814 : /*
815 : * AtEOXact_LocalBuffers - clean up at end of transaction.
816 : *
817 : * This is just like AtEOXact_Buffers, but for local buffers.
818 : */
819 : void
820 791110 : AtEOXact_LocalBuffers(bool isCommit)
821 : {
822 791110 : CheckForLocalBufferLeaks();
823 791110 : }
824 :
825 : /*
826 : * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
827 : *
828 : * This is just like AtProcExit_Buffers, but for local buffers.
829 : */
830 : void
831 37286 : AtProcExit_LocalBuffers(void)
832 : {
833 : /*
834 : * We shouldn't be holding any remaining pins; if we are, and assertions
835 : * aren't enabled, we'll fail later in DropRelationBuffers while trying to
836 : * drop the temp rels.
837 : */
838 37286 : CheckForLocalBufferLeaks();
839 37286 : }
|