Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * localbuf.c
4 : * local buffer manager. Fast buffer manager for temporary tables,
5 : * which never need to be WAL-logged or checkpointed, etc.
6 : *
7 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994-5, Regents of the University of California
9 : *
10 : *
11 : * IDENTIFICATION
12 : * src/backend/storage/buffer/localbuf.c
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/parallel.h"
19 : #include "catalog/catalog.h"
20 : #include "executor/instrument.h"
21 : #include "pgstat.h"
22 : #include "storage/buf_internals.h"
23 : #include "storage/bufmgr.h"
24 : #include "utils/guc_hooks.h"
25 : #include "utils/memutils.h"
26 : #include "utils/resowner_private.h"
27 :
28 :
29 : /*#define LBDEBUG*/
30 :
31 : /* Entry of the local buffer lookup hash table: maps a page's tag to the local buffer caching it */
32 : typedef struct
33 : {
34 : BufferTag key; /* Tag of a disk page (the hash key) */
35 : int id; /* Associated local buffer's index (0-based slot in the local buffer arrays) */
36 : } LocalBufferLookupEnt;
37 :
38 : /* Note: this macro only works on local buffers, not shared ones! It turns a local header's buf_id (-2, -3, ...) back into its 0-based slot in LocalBufferBlockPointers. */
39 : #define LocalBufHdrGetBlock(bufHdr) \
40 : LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
41 :
42 : int NLocBuffer = 0; /* number of local buffers; stays 0 until InitLocalBuffers() has run */
43 :
44 : BufferDesc *LocalBufferDescriptors = NULL; /* array of local buffer headers, allocated by InitLocalBuffers() */
45 : Block *LocalBufferBlockPointers = NULL; /* per-buffer page storage; an entry stays NULL until GetLocalVictimBuffer() first hands out that buffer */
46 : int32 *LocalRefCount = NULL; /* per-buffer pin count, maintained by PinLocalBuffer()/UnpinLocalBuffer() */
47 :
48 : static int nextFreeLocalBufId = 0; /* clock-sweep hand: next slot GetLocalVictimBuffer() will examine */
49 :
50 : static HTAB *LocalBufHash = NULL; /* BufferTag -> LocalBufferLookupEnt; NULL means local buffers are not initialized yet */
51 :
52 : /* number of local buffers that currently hold at least one pin */
53 : static int NLocalPinnedBuffers = 0;
54 :
55 :
56 : static void InitLocalBuffers(void);
57 : static Block GetLocalBufferStorage(void);
58 : static Buffer GetLocalVictimBuffer(void);
59 :
60 :
61 : /*
62 : * PrefetchLocalBuffer -
63 : * initiate asynchronous read of a block of a relation
64 : *
65 : * Do PrefetchBuffer's work for temporary relations.
66 : * No-op if prefetching isn't compiled in.
67 : */
68 : PrefetchBufferResult
69 12488 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
70 : BlockNumber blockNum)
71 : {
72 12488 : PrefetchBufferResult result = {InvalidBuffer, false};
73 : BufferTag newTag; /* identity of requested block */
74 : LocalBufferLookupEnt *hresult;
75 :
76 12488 : InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
77 :
78 : /* Initialize local buffers if first request in this session */
79 12488 : if (LocalBufHash == NULL)
80 0 : InitLocalBuffers();
81 :
82 : /* See if the desired buffer already exists */
83 : hresult = (LocalBufferLookupEnt *)
84 12488 : hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
85 :
86 12488 : if (hresult)
87 : {
88 : /* Yes, so nothing to do */
89 12488 : result.recent_buffer = -hresult->id - 1;
90 : }
91 : else
92 : {
93 : #ifdef USE_PREFETCH
94 : /* Not in buffers, so initiate prefetch */
95 0 : if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
96 0 : smgrprefetch(smgr, forkNum, blockNum))
97 : {
98 0 : result.initiated_io = true;
99 : }
100 : #endif /* USE_PREFETCH */
101 : }
102 :
103 12488 : return result;
104 : }
105 :
106 :
107 : /*
108 : * LocalBufferAlloc -
109 : * Find or create a local buffer for the given page of the given relation.
110 : *
111 : * API is similar to bufmgr.c's BufferAlloc, except that we do not need
112 : * to do any locking since this is all local. Also, IO_IN_PROGRESS
113 : * does not get set. Lastly, we support only default access strategy
114 : * (hence, usage_count is always advanced).
115 : */
116 : BufferDesc *
117 2092332 : LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
118 : bool *foundPtr)
119 : {
120 : BufferTag newTag; /* identity of requested block */
121 : LocalBufferLookupEnt *hresult;
122 : BufferDesc *bufHdr;
123 : Buffer victim_buffer;
124 : int bufid;
125 : bool found;
126 :
127 2092332 : InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
128 :
129 : /* Initialize local buffers if first request in this session */
130 2092332 : if (LocalBufHash == NULL)
131 26 : InitLocalBuffers();
132 :
133 : /* See if the desired buffer already exists */
134 : hresult = (LocalBufferLookupEnt *)
135 2092332 : hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
136 :
137 2092332 : if (hresult)
138 : {
139 2084754 : bufid = hresult->id;
140 2084754 : bufHdr = GetLocalBufferDescriptor(bufid);
141 : Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
142 :
143 2084754 : *foundPtr = PinLocalBuffer(bufHdr, true);
144 : }
145 : else
146 : {
147 : uint32 buf_state;
148 :
149 7578 : victim_buffer = GetLocalVictimBuffer();
150 7578 : bufid = -victim_buffer - 1;
151 7578 : bufHdr = GetLocalBufferDescriptor(bufid);
152 :
153 : hresult = (LocalBufferLookupEnt *)
154 7578 : hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
155 7578 : if (found) /* shouldn't happen */
156 0 : elog(ERROR, "local buffer hash table corrupted");
157 7578 : hresult->id = bufid;
158 :
159 : /*
160 : * it's all ours now.
161 : */
162 7578 : bufHdr->tag = newTag;
163 :
164 7578 : buf_state = pg_atomic_read_u32(&bufHdr->state);
165 7578 : buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
166 7578 : buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
167 7578 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
168 :
169 7578 : *foundPtr = false;
170 : }
171 :
172 2092332 : return bufHdr;
173 : }
174 :
175 : static Buffer
176 29140 : GetLocalVictimBuffer(void)
177 : {
178 : int victim_bufid;
179 : int trycounter;
180 : uint32 buf_state;
181 : BufferDesc *bufHdr;
182 :
183 29140 : ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
184 :
185 : /*
186 : * Need to get a new buffer. We use a clock sweep algorithm (essentially
187 : * the same as what freelist.c does now...)
188 : */
189 29140 : trycounter = NLocBuffer;
190 : for (;;)
191 : {
192 35554 : victim_bufid = nextFreeLocalBufId;
193 :
194 35554 : if (++nextFreeLocalBufId >= NLocBuffer)
195 66 : nextFreeLocalBufId = 0;
196 :
197 35554 : bufHdr = GetLocalBufferDescriptor(victim_bufid);
198 :
199 35554 : if (LocalRefCount[victim_bufid] == 0)
200 : {
201 35548 : buf_state = pg_atomic_read_u32(&bufHdr->state);
202 :
203 35548 : if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
204 : {
205 6408 : buf_state -= BUF_USAGECOUNT_ONE;
206 6408 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
207 6408 : trycounter = NLocBuffer;
208 : }
209 : else
210 : {
211 : /* Found a usable buffer */
212 29140 : PinLocalBuffer(bufHdr, false);
213 29140 : break;
214 : }
215 : }
216 6 : else if (--trycounter == 0)
217 0 : ereport(ERROR,
218 : (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
219 : errmsg("no empty local buffer available")));
220 : }
221 :
222 : /*
223 : * lazy memory allocation: allocate space on first use of a buffer.
224 : */
225 29140 : if (LocalBufHdrGetBlock(bufHdr) == NULL)
226 : {
227 : /* Set pointer for use by BufferGetBlock() macro */
228 26650 : LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
229 : }
230 :
231 : /*
232 : * this buffer is not referenced but it might still be dirty. if that's
233 : * the case, write it out before reusing it!
234 : */
235 29140 : if (buf_state & BM_DIRTY)
236 : {
237 : instr_time io_start;
238 : SMgrRelation oreln;
239 894 : Page localpage = (char *) LocalBufHdrGetBlock(bufHdr);
240 :
241 : /* Find smgr relation for buffer */
242 894 : oreln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag), MyBackendId);
243 :
244 894 : PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
245 :
246 894 : io_start = pgstat_prepare_io_time();
247 :
248 : /* And write... */
249 894 : smgrwrite(oreln,
250 894 : BufTagGetForkNum(&bufHdr->tag),
251 : bufHdr->tag.blockNum,
252 : localpage,
253 : false);
254 :
255 : /* Temporary table I/O does not use Buffer Access Strategies */
256 894 : pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
257 : IOOP_WRITE, io_start, 1);
258 :
259 : /* Mark not-dirty now in case we error out below */
260 894 : buf_state &= ~BM_DIRTY;
261 894 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
262 :
263 894 : pgBufferUsage.local_blks_written++;
264 : }
265 :
266 : /*
267 : * Remove the victim buffer from the hashtable and mark as invalid.
268 : */
269 29140 : if (buf_state & BM_TAG_VALID)
270 : {
271 : LocalBufferLookupEnt *hresult;
272 :
273 : hresult = (LocalBufferLookupEnt *)
274 900 : hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
275 900 : if (!hresult) /* shouldn't happen */
276 0 : elog(ERROR, "local buffer hash table corrupted");
277 : /* mark buffer invalid just in case hash insert fails */
278 900 : ClearBufferTag(&bufHdr->tag);
279 900 : buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
280 900 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
281 900 : pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT);
282 : }
283 :
284 29140 : return BufferDescriptorGetBuffer(bufHdr);
285 : }
286 :
287 : /* see LimitAdditionalPins() */
288 : static void
289 21030 : LimitAdditionalLocalPins(uint32 *additional_pins)
290 : {
291 : uint32 max_pins;
292 :
293 21030 : if (*additional_pins <= 1)
294 20860 : return;
295 :
296 : /*
297 : * In contrast to LimitAdditionalPins() other backends don't play a role
298 : * here. We can allow up to NLocBuffer pins in total.
299 : */
300 170 : max_pins = (NLocBuffer - NLocalPinnedBuffers);
301 :
302 170 : if (*additional_pins >= max_pins)
303 0 : *additional_pins = max_pins;
304 : }
305 :
306 : /*
307 : * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
308 : * temporary buffers.
309 : */
310 : BlockNumber
311 21030 : ExtendBufferedRelLocal(ExtendBufferedWhat eb,
312 : ForkNumber fork,
313 : uint32 flags,
314 : uint32 extend_by,
315 : BlockNumber extend_upto,
316 : Buffer *buffers,
317 : uint32 *extended_by)
318 : {
319 : BlockNumber first_block;
320 : instr_time io_start;
321 :
322 : /* Initialize local buffers if first request in this session */
323 21030 : if (LocalBufHash == NULL)
324 446 : InitLocalBuffers();
325 :
326 21030 : LimitAdditionalLocalPins(&extend_by);
327 :
328 42592 : for (uint32 i = 0; i < extend_by; i++)
329 : {
330 : BufferDesc *buf_hdr;
331 : Block buf_block;
332 :
333 21562 : buffers[i] = GetLocalVictimBuffer();
334 21562 : buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
335 21562 : buf_block = LocalBufHdrGetBlock(buf_hdr);
336 :
337 : /* new buffers are zero-filled */
338 21562 : MemSet((char *) buf_block, 0, BLCKSZ);
339 : }
340 :
341 21030 : first_block = smgrnblocks(eb.smgr, fork);
342 :
343 : if (extend_upto != InvalidBlockNumber)
344 : {
345 : /*
346 : * In contrast to shared relations, nothing could change the relation
347 : * size concurrently. Thus we shouldn't end up finding that we don't
348 : * need to do anything.
349 : */
350 : Assert(first_block <= extend_upto);
351 :
352 : Assert((uint64) first_block + extend_by <= extend_upto);
353 : }
354 :
355 : /* Fail if relation is already at maximum possible length */
356 21030 : if ((uint64) first_block + extend_by >= MaxBlockNumber)
357 0 : ereport(ERROR,
358 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
359 : errmsg("cannot extend relation %s beyond %u blocks",
360 : relpath(eb.smgr->smgr_rlocator, fork),
361 : MaxBlockNumber)));
362 :
363 42592 : for (int i = 0; i < extend_by; i++)
364 : {
365 : int victim_buf_id;
366 : BufferDesc *victim_buf_hdr;
367 : BufferTag tag;
368 : LocalBufferLookupEnt *hresult;
369 : bool found;
370 :
371 21562 : victim_buf_id = -buffers[i] - 1;
372 21562 : victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
373 :
374 21562 : InitBufferTag(&tag, &eb.smgr->smgr_rlocator.locator, fork, first_block + i);
375 :
376 : hresult = (LocalBufferLookupEnt *)
377 21562 : hash_search(LocalBufHash, (void *) &tag, HASH_ENTER, &found);
378 21562 : if (found)
379 : {
380 0 : BufferDesc *existing_hdr = GetLocalBufferDescriptor(hresult->id);
381 : uint32 buf_state;
382 :
383 0 : UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
384 :
385 0 : existing_hdr = GetLocalBufferDescriptor(hresult->id);
386 0 : PinLocalBuffer(existing_hdr, false);
387 0 : buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
388 :
389 0 : buf_state = pg_atomic_read_u32(&existing_hdr->state);
390 : Assert(buf_state & BM_TAG_VALID);
391 : Assert(!(buf_state & BM_DIRTY));
392 0 : buf_state &= BM_VALID;
393 0 : pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
394 : }
395 : else
396 : {
397 21562 : uint32 buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
398 :
399 : Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
400 :
401 21562 : victim_buf_hdr->tag = tag;
402 :
403 21562 : buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
404 :
405 21562 : pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
406 :
407 21562 : hresult->id = victim_buf_id;
408 : }
409 : }
410 :
411 21030 : io_start = pgstat_prepare_io_time();
412 :
413 : /* actually extend relation */
414 21030 : smgrzeroextend(eb.smgr, fork, first_block, extend_by, false);
415 :
416 21030 : pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
417 : io_start, extend_by);
418 :
419 42592 : for (int i = 0; i < extend_by; i++)
420 : {
421 21562 : Buffer buf = buffers[i];
422 : BufferDesc *buf_hdr;
423 : uint32 buf_state;
424 :
425 21562 : buf_hdr = GetLocalBufferDescriptor(-buf - 1);
426 :
427 21562 : buf_state = pg_atomic_read_u32(&buf_hdr->state);
428 21562 : buf_state |= BM_VALID;
429 21562 : pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
430 : }
431 :
432 21030 : *extended_by = extend_by;
433 :
434 21030 : pgBufferUsage.temp_blks_written += extend_by;
435 :
436 21030 : return first_block;
437 : }
438 :
439 : /*
440 : * MarkLocalBufferDirty -
441 : * mark a local buffer dirty
442 : */
443 : void
444 3230604 : MarkLocalBufferDirty(Buffer buffer)
445 : {
446 : int bufid;
447 : BufferDesc *bufHdr;
448 : uint32 buf_state;
449 :
450 : Assert(BufferIsLocal(buffer));
451 :
452 : #ifdef LBDEBUG
453 : fprintf(stderr, "LB DIRTY %d\n", buffer);
454 : #endif
455 :
456 3230604 : bufid = -buffer - 1;
457 :
458 : Assert(LocalRefCount[bufid] > 0);
459 :
460 3230604 : bufHdr = GetLocalBufferDescriptor(bufid);
461 :
462 3230604 : buf_state = pg_atomic_read_u32(&bufHdr->state);
463 :
464 3230604 : if (!(buf_state & BM_DIRTY))
465 22082 : pgBufferUsage.local_blks_dirtied++;
466 :
467 3230604 : buf_state |= BM_DIRTY;
468 :
469 3230604 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
470 3230604 : }
471 :
472 : /*
473 : * DropRelationLocalBuffers
474 : * This function removes from the buffer pool all the pages of the
475 : * specified relation that have block numbers >= firstDelBlock.
476 : * (In particular, with firstDelBlock = 0, all pages are removed.)
477 : * Dirty pages are simply dropped, without bothering to write them
478 : * out first. Therefore, this is NOT rollback-able, and so should be
479 : * used only with extreme caution!
480 : *
481 : * See DropRelationBuffers in bufmgr.c for more notes.
482 : */
483 : void
484 692 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum,
485 : BlockNumber firstDelBlock)
486 : {
487 : int i;
488 :
489 645812 : for (i = 0; i < NLocBuffer; i++)
490 : {
491 645120 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
492 : LocalBufferLookupEnt *hresult;
493 : uint32 buf_state;
494 :
495 645120 : buf_state = pg_atomic_read_u32(&bufHdr->state);
496 :
497 698244 : if ((buf_state & BM_TAG_VALID) &&
498 54894 : BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
499 1770 : BufTagGetForkNum(&bufHdr->tag) == forkNum &&
500 1636 : bufHdr->tag.blockNum >= firstDelBlock)
501 : {
502 1584 : if (LocalRefCount[i] != 0)
503 0 : elog(ERROR, "block %u of %s is still referenced (local %u)",
504 : bufHdr->tag.blockNum,
505 : relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
506 : MyBackendId,
507 : BufTagGetForkNum(&bufHdr->tag)),
508 : LocalRefCount[i]);
509 :
510 : /* Remove entry from hashtable */
511 : hresult = (LocalBufferLookupEnt *)
512 1584 : hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
513 1584 : if (!hresult) /* shouldn't happen */
514 0 : elog(ERROR, "local buffer hash table corrupted");
515 : /* Mark buffer invalid */
516 1584 : ClearBufferTag(&bufHdr->tag);
517 1584 : buf_state &= ~BUF_FLAG_MASK;
518 1584 : buf_state &= ~BUF_USAGECOUNT_MASK;
519 1584 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
520 : }
521 : }
522 692 : }
523 :
524 : /*
525 : * DropRelationAllLocalBuffers
526 : * This function removes from the buffer pool all pages of all forks
527 : * of the specified relation.
528 : *
529 : * See DropRelationsAllBuffers in bufmgr.c for more notes.
530 : */
531 : void
532 5612 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
533 : {
534 : int i;
535 :
536 5434364 : for (i = 0; i < NLocBuffer; i++)
537 : {
538 5428752 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
539 : LocalBufferLookupEnt *hresult;
540 : uint32 buf_state;
541 :
542 5428752 : buf_state = pg_atomic_read_u32(&bufHdr->state);
543 :
544 5771478 : if ((buf_state & BM_TAG_VALID) &&
545 342726 : BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
546 : {
547 26656 : if (LocalRefCount[i] != 0)
548 0 : elog(ERROR, "block %u of %s is still referenced (local %u)",
549 : bufHdr->tag.blockNum,
550 : relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
551 : MyBackendId,
552 : BufTagGetForkNum(&bufHdr->tag)),
553 : LocalRefCount[i]);
554 : /* Remove entry from hashtable */
555 : hresult = (LocalBufferLookupEnt *)
556 26656 : hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
557 26656 : if (!hresult) /* shouldn't happen */
558 0 : elog(ERROR, "local buffer hash table corrupted");
559 : /* Mark buffer invalid */
560 26656 : ClearBufferTag(&bufHdr->tag);
561 26656 : buf_state &= ~BUF_FLAG_MASK;
562 26656 : buf_state &= ~BUF_USAGECOUNT_MASK;
563 26656 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
564 : }
565 : }
566 5612 : }
567 :
568 : /*
569 : * InitLocalBuffers -
570 : * init the local buffer cache. Since most queries (esp. multi-user ones)
571 : * don't involve local buffers, we delay allocating actual memory for the
572 : * buffers until we need them; just make the buffer headers here.
573 : */
574 : static void
575 472 : InitLocalBuffers(void)
576 : {
577 472 : int nbufs = num_temp_buffers;
578 : HASHCTL info;
579 : int i;
580 :
581 : /*
582 : * Parallel workers can't access data in temporary tables, because they
583 : * have no visibility into the local buffers of their leader. This is a
584 : * convenient, low-cost place to provide a backstop check for that. Note
585 : * that we don't wish to prevent a parallel worker from accessing catalog
586 : * metadata about a temp table, so checks at higher levels would be
587 : * inappropriate.
588 : */
589 472 : if (IsParallelWorker())
590 0 : ereport(ERROR,
591 : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
592 : errmsg("cannot access temporary tables during a parallel operation")));
593 :
594 : /* Allocate and zero buffer headers and auxiliary arrays */
595 472 : LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
596 472 : LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
597 472 : LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
598 472 : if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
599 0 : ereport(FATAL,
600 : (errcode(ERRCODE_OUT_OF_MEMORY),
601 : errmsg("out of memory")));
602 :
603 472 : nextFreeLocalBufId = 0;
604 :
605 : /* initialize fields that need to start off nonzero */
606 478256 : for (i = 0; i < nbufs; i++)
607 : {
608 477784 : BufferDesc *buf = GetLocalBufferDescriptor(i);
609 :
610 : /*
611 : * negative to indicate local buffer. This is tricky: shared buffers
612 : * start with 0. We have to start with -2. (Note that the routine
613 : * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
614 : * is -1.)
615 : */
616 477784 : buf->buf_id = -i - 2;
617 :
618 : /*
619 : * Intentionally do not initialize the buffer's atomic variable
620 : * (besides zeroing the underlying memory above). That way we get
621 : * errors on platforms without atomics, if somebody (re-)introduces
622 : * atomic operations for local buffers.
623 : */
624 : }
625 :
626 : /* Create the lookup hash table */
627 472 : info.keysize = sizeof(BufferTag);
628 472 : info.entrysize = sizeof(LocalBufferLookupEnt);
629 :
630 472 : LocalBufHash = hash_create("Local Buffer Lookup Table",
631 : nbufs,
632 : &info,
633 : HASH_ELEM | HASH_BLOBS);
634 :
635 472 : if (!LocalBufHash)
636 0 : elog(ERROR, "could not initialize local buffer hash table");
637 :
638 : /* Initialization done, mark buffers allocated */
639 472 : NLocBuffer = nbufs;
640 472 : }
641 :
642 : /*
643 : * XXX: We could have a slightly more efficient version of PinLocalBuffer()
644 : * that does not support adjusting the usagecount - but so far it does not
645 : * seem worth the trouble.
646 : */
647 : bool
648 2113894 : PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
649 : {
650 : uint32 buf_state;
651 2113894 : Buffer buffer = BufferDescriptorGetBuffer(buf_hdr);
652 2113894 : int bufid = -buffer - 1;
653 :
654 2113894 : buf_state = pg_atomic_read_u32(&buf_hdr->state);
655 :
656 2113894 : if (LocalRefCount[bufid] == 0)
657 : {
658 1944918 : NLocalPinnedBuffers++;
659 1944918 : if (adjust_usagecount &&
660 1915778 : BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
661 : {
662 98322 : buf_state += BUF_USAGECOUNT_ONE;
663 98322 : pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
664 : }
665 : }
666 2113894 : LocalRefCount[bufid]++;
667 2113894 : ResourceOwnerRememberBuffer(CurrentResourceOwner,
668 : BufferDescriptorGetBuffer(buf_hdr));
669 :
670 2113894 : return buf_state & BM_VALID;
671 : }
672 :
673 : void
674 2804774 : UnpinLocalBuffer(Buffer buffer)
675 : {
676 2804774 : int buffid = -buffer - 1;
677 :
678 : Assert(BufferIsLocal(buffer));
679 : Assert(LocalRefCount[buffid] > 0);
680 : Assert(NLocalPinnedBuffers > 0);
681 :
682 2804774 : ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
683 2804774 : if (--LocalRefCount[buffid] == 0)
684 1944918 : NLocalPinnedBuffers--;
685 2804774 : }
686 :
687 : /*
688 : * GUC check_hook for temp_buffers
689 : */
690 : bool
691 3706 : check_temp_buffers(int *newval, void **extra, GucSource source)
692 : {
693 : /*
694 : * Once local buffers have been initialized, it's too late to change this.
695 : * However, if this is only a test call, allow it.
696 : */
697 3706 : if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
698 : {
699 0 : GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
700 0 : return false;
701 : }
702 3706 : return true;
703 : }
704 :
705 : /*
706 : * GetLocalBufferStorage - allocate memory for a local buffer
707 : *
708 : * The idea of this function is to aggregate our requests for storage
709 : * so that the memory manager doesn't see a whole lot of relatively small
710 : * requests. Since we'll never give back a local buffer once it's created
711 : * within a particular process, no point in burdening memmgr with separately
712 : * managed chunks.
713 : */
714 : static Block
715 26650 : GetLocalBufferStorage(void)
716 : {
717 : static char *cur_block = NULL;
718 : static int next_buf_in_block = 0;
719 : static int num_bufs_in_block = 0;
720 : static int total_bufs_allocated = 0;
721 : static MemoryContext LocalBufferContext = NULL;
722 :
723 : char *this_buf;
724 :
725 : Assert(total_bufs_allocated < NLocBuffer);
726 :
727 26650 : if (next_buf_in_block >= num_bufs_in_block)
728 : {
729 : /* Need to make a new request to memmgr */
730 : int num_bufs;
731 :
732 : /*
733 : * We allocate local buffers in a context of their own, so that the
734 : * space eaten for them is easily recognizable in MemoryContextStats
735 : * output. Create the context on first use.
736 : */
737 738 : if (LocalBufferContext == NULL)
738 472 : LocalBufferContext =
739 472 : AllocSetContextCreate(TopMemoryContext,
740 : "LocalBufferContext",
741 : ALLOCSET_DEFAULT_SIZES);
742 :
743 : /* Start with a 16-buffer request; subsequent ones double each time */
744 738 : num_bufs = Max(num_bufs_in_block * 2, 16);
745 : /* But not more than what we need for all remaining local bufs */
746 738 : num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
747 : /* And don't overflow MaxAllocSize, either */
748 738 : num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
749 :
750 : /* Buffers should be I/O aligned. */
751 738 : cur_block = (char *)
752 738 : TYPEALIGN(PG_IO_ALIGN_SIZE,
753 : MemoryContextAlloc(LocalBufferContext,
754 : num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE));
755 738 : next_buf_in_block = 0;
756 738 : num_bufs_in_block = num_bufs;
757 : }
758 :
759 : /* Allocate next buffer in current memory block */
760 26650 : this_buf = cur_block + next_buf_in_block * BLCKSZ;
761 26650 : next_buf_in_block++;
762 26650 : total_bufs_allocated++;
763 :
764 26650 : return (Block) this_buf;
765 : }
766 :
/*
 * CheckForLocalBufferLeaks - verify this backend holds no local buffer pins
 *
 * Local-buffer counterpart of CheckForBufferLeaks().  Compiles to nothing
 * unless USE_ASSERT_CHECKING is defined; otherwise a warning is printed
 * for every local buffer with a nonzero pin count and an assertion fails
 * if there was at least one.
 */
static void
CheckForLocalBufferLeaks(void)
{
#ifdef USE_ASSERT_CHECKING
    int         leaked = 0;

    /* Local buffers were never initialized: nothing can be pinned */
    if (!LocalRefCount)
        return;

    for (int slot = 0; slot < NLocBuffer; slot++)
    {
        Buffer      b;

        if (LocalRefCount[slot] == 0)
            continue;

        b = -slot - 1;
        PrintBufferLeakWarning(b);
        leaked++;
    }
    Assert(leaked == 0);
#endif
}
795 :
796 : /*
797 : * AtEOXact_LocalBuffers - clean up at end of transaction.
798 : *
799 : * This is just like AtEOXact_Buffers, but for local buffers.
800 : */
801 : void
802 976402 : AtEOXact_LocalBuffers(bool isCommit)
803 : {
804 976402 : CheckForLocalBufferLeaks();
805 976402 : }
806 :
/*
 * AtProcExit_LocalBuffers - check at backend exit that all pins are gone.
 *
 * Local-buffer counterpart of AtProcExit_Buffers.
 */
void
AtProcExit_LocalBuffers(void)
{
    /*
     * No pins should remain at this point.  If some do and assertions are
     * disabled, the failure shows up later, in DropRelationBuffers, when
     * the temp rels are dropped.
     */
    CheckForLocalBufferLeaks();
}
|