Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * localbuf.c
4 : * local buffer manager. Fast buffer manager for temporary tables,
5 : * which never need to be WAL-logged or checkpointed, etc.
6 : *
7 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994-5, Regents of the University of California
9 : *
10 : *
11 : * IDENTIFICATION
12 : * src/backend/storage/buffer/localbuf.c
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/parallel.h"
19 : #include "executor/instrument.h"
20 : #include "pgstat.h"
21 : #include "storage/buf_internals.h"
22 : #include "storage/bufmgr.h"
23 : #include "storage/fd.h"
24 : #include "utils/guc_hooks.h"
25 : #include "utils/memutils.h"
26 : #include "utils/resowner.h"
27 :
28 :
29 : /*#define LBDEBUG*/
30 :
31 : /* entry for buffer lookup hashtable */
32 : typedef struct
33 : {
34 : BufferTag key; /* Tag of a disk page */
35 : int id; /* Associated local buffer's index */
36 : } LocalBufferLookupEnt;
37 :
38 : /* Note: this macro only works on local buffers, not shared ones! */
39 : #define LocalBufHdrGetBlock(bufHdr) \
40 : LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
41 :
42 : int NLocBuffer = 0; /* until buffers are initialized */
43 :
44 : BufferDesc *LocalBufferDescriptors = NULL;
45 : Block *LocalBufferBlockPointers = NULL;
46 : int32 *LocalRefCount = NULL;
47 :
48 : static int nextFreeLocalBufId = 0;
49 :
50 : static HTAB *LocalBufHash = NULL;
51 :
52 : /* number of local buffers pinned at least once */
53 : static int NLocalPinnedBuffers = 0;
54 :
55 :
56 : static void InitLocalBuffers(void);
57 : static Block GetLocalBufferStorage(void);
58 : static Buffer GetLocalVictimBuffer(void);
59 :
60 :
61 : /*
62 : * PrefetchLocalBuffer -
63 : * initiate asynchronous read of a block of a relation
64 : *
65 : * Do PrefetchBuffer's work for temporary relations.
66 : * No-op if prefetching isn't compiled in.
67 : */
68 : PrefetchBufferResult
69 6224 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
70 : BlockNumber blockNum)
71 : {
72 6224 : PrefetchBufferResult result = {InvalidBuffer, false};
73 : BufferTag newTag; /* identity of requested block */
74 : LocalBufferLookupEnt *hresult;
75 :
76 6224 : InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
77 :
78 : /* Initialize local buffers if first request in this session */
79 6224 : if (LocalBufHash == NULL)
80 0 : InitLocalBuffers();
81 :
82 : /* See if the desired buffer already exists */
83 : hresult = (LocalBufferLookupEnt *)
84 6224 : hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
85 :
86 6224 : if (hresult)
87 : {
88 : /* Yes, so nothing to do */
89 6224 : result.recent_buffer = -hresult->id - 1;
90 : }
91 : else
92 : {
93 : #ifdef USE_PREFETCH
94 : /* Not in buffers, so initiate prefetch */
95 0 : if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
96 0 : smgrprefetch(smgr, forkNum, blockNum, 1))
97 : {
98 0 : result.initiated_io = true;
99 : }
100 : #endif /* USE_PREFETCH */
101 : }
102 :
103 6224 : return result;
104 : }
105 :
106 :
107 : /*
108 : * LocalBufferAlloc -
109 : * Find or create a local buffer for the given page of the given relation.
110 : *
111 : * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
112 : * any locking since this is all local. We support only default access
113 : * strategy (hence, usage_count is always advanced).
114 : */
115 : BufferDesc *
116 2124442 : LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
117 : bool *foundPtr)
118 : {
119 : BufferTag newTag; /* identity of requested block */
120 : LocalBufferLookupEnt *hresult;
121 : BufferDesc *bufHdr;
122 : Buffer victim_buffer;
123 : int bufid;
124 : bool found;
125 :
126 2124442 : InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
127 :
128 : /* Initialize local buffers if first request in this session */
129 2124442 : if (LocalBufHash == NULL)
130 26 : InitLocalBuffers();
131 :
132 2124442 : ResourceOwnerEnlarge(CurrentResourceOwner);
133 :
134 : /* See if the desired buffer already exists */
135 : hresult = (LocalBufferLookupEnt *)
136 2124442 : hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
137 :
138 2124442 : if (hresult)
139 : {
140 2116768 : bufid = hresult->id;
141 2116768 : bufHdr = GetLocalBufferDescriptor(bufid);
142 : Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
143 :
144 2116768 : *foundPtr = PinLocalBuffer(bufHdr, true);
145 : }
146 : else
147 : {
148 : uint32 buf_state;
149 :
150 7674 : victim_buffer = GetLocalVictimBuffer();
151 7674 : bufid = -victim_buffer - 1;
152 7674 : bufHdr = GetLocalBufferDescriptor(bufid);
153 :
154 : hresult = (LocalBufferLookupEnt *)
155 7674 : hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
156 7674 : if (found) /* shouldn't happen */
157 0 : elog(ERROR, "local buffer hash table corrupted");
158 7674 : hresult->id = bufid;
159 :
160 : /*
161 : * it's all ours now.
162 : */
163 7674 : bufHdr->tag = newTag;
164 :
165 7674 : buf_state = pg_atomic_read_u32(&bufHdr->state);
166 7674 : buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
167 7674 : buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
168 7674 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
169 :
170 7674 : *foundPtr = false;
171 : }
172 :
173 2124442 : return bufHdr;
174 : }
175 :
176 : static Buffer
177 31844 : GetLocalVictimBuffer(void)
178 : {
179 : int victim_bufid;
180 : int trycounter;
181 : uint32 buf_state;
182 : BufferDesc *bufHdr;
183 :
184 31844 : ResourceOwnerEnlarge(CurrentResourceOwner);
185 :
186 : /*
187 : * Need to get a new buffer. We use a clock sweep algorithm (essentially
188 : * the same as what freelist.c does now...)
189 : */
190 31844 : trycounter = NLocBuffer;
191 : for (;;)
192 : {
193 38258 : victim_bufid = nextFreeLocalBufId;
194 :
195 38258 : if (++nextFreeLocalBufId >= NLocBuffer)
196 66 : nextFreeLocalBufId = 0;
197 :
198 38258 : bufHdr = GetLocalBufferDescriptor(victim_bufid);
199 :
200 38258 : if (LocalRefCount[victim_bufid] == 0)
201 : {
202 38144 : buf_state = pg_atomic_read_u32(&bufHdr->state);
203 :
204 38144 : if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
205 : {
206 6300 : buf_state -= BUF_USAGECOUNT_ONE;
207 6300 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
208 6300 : trycounter = NLocBuffer;
209 : }
210 : else
211 : {
212 : /* Found a usable buffer */
213 31844 : PinLocalBuffer(bufHdr, false);
214 31844 : break;
215 : }
216 : }
217 114 : else if (--trycounter == 0)
218 0 : ereport(ERROR,
219 : (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
220 : errmsg("no empty local buffer available")));
221 : }
222 :
223 : /*
224 : * lazy memory allocation: allocate space on first use of a buffer.
225 : */
226 31844 : if (LocalBufHdrGetBlock(bufHdr) == NULL)
227 : {
228 : /* Set pointer for use by BufferGetBlock() macro */
229 29354 : LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
230 : }
231 :
232 : /*
233 : * this buffer is not referenced but it might still be dirty. if that's
234 : * the case, write it out before reusing it!
235 : */
236 31844 : if (buf_state & BM_DIRTY)
237 : {
238 : instr_time io_start;
239 : SMgrRelation oreln;
240 894 : Page localpage = (char *) LocalBufHdrGetBlock(bufHdr);
241 :
242 : /* Find smgr relation for buffer */
243 894 : oreln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag), MyProcNumber);
244 :
245 894 : PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
246 :
247 894 : io_start = pgstat_prepare_io_time(track_io_timing);
248 :
249 : /* And write... */
250 894 : smgrwrite(oreln,
251 894 : BufTagGetForkNum(&bufHdr->tag),
252 : bufHdr->tag.blockNum,
253 : localpage,
254 : false);
255 :
256 : /* Temporary table I/O does not use Buffer Access Strategies */
257 894 : pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
258 : IOOP_WRITE, io_start, 1);
259 :
260 : /* Mark not-dirty now in case we error out below */
261 894 : buf_state &= ~BM_DIRTY;
262 894 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
263 :
264 894 : pgBufferUsage.local_blks_written++;
265 : }
266 :
267 : /*
268 : * Remove the victim buffer from the hashtable and mark as invalid.
269 : */
270 31844 : if (buf_state & BM_TAG_VALID)
271 : {
272 : LocalBufferLookupEnt *hresult;
273 :
274 : hresult = (LocalBufferLookupEnt *)
275 900 : hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
276 900 : if (!hresult) /* shouldn't happen */
277 0 : elog(ERROR, "local buffer hash table corrupted");
278 : /* mark buffer invalid just in case hash insert fails */
279 900 : ClearBufferTag(&bufHdr->tag);
280 900 : buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
281 900 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
282 900 : pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT);
283 : }
284 :
285 31844 : return BufferDescriptorGetBuffer(bufHdr);
286 : }
287 :
288 : /* see LimitAdditionalPins() */
289 : void
290 28578 : LimitAdditionalLocalPins(uint32 *additional_pins)
291 : {
292 : uint32 max_pins;
293 :
294 28578 : if (*additional_pins <= 1)
295 17068 : return;
296 :
297 : /*
298 : * In contrast to LimitAdditionalPins() other backends don't play a role
299 : * here. We can allow up to NLocBuffer pins in total, but it might not be
300 : * initialized yet so read num_temp_buffers.
301 : */
302 11510 : max_pins = (num_temp_buffers - NLocalPinnedBuffers);
303 :
304 11510 : if (*additional_pins >= max_pins)
305 0 : *additional_pins = max_pins;
306 : }
307 :
308 : /*
309 : * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
310 : * temporary buffers.
311 : */
312 : BlockNumber
313 17678 : ExtendBufferedRelLocal(BufferManagerRelation bmr,
314 : ForkNumber fork,
315 : uint32 flags,
316 : uint32 extend_by,
317 : BlockNumber extend_upto,
318 : Buffer *buffers,
319 : uint32 *extended_by)
320 : {
321 : BlockNumber first_block;
322 : instr_time io_start;
323 :
324 : /* Initialize local buffers if first request in this session */
325 17678 : if (LocalBufHash == NULL)
326 476 : InitLocalBuffers();
327 :
328 17678 : LimitAdditionalLocalPins(&extend_by);
329 :
330 41848 : for (uint32 i = 0; i < extend_by; i++)
331 : {
332 : BufferDesc *buf_hdr;
333 : Block buf_block;
334 :
335 24170 : buffers[i] = GetLocalVictimBuffer();
336 24170 : buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
337 24170 : buf_block = LocalBufHdrGetBlock(buf_hdr);
338 :
339 : /* new buffers are zero-filled */
340 24170 : MemSet((char *) buf_block, 0, BLCKSZ);
341 : }
342 :
343 17678 : first_block = smgrnblocks(bmr.smgr, fork);
344 :
345 : if (extend_upto != InvalidBlockNumber)
346 : {
347 : /*
348 : * In contrast to shared relations, nothing could change the relation
349 : * size concurrently. Thus we shouldn't end up finding that we don't
350 : * need to do anything.
351 : */
352 : Assert(first_block <= extend_upto);
353 :
354 : Assert((uint64) first_block + extend_by <= extend_upto);
355 : }
356 :
357 : /* Fail if relation is already at maximum possible length */
358 17678 : if ((uint64) first_block + extend_by >= MaxBlockNumber)
359 0 : ereport(ERROR,
360 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
361 : errmsg("cannot extend relation %s beyond %u blocks",
362 : relpath(bmr.smgr->smgr_rlocator, fork),
363 : MaxBlockNumber)));
364 :
365 41848 : for (uint32 i = 0; i < extend_by; i++)
366 : {
367 : int victim_buf_id;
368 : BufferDesc *victim_buf_hdr;
369 : BufferTag tag;
370 : LocalBufferLookupEnt *hresult;
371 : bool found;
372 :
373 24170 : victim_buf_id = -buffers[i] - 1;
374 24170 : victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
375 :
376 : /* in case we need to pin an existing buffer below */
377 24170 : ResourceOwnerEnlarge(CurrentResourceOwner);
378 :
379 24170 : InitBufferTag(&tag, &bmr.smgr->smgr_rlocator.locator, fork, first_block + i);
380 :
381 : hresult = (LocalBufferLookupEnt *)
382 24170 : hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
383 24170 : if (found)
384 : {
385 : BufferDesc *existing_hdr;
386 : uint32 buf_state;
387 :
388 0 : UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
389 :
390 0 : existing_hdr = GetLocalBufferDescriptor(hresult->id);
391 0 : PinLocalBuffer(existing_hdr, false);
392 0 : buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
393 :
394 0 : buf_state = pg_atomic_read_u32(&existing_hdr->state);
395 : Assert(buf_state & BM_TAG_VALID);
396 : Assert(!(buf_state & BM_DIRTY));
397 0 : buf_state &= ~BM_VALID;
398 0 : pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
399 : }
400 : else
401 : {
402 24170 : uint32 buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
403 :
404 : Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
405 :
406 24170 : victim_buf_hdr->tag = tag;
407 :
408 24170 : buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
409 :
410 24170 : pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
411 :
412 24170 : hresult->id = victim_buf_id;
413 : }
414 : }
415 :
416 17678 : io_start = pgstat_prepare_io_time(track_io_timing);
417 :
418 : /* actually extend relation */
419 17678 : smgrzeroextend(bmr.smgr, fork, first_block, extend_by, false);
420 :
421 17678 : pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
422 : io_start, extend_by);
423 :
424 41848 : for (uint32 i = 0; i < extend_by; i++)
425 : {
426 24170 : Buffer buf = buffers[i];
427 : BufferDesc *buf_hdr;
428 : uint32 buf_state;
429 :
430 24170 : buf_hdr = GetLocalBufferDescriptor(-buf - 1);
431 :
432 24170 : buf_state = pg_atomic_read_u32(&buf_hdr->state);
433 24170 : buf_state |= BM_VALID;
434 24170 : pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
435 : }
436 :
437 17678 : *extended_by = extend_by;
438 :
439 17678 : pgBufferUsage.local_blks_written += extend_by;
440 :
441 17678 : return first_block;
442 : }
443 :
444 : /*
445 : * MarkLocalBufferDirty -
446 : * mark a local buffer dirty
447 : */
448 : void
449 3254444 : MarkLocalBufferDirty(Buffer buffer)
450 : {
451 : int bufid;
452 : BufferDesc *bufHdr;
453 : uint32 buf_state;
454 :
455 : Assert(BufferIsLocal(buffer));
456 :
457 : #ifdef LBDEBUG
458 : fprintf(stderr, "LB DIRTY %d\n", buffer);
459 : #endif
460 :
461 3254444 : bufid = -buffer - 1;
462 :
463 : Assert(LocalRefCount[bufid] > 0);
464 :
465 3254444 : bufHdr = GetLocalBufferDescriptor(bufid);
466 :
467 3254444 : buf_state = pg_atomic_read_u32(&bufHdr->state);
468 :
469 3254444 : if (!(buf_state & BM_DIRTY))
470 22390 : pgBufferUsage.local_blks_dirtied++;
471 :
472 3254444 : buf_state |= BM_DIRTY;
473 :
474 3254444 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
475 3254444 : }
476 :
477 : /*
478 : * DropRelationLocalBuffers
479 : * This function removes from the buffer pool all the pages of the
480 : * specified relation that have block numbers >= firstDelBlock.
481 : * (In particular, with firstDelBlock = 0, all pages are removed.)
482 : * Dirty pages are simply dropped, without bothering to write them
483 : * out first. Therefore, this is NOT rollback-able, and so should be
484 : * used only with extreme caution!
485 : *
486 : * See DropRelationBuffers in bufmgr.c for more notes.
487 : */
488 : void
489 692 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum,
490 : BlockNumber firstDelBlock)
491 : {
492 : int i;
493 :
494 645812 : for (i = 0; i < NLocBuffer; i++)
495 : {
496 645120 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
497 : LocalBufferLookupEnt *hresult;
498 : uint32 buf_state;
499 :
500 645120 : buf_state = pg_atomic_read_u32(&bufHdr->state);
501 :
502 705420 : if ((buf_state & BM_TAG_VALID) &&
503 62070 : BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
504 1770 : BufTagGetForkNum(&bufHdr->tag) == forkNum &&
505 1636 : bufHdr->tag.blockNum >= firstDelBlock)
506 : {
507 1584 : if (LocalRefCount[i] != 0)
508 0 : elog(ERROR, "block %u of %s is still referenced (local %u)",
509 : bufHdr->tag.blockNum,
510 : relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
511 : MyProcNumber,
512 : BufTagGetForkNum(&bufHdr->tag)),
513 : LocalRefCount[i]);
514 :
515 : /* Remove entry from hashtable */
516 : hresult = (LocalBufferLookupEnt *)
517 1584 : hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
518 1584 : if (!hresult) /* shouldn't happen */
519 0 : elog(ERROR, "local buffer hash table corrupted");
520 : /* Mark buffer invalid */
521 1584 : ClearBufferTag(&bufHdr->tag);
522 1584 : buf_state &= ~BUF_FLAG_MASK;
523 1584 : buf_state &= ~BUF_USAGECOUNT_MASK;
524 1584 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
525 : }
526 : }
527 692 : }
528 :
529 : /*
530 : * DropRelationAllLocalBuffers
531 : * This function removes from the buffer pool all pages of all forks
532 : * of the specified relation.
533 : *
534 : * See DropRelationsAllBuffers in bufmgr.c for more notes.
535 : */
536 : void
537 6022 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
538 : {
539 : int i;
540 :
541 5840278 : for (i = 0; i < NLocBuffer; i++)
542 : {
543 5834256 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
544 : LocalBufferLookupEnt *hresult;
545 : uint32 buf_state;
546 :
547 5834256 : buf_state = pg_atomic_read_u32(&bufHdr->state);
548 :
549 6252982 : if ((buf_state & BM_TAG_VALID) &&
550 418726 : BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
551 : {
552 29360 : if (LocalRefCount[i] != 0)
553 0 : elog(ERROR, "block %u of %s is still referenced (local %u)",
554 : bufHdr->tag.blockNum,
555 : relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
556 : MyProcNumber,
557 : BufTagGetForkNum(&bufHdr->tag)),
558 : LocalRefCount[i]);
559 : /* Remove entry from hashtable */
560 : hresult = (LocalBufferLookupEnt *)
561 29360 : hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
562 29360 : if (!hresult) /* shouldn't happen */
563 0 : elog(ERROR, "local buffer hash table corrupted");
564 : /* Mark buffer invalid */
565 29360 : ClearBufferTag(&bufHdr->tag);
566 29360 : buf_state &= ~BUF_FLAG_MASK;
567 29360 : buf_state &= ~BUF_USAGECOUNT_MASK;
568 29360 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
569 : }
570 : }
571 6022 : }
572 :
573 : /*
574 : * InitLocalBuffers -
575 : * init the local buffer cache. Since most queries (esp. multi-user ones)
576 : * don't involve local buffers, we delay allocating actual memory for the
577 : * buffers until we need them; just make the buffer headers here.
578 : */
579 : static void
580 502 : InitLocalBuffers(void)
581 : {
582 502 : int nbufs = num_temp_buffers;
583 : HASHCTL info;
584 : int i;
585 :
586 : /*
587 : * Parallel workers can't access data in temporary tables, because they
588 : * have no visibility into the local buffers of their leader. This is a
589 : * convenient, low-cost place to provide a backstop check for that. Note
590 : * that we don't wish to prevent a parallel worker from accessing catalog
591 : * metadata about a temp table, so checks at higher levels would be
592 : * inappropriate.
593 : */
594 502 : if (IsParallelWorker())
595 0 : ereport(ERROR,
596 : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
597 : errmsg("cannot access temporary tables during a parallel operation")));
598 :
599 : /* Allocate and zero buffer headers and auxiliary arrays */
600 502 : LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
601 502 : LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
602 502 : LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
603 502 : if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
604 0 : ereport(FATAL,
605 : (errcode(ERRCODE_OUT_OF_MEMORY),
606 : errmsg("out of memory")));
607 :
608 502 : nextFreeLocalBufId = 0;
609 :
610 : /* initialize fields that need to start off nonzero */
611 509006 : for (i = 0; i < nbufs; i++)
612 : {
613 508504 : BufferDesc *buf = GetLocalBufferDescriptor(i);
614 :
615 : /*
616 : * negative to indicate local buffer. This is tricky: shared buffers
617 : * start with 0. We have to start with -2. (Note that the routine
618 : * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
619 : * is -1.)
620 : */
621 508504 : buf->buf_id = -i - 2;
622 :
623 : /*
624 : * Intentionally do not initialize the buffer's atomic variable
625 : * (besides zeroing the underlying memory above). That way we get
626 : * errors on platforms without atomics, if somebody (re-)introduces
627 : * atomic operations for local buffers.
628 : */
629 : }
630 :
631 : /* Create the lookup hash table */
632 502 : info.keysize = sizeof(BufferTag);
633 502 : info.entrysize = sizeof(LocalBufferLookupEnt);
634 :
635 502 : LocalBufHash = hash_create("Local Buffer Lookup Table",
636 : nbufs,
637 : &info,
638 : HASH_ELEM | HASH_BLOBS);
639 :
640 502 : if (!LocalBufHash)
641 0 : elog(ERROR, "could not initialize local buffer hash table");
642 :
643 : /* Initialization done, mark buffers allocated */
644 502 : NLocBuffer = nbufs;
645 502 : }
646 :
647 : /*
648 : * XXX: We could have a slightly more efficient version of PinLocalBuffer()
649 : * that does not support adjusting the usagecount - but so far it does not
650 : * seem worth the trouble.
651 : *
652 : * Note that ResourceOwnerEnlarge() must have been done already.
653 : */
654 : bool
655 2148612 : PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
656 : {
657 : uint32 buf_state;
658 2148612 : Buffer buffer = BufferDescriptorGetBuffer(buf_hdr);
659 2148612 : int bufid = -buffer - 1;
660 :
661 2148612 : buf_state = pg_atomic_read_u32(&buf_hdr->state);
662 :
663 2148612 : if (LocalRefCount[bufid] == 0)
664 : {
665 1978636 : NLocalPinnedBuffers++;
666 1978636 : if (adjust_usagecount &&
667 1946792 : BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
668 : {
669 106276 : buf_state += BUF_USAGECOUNT_ONE;
670 106276 : pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
671 : }
672 : }
673 2148612 : LocalRefCount[bufid]++;
674 2148612 : ResourceOwnerRememberBuffer(CurrentResourceOwner,
675 : BufferDescriptorGetBuffer(buf_hdr));
676 :
677 2148612 : return buf_state & BM_VALID;
678 : }
679 :
680 : void
681 2845632 : UnpinLocalBuffer(Buffer buffer)
682 : {
683 2845632 : UnpinLocalBufferNoOwner(buffer);
684 2845632 : ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
685 2845632 : }
686 :
687 : void
688 2846386 : UnpinLocalBufferNoOwner(Buffer buffer)
689 : {
690 2846386 : int buffid = -buffer - 1;
691 :
692 : Assert(BufferIsLocal(buffer));
693 : Assert(LocalRefCount[buffid] > 0);
694 : Assert(NLocalPinnedBuffers > 0);
695 :
696 2846386 : if (--LocalRefCount[buffid] == 0)
697 1978636 : NLocalPinnedBuffers--;
698 2846386 : }
699 :
700 : /*
701 : * GUC check_hook for temp_buffers
702 : */
703 : bool
704 1980 : check_temp_buffers(int *newval, void **extra, GucSource source)
705 : {
706 : /*
707 : * Once local buffers have been initialized, it's too late to change this.
708 : * However, if this is only a test call, allow it.
709 : */
710 1980 : if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
711 : {
712 0 : GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
713 0 : return false;
714 : }
715 1980 : return true;
716 : }
717 :
718 : /*
719 : * GetLocalBufferStorage - allocate memory for a local buffer
720 : *
721 : * The idea of this function is to aggregate our requests for storage
722 : * so that the memory manager doesn't see a whole lot of relatively small
723 : * requests. Since we'll never give back a local buffer once it's created
724 : * within a particular process, no point in burdening memmgr with separately
725 : * managed chunks.
726 : */
727 : static Block
728 29354 : GetLocalBufferStorage(void)
729 : {
730 : static char *cur_block = NULL;
731 : static int next_buf_in_block = 0;
732 : static int num_bufs_in_block = 0;
733 : static int total_bufs_allocated = 0;
734 : static MemoryContext LocalBufferContext = NULL;
735 :
736 : char *this_buf;
737 :
738 : Assert(total_bufs_allocated < NLocBuffer);
739 :
740 29354 : if (next_buf_in_block >= num_bufs_in_block)
741 : {
742 : /* Need to make a new request to memmgr */
743 : int num_bufs;
744 :
745 : /*
746 : * We allocate local buffers in a context of their own, so that the
747 : * space eaten for them is easily recognizable in MemoryContextStats
748 : * output. Create the context on first use.
749 : */
750 780 : if (LocalBufferContext == NULL)
751 502 : LocalBufferContext =
752 502 : AllocSetContextCreate(TopMemoryContext,
753 : "LocalBufferContext",
754 : ALLOCSET_DEFAULT_SIZES);
755 :
756 : /* Start with a 16-buffer request; subsequent ones double each time */
757 780 : num_bufs = Max(num_bufs_in_block * 2, 16);
758 : /* But not more than what we need for all remaining local bufs */
759 780 : num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
760 : /* And don't overflow MaxAllocSize, either */
761 780 : num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
762 :
763 : /* Buffers should be I/O aligned. */
764 780 : cur_block = (char *)
765 780 : TYPEALIGN(PG_IO_ALIGN_SIZE,
766 : MemoryContextAlloc(LocalBufferContext,
767 : num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE));
768 780 : next_buf_in_block = 0;
769 780 : num_bufs_in_block = num_bufs;
770 : }
771 :
772 : /* Allocate next buffer in current memory block */
773 29354 : this_buf = cur_block + next_buf_in_block * BLCKSZ;
774 29354 : next_buf_in_block++;
775 29354 : total_bufs_allocated++;
776 :
777 29354 : return (Block) this_buf;
778 : }
779 :
780 : /*
781 : * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
782 : *
783 : * This is just like CheckForBufferLeaks(), but for local buffers.
784 : */
785 : static void
786 772820 : CheckForLocalBufferLeaks(void)
787 : {
788 : #ifdef USE_ASSERT_CHECKING
789 : if (LocalRefCount)
790 : {
791 : int RefCountErrors = 0;
792 : int i;
793 :
794 : for (i = 0; i < NLocBuffer; i++)
795 : {
796 : if (LocalRefCount[i] != 0)
797 : {
798 : Buffer b = -i - 1;
799 : char *s;
800 :
801 : s = DebugPrintBufferRefcount(b);
802 : elog(WARNING, "local buffer refcount leak: %s", s);
803 : pfree(s);
804 :
805 : RefCountErrors++;
806 : }
807 : }
808 : Assert(RefCountErrors == 0);
809 : }
810 : #endif
811 772820 : }
812 :
813 : /*
814 : * AtEOXact_LocalBuffers - clean up at end of transaction.
815 : *
816 : * This is just like AtEOXact_Buffers, but for local buffers.
817 : */
818 : void
819 739554 : AtEOXact_LocalBuffers(bool isCommit)
820 : {
821 739554 : CheckForLocalBufferLeaks();
822 739554 : }
823 :
824 : /*
825 : * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
826 : *
827 : * This is just like AtProcExit_Buffers, but for local buffers.
828 : */
829 : void
830 33266 : AtProcExit_LocalBuffers(void)
831 : {
832 : /*
833 : * We shouldn't be holding any remaining pins; if we are, and assertions
834 : * aren't enabled, we'll fail later in DropRelationBuffers while trying to
835 : * drop the temp rels.
836 : */
837 33266 : CheckForLocalBufferLeaks();
838 33266 : }
|