LCOV - code coverage report
Current view: top level - src/backend/storage/buffer - localbuf.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 89.6 % 279 250
Test Date: 2026-03-10 15:14:48 Functions: 100.0 % 23 23
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * localbuf.c
       4              :  *    local buffer manager. Fast buffer manager for temporary tables,
       5              :  *    which never need to be WAL-logged or checkpointed, etc.
       6              :  *
       7              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       8              :  * Portions Copyright (c) 1994-5, Regents of the University of California
       9              :  *
      10              :  *
      11              :  * IDENTIFICATION
      12              :  *    src/backend/storage/buffer/localbuf.c
      13              :  *
      14              :  *-------------------------------------------------------------------------
      15              :  */
      16              : #include "postgres.h"
      17              : 
      18              : #include "access/parallel.h"
      19              : #include "executor/instrument.h"
      20              : #include "pgstat.h"
      21              : #include "storage/aio.h"
      22              : #include "storage/buf_internals.h"
      23              : #include "storage/bufmgr.h"
      24              : #include "storage/fd.h"
      25              : #include "utils/guc_hooks.h"
      26              : #include "utils/memdebug.h"
      27              : #include "utils/memutils.h"
      28              : #include "utils/rel.h"
      29              : #include "utils/resowner.h"
      30              : 
      31              : 
      32              : /*#define LBDEBUG*/
      33              : 
      34              : /* entry for buffer lookup hashtable (LocalBufHash): maps a page's tag to a local buffer */
      35              : typedef struct
      36              : {
      37              :     BufferTag   key;            /* Tag of a disk page */
      38              :     int         id;             /* Associated local buffer's index, as passed to GetLocalBufferDescriptor() */
      39              : } LocalBufferLookupEnt;
      40              : 
      41              : /* Note: this macro only works on local buffers, not shared ones!  It indexes LocalBufferBlockPointers by -(buf_id + 2). */
      42              : #define LocalBufHdrGetBlock(bufHdr) \
      43              :     LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
      44              : 
      45              : int         NLocBuffer = 0;     /* until buffers are initialized */
      46              : 
      47              : BufferDesc *LocalBufferDescriptors = NULL;
      48              : Block      *LocalBufferBlockPointers = NULL;
      49              : int32      *LocalRefCount = NULL;
      50              : 
      51              : static int  nextFreeLocalBufId = 0; /* where GetLocalVictimBuffer() resumes its sweep */
      52              : 
      53              : static HTAB *LocalBufHash = NULL;   /* NULL means local buffers are not initialized yet; checked before each lookup */
      54              : 
      55              : /* number of local buffers pinned at least once */
      56              : static int  NLocalPinnedBuffers = 0;
      57              : 
      58              : 
      59              : static void InitLocalBuffers(void);
      60              : static Block GetLocalBufferStorage(void);
      61              : static Buffer GetLocalVictimBuffer(void);
      62              : 
      63              : 
      64              : /*
      65              :  * PrefetchLocalBuffer -
      66              :  *    initiate asynchronous read of a block of a relation
      67              :  *
      68              :  * Do PrefetchBuffer's work for temporary relations: report the buffer if the block is already cached, else request a prefetch.
      69              :  * Without USE_PREFETCH only the lookup is performed; no prefetch is requested under direct data I/O either.
      70              :  */
      71              : PrefetchBufferResult
      72          783 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
      73              :                     BlockNumber blockNum)
      74              : {
      75          783 :     PrefetchBufferResult result = {InvalidBuffer, false};
      76              :     BufferTag   newTag;         /* identity of requested block */
      77              :     LocalBufferLookupEnt *hresult;
      78              : 
      79          783 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
      80              : 
      81              :     /* Initialize local buffers if first request in this session */
      82          783 :     if (LocalBufHash == NULL)
      83            0 :         InitLocalBuffers();
      84              : 
      85              :     /* See if the desired buffer already exists */
      86              :     hresult = (LocalBufferLookupEnt *)
      87          783 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
      88              : 
      89          783 :     if (hresult)
      90              :     {
      91              :         /* Yes, so no I/O is needed; just report which local buffer holds the block */
      92          783 :         result.recent_buffer = -hresult->id - 1;
      93              :     }
      94              :     else
      95              :     {
      96              : #ifdef USE_PREFETCH
      97              :         /* Not in buffers, so initiate prefetch, unless direct I/O is used for data */
      98            0 :         if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
      99            0 :             smgrprefetch(smgr, forkNum, blockNum, 1))
     100              :         {
     101            0 :             result.initiated_io = true;
     102              :         }
     103              : #endif                          /* USE_PREFETCH */
     104              :     }
     105              : 
     106          783 :     return result;
     107              : }
     108              : 
     109              : 
     110              : /*
     111              :  * LocalBufferAlloc -
     112              :  *    Find or create a local buffer for the given page of the given relation.
     113              :  *    *foundPtr: PinLocalBuffer()'s result for an existing buffer, false for a freshly claimed victim.
     114              :  * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
     115              :  * any locking since this is all local.  We support only default access
     116              :  * strategy (hence, usage_count is always advanced).
     117              :  */
     118              : BufferDesc *
     119      1280714 : LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
     120              :                  bool *foundPtr)
     121              : {
     122              :     BufferTag   newTag;         /* identity of requested block */
     123              :     LocalBufferLookupEnt *hresult;
     124              :     BufferDesc *bufHdr;
     125              :     Buffer      victim_buffer;
     126              :     int         bufid;
     127              :     bool        found;
     128              : 
     129      1280714 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
     130              : 
     131              :     /* Initialize local buffers if first request in this session */
     132      1280714 :     if (LocalBufHash == NULL)
     133           13 :         InitLocalBuffers();
     134              : 
     135      1280714 :     ResourceOwnerEnlarge(CurrentResourceOwner);
     136              : 
     137              :     /* See if the desired buffer already exists */
     138              :     hresult = (LocalBufferLookupEnt *)
     139      1280714 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
     140              : 
     141      1280714 :     if (hresult)
     142              :     {
     143      1272329 :         bufid = hresult->id;
     144      1272329 :         bufHdr = GetLocalBufferDescriptor(bufid);
     145              :         Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
     146              : 
     147      1272329 :         *foundPtr = PinLocalBuffer(bufHdr, true);
     148              :     }
     149              :     else
     150              :     {
     151              :         uint64      buf_state;
     152              : 
     153         8385 :         victim_buffer = GetLocalVictimBuffer();
     154         8379 :         bufid = -victim_buffer - 1;     /* Buffer number -> 0-based descriptor index */
     155         8379 :         bufHdr = GetLocalBufferDescriptor(bufid);
     156              : 
     157              :         hresult = (LocalBufferLookupEnt *)
     158         8379 :             hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
     159         8379 :         if (found)              /* shouldn't happen */
     160            0 :             elog(ERROR, "local buffer hash table corrupted");
     161         8379 :         hresult->id = bufid;
     162              : 
     163              :         /*
     164              :          * it's all ours now: install the tag, clear all flags and the usage count, then set BM_TAG_VALID with a usage count of one.
     165              :          */
     166         8379 :         bufHdr->tag = newTag;
     167              : 
     168         8379 :         buf_state = pg_atomic_read_u64(&bufHdr->state);
     169         8379 :         buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
     170         8379 :         buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     171         8379 :         pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     172              : 
     173         8379 :         *foundPtr = false;
     174              :     }
     175              : 
     176      1280708 :     return bufHdr;
     177              : }
     178              : 
     179              : /*
     180              :  * Like FlushBuffer(), just for local buffers: checksum the page, write it with smgrwrite() and clear BM_DIRTY.  The caller must hold a pin.
     181              :  */
     182              : void
     183         3634 : FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
     184              : {
     185              :     instr_time  io_start;
     186         3634 :     Page        localpage = (char *) LocalBufHdrGetBlock(bufHdr);
     187              : 
     188              :     Assert(LocalRefCount[-BufferDescriptorGetBuffer(bufHdr) - 1] > 0);
     189              : 
     190              :     /*
     191              :      * Try to start an I/O operation.  There currently are no reasons for
     192              :      * StartLocalBufferIO to return false, so we raise an error in that case.
     193              :      */
     194         3634 :     if (!StartLocalBufferIO(bufHdr, false, false))
     195            0 :         elog(ERROR, "failed to start write IO on local buffer");
     196              : 
     197              :     /* Find smgr relation for buffer, unless the caller already supplied one */
     198         3634 :     if (reln == NULL)
     199         3334 :         reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
     200              :                         MyProcNumber);
     201              : 
     202         3634 :     PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
     203              : 
     204         3634 :     io_start = pgstat_prepare_io_time(track_io_timing);
     205              : 
     206              :     /* And write... */
     207         3634 :     smgrwrite(reln,
     208         3634 :               BufTagGetForkNum(&bufHdr->tag),
     209              :               bufHdr->tag.blockNum,
     210              :               localpage,
     211              :               false);
     212              : 
     213              :     /* Temporary table I/O does not use Buffer Access Strategies */
     214         3634 :     pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
     215              :                             IOOP_WRITE, io_start, 1, BLCKSZ);
     216              : 
     217              :     /* Mark not-dirty (clear_dirty = true, no extra flags, no AIO pin to release) */
     218         3634 :     TerminateLocalBufferIO(bufHdr, true, 0, false);
     219              : 
     220         3634 :     pgBufferUsage.local_blks_written++;
     221         3634 : }
     222              : /* Clock-sweep for an unpinned local buffer, pin it, flush and invalidate any old contents, and return its Buffer number. */
     223              : static Buffer
     224        23175 : GetLocalVictimBuffer(void)
     225              : {
     226              :     int         victim_bufid;
     227              :     int         trycounter;
     228              :     BufferDesc *bufHdr;
     229              : 
     230        23175 :     ResourceOwnerEnlarge(CurrentResourceOwner);
     231              : 
     232              :     /*
     233              :      * Need to get a new buffer.  We use a clock-sweep algorithm (essentially
     234              :      * the same as what freelist.c does now...).  trycounter counts pinned buffers seen in a row; the sweep errors out when it reaches zero.
     235              :      */
     236        23175 :     trycounter = NLocBuffer;
     237              :     for (;;)
     238              :     {
     239       103212 :         victim_bufid = nextFreeLocalBufId;
     240              : 
     241       103212 :         if (++nextFreeLocalBufId >= NLocBuffer)
     242          867 :             nextFreeLocalBufId = 0;
     243              : 
     244       103212 :         bufHdr = GetLocalBufferDescriptor(victim_bufid);
     245              : 
     246       103212 :         if (LocalRefCount[victim_bufid] == 0)
     247              :         {
     248        42546 :             uint64      buf_state = pg_atomic_read_u64(&bufHdr->state);
     249              : 
     250        42546 :             if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
     251              :             {
     252        19377 :                 buf_state -= BUF_USAGECOUNT_ONE;
     253        19377 :                 pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     254        19377 :                 trycounter = NLocBuffer;
     255              :             }
     256        23169 :             else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
     257              :             {
     258              :                 /*
     259              :                  * This can be reached if the backend initiated AIO for this
     260              :                  * buffer and then errored out.  Skip the buffer and keep sweeping.
     261              :                  */
     262              :             }
     263              :             else
     264              :             {
     265              :                 /* Found a usable buffer */
     266        23169 :                 PinLocalBuffer(bufHdr, false);
     267        23169 :                 break;
     268              :             }
     269              :         }
     270        60666 :         else if (--trycounter == 0)
     271            6 :             ereport(ERROR,
     272              :                     (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     273              :                      errmsg("no empty local buffer available")));
     274              :     }
     275              : 
     276              :     /*
     277              :      * lazy memory allocation: allocate space on first use of a buffer.
     278              :      */
     279        23169 :     if (LocalBufHdrGetBlock(bufHdr) == NULL)
     280              :     {
     281              :         /* Set pointer for use by BufferGetBlock() macro */
     282        15765 :         LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
     283              :     }
     284              : 
     285              :     /*
     286              :      * this buffer is not referenced but it might still be dirty. if that's
     287              :      * the case, write it out before reusing it!
     288              :      */
     289        23169 :     if (pg_atomic_read_u64(&bufHdr->state) & BM_DIRTY)
     290         3312 :         FlushLocalBuffer(bufHdr, NULL);
     291              : 
     292              :     /*
     293              :      * Remove the victim buffer from the hashtable and mark as invalid.  This is counted as an eviction in the I/O statistics.
     294              :      */
     295        23169 :     if (pg_atomic_read_u64(&bufHdr->state) & BM_TAG_VALID)
     296              :     {
     297         6409 :         InvalidateLocalBuffer(bufHdr, false);
     298              : 
     299         6409 :         pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT, 1, 0);
     300              :     }
     301              : 
     302        23169 :     return BufferDescriptorGetBuffer(bufHdr);
     303              : }
     304              : 
     305              : /* see GetPinLimit(); for local buffers the limit is simply num_temp_buffers */
     306              : uint32
     307         6942 : GetLocalPinLimit(void)
     308              : {
     309              :     /* Every backend has its own temporary buffers, and can pin them all. */
     310         6942 :     return num_temp_buffers;
     311              : }
     312              : 
     313              : /* see GetAdditionalPinLimit(); returns num_temp_buffers minus NLocalPinnedBuffers */
     314              : uint32
     315        24022 : GetAdditionalLocalPinLimit(void)
     316              : {
     317              :     Assert(NLocalPinnedBuffers <= num_temp_buffers);
     318        24022 :     return num_temp_buffers - NLocalPinnedBuffers;
     319              : }
     320              : 
     321              : /* see LimitAdditionalPins(); clamps *additional_pins in place, requests of 0 or 1 are left untouched */
     322              : void
     323        11498 : LimitAdditionalLocalPins(uint32 *additional_pins)
     324              : {
     325              :     uint32      max_pins;
     326              : 
     327        11498 :     if (*additional_pins <= 1)
     328        11170 :         return;
     329              : 
     330              :     /*
     331              :      * In contrast to LimitAdditionalPins() other backends don't play a role
     332              :      * here. We can allow up to NLocBuffer pins in total, but it might not be
     333              :      * initialized yet so read num_temp_buffers.
     334              :      */
     335          328 :     max_pins = (num_temp_buffers - NLocalPinnedBuffers);
     336              : 
     337          328 :     if (*additional_pins >= max_pins)
     338            0 :         *additional_pins = max_pins;
     339              : }
     340              : 
     341              : /*
     342              :  * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
     343              :  * temporary buffers.  Returns the first new block number; buffers[] receives the pinned buffers and *extended_by the number of blocks added.
     344              :  */
     345              : BlockNumber
     346        11498 : ExtendBufferedRelLocal(BufferManagerRelation bmr,
     347              :                        ForkNumber fork,
     348              :                        uint32 flags,
     349              :                        uint32 extend_by,
     350              :                        BlockNumber extend_upto,
     351              :                        Buffer *buffers,
     352              :                        uint32 *extended_by)
     353              : {
     354              :     BlockNumber first_block;
     355              :     instr_time  io_start;
     356              : 
     357              :     /* Initialize local buffers if first request in this session */
     358        11498 :     if (LocalBufHash == NULL)
     359          255 :         InitLocalBuffers();
     360              : 
     361        11498 :     LimitAdditionalLocalPins(&extend_by);   /* may reduce extend_by */
     362              : 
     363        26288 :     for (uint32 i = 0; i < extend_by; i++)
     364              :     {
     365              :         BufferDesc *buf_hdr;
     366              :         Block       buf_block;
     367              : 
     368        14790 :         buffers[i] = GetLocalVictimBuffer();
     369        14790 :         buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
     370        14790 :         buf_block = LocalBufHdrGetBlock(buf_hdr);
     371              : 
     372              :         /* new buffers are zero-filled */
     373        14790 :         MemSet(buf_block, 0, BLCKSZ);
     374              :     }
     375              : 
     376        11498 :     first_block = smgrnblocks(BMR_GET_SMGR(bmr), fork);
     377              : 
     378              :     if (extend_upto != InvalidBlockNumber)
     379              :     {
     380              :         /*
     381              :          * In contrast to shared relations, nothing could change the relation
     382              :          * size concurrently. Thus we shouldn't end up finding that we don't
     383              :          * need to do anything.
     384              :          */
     385              :         Assert(first_block <= extend_upto);
     386              : 
     387              :         Assert((uint64) first_block + extend_by <= extend_upto);
     388              :     }
     389              : 
     390              :     /* Fail if the extension would reach or exceed MaxBlockNumber (sum computed in 64 bits) */
     391        11498 :     if ((uint64) first_block + extend_by >= MaxBlockNumber)
     392            0 :         ereport(ERROR,
     393              :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     394              :                  errmsg("cannot extend relation %s beyond %u blocks",
     395              :                         relpath(BMR_GET_SMGR(bmr)->smgr_rlocator, fork).str,
     396              :                         MaxBlockNumber)));
     397              : 
     398        26288 :     for (uint32 i = 0; i < extend_by; i++)
     399              :     {
     400              :         int         victim_buf_id;
     401              :         BufferDesc *victim_buf_hdr;
     402              :         BufferTag   tag;
     403              :         LocalBufferLookupEnt *hresult;
     404              :         bool        found;
     405              : 
     406        14790 :         victim_buf_id = -buffers[i] - 1;
     407        14790 :         victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
     408              : 
     409              :         /* in case we need to pin an existing buffer below */
     410        14790 :         ResourceOwnerEnlarge(CurrentResourceOwner);
     411              : 
     412        14790 :         InitBufferTag(&tag, &BMR_GET_SMGR(bmr)->smgr_rlocator.locator, fork,
     413              :                       first_block + i);
     414              : 
     415              :         hresult = (LocalBufferLookupEnt *)
     416        14790 :             hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
     417        14790 :         if (found)
     418              :         {
     419              :             BufferDesc *existing_hdr;
     420              :             uint64      buf_state;
     421              : 
     422            0 :             UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
     423              : 
     424            0 :             existing_hdr = GetLocalBufferDescriptor(hresult->id);
     425            0 :             PinLocalBuffer(existing_hdr, false);
     426            0 :             buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
     427              : 
     428              :             /*
     429              :              * The tag already has a buffer: use it instead of the victim.  Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
     430              :              */
     431            0 :             buf_state = pg_atomic_read_u64(&existing_hdr->state);
     432              :             Assert(buf_state & BM_TAG_VALID);
     433              :             Assert(!(buf_state & BM_DIRTY));
     434            0 :             buf_state &= ~BM_VALID;
     435            0 :             pg_atomic_unlocked_write_u64(&existing_hdr->state, buf_state);
     436              : 
     437              :             /* no need to loop for local buffers */
     438            0 :             StartLocalBufferIO(existing_hdr, true, false);
     439              :         }
     440              :         else
     441              :         {
     442        14790 :             uint64      buf_state = pg_atomic_read_u64(&victim_buf_hdr->state);
     443              : 
     444              :             Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
     445              : 
     446        14790 :             victim_buf_hdr->tag = tag;
     447              : 
     448        14790 :             buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     449              : 
     450        14790 :             pg_atomic_unlocked_write_u64(&victim_buf_hdr->state, buf_state);
     451              : 
     452        14790 :             hresult->id = victim_buf_id;
     453              : 
     454        14790 :             StartLocalBufferIO(victim_buf_hdr, true, false);
     455              :         }
     456              :     }
     457              : 
     458        11498 :     io_start = pgstat_prepare_io_time(track_io_timing);
     459              : 
     460              :     /* actually extend relation */
     461        11498 :     smgrzeroextend(BMR_GET_SMGR(bmr), fork, first_block, extend_by, false);
     462              : 
     463        11498 :     pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
     464        11498 :                             io_start, 1, extend_by * BLCKSZ);
     465              : 
     466        26288 :     for (uint32 i = 0; i < extend_by; i++)
     467              :     {
     468        14790 :         Buffer      buf = buffers[i];
     469              :         BufferDesc *buf_hdr;
     470              :         uint64      buf_state;
     471              : 
     472        14790 :         buf_hdr = GetLocalBufferDescriptor(-buf - 1);
     473              : 
     474        14790 :         buf_state = pg_atomic_read_u64(&buf_hdr->state);
     475        14790 :         buf_state |= BM_VALID;  /* the relation has been extended, so mark the buffer valid */
     476        14790 :         pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
     477              :     }
     478              : 
     479        11498 :     *extended_by = extend_by;
     480              : 
     481        11498 :     pgBufferUsage.local_blks_written += extend_by;
     482              : 
     483        11498 :     return first_block;
     484              : }
     485              : 
     486              : /*
     487              :  * MarkLocalBufferDirty -
     488              :  *    mark a local buffer dirty; the buffer must be pinned, and a clean-to-dirty transition is counted in pgBufferUsage
     489              :  */
     490              : void
     491      1863976 : MarkLocalBufferDirty(Buffer buffer)
     492              : {
     493              :     int         bufid;
     494              :     BufferDesc *bufHdr;
     495              :     uint64      buf_state;
     496              : 
     497              :     Assert(BufferIsLocal(buffer));
     498              : 
     499              : #ifdef LBDEBUG
     500              :     fprintf(stderr, "LB DIRTY %d\n", buffer);
     501              : #endif
     502              : 
     503      1863976 :     bufid = -buffer - 1;    /* Buffer number -> 0-based local buffer index */
     504              : 
     505              :     Assert(LocalRefCount[bufid] > 0);
     506              : 
     507      1863976 :     bufHdr = GetLocalBufferDescriptor(bufid);
     508              : 
     509      1863976 :     buf_state = pg_atomic_read_u64(&bufHdr->state);
     510              : 
     511      1863976 :     if (!(buf_state & BM_DIRTY))
     512        14957 :         pgBufferUsage.local_blks_dirtied++;
     513              : 
     514      1863976 :     buf_state |= BM_DIRTY;
     515              : 
     516      1863976 :     pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     517      1863976 : }
     518              : 
     519              : /*
     520              :  * Like StartBufferIO, but for local buffers.  Returns false if the I/O turns out to be unnecessary, or if nowait is set and AIO is in flight; true otherwise.
     521              :  */
     522              : bool
     523        26865 : StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
     524              : {
     525              :     uint64      buf_state;
     526              : 
     527              :     /*
     528              :      * With AIO the buffer could have IO in progress, e.g. when there are two
     529              :      * scans of the same relation. Either wait for the other IO or return
     530              :      * false.
     531              :      */
     532        26865 :     if (pgaio_wref_valid(&bufHdr->io_wref))
     533              :     {
     534            0 :         PgAioWaitRef iow = bufHdr->io_wref;
     535              : 
     536            0 :         if (nowait)
     537            0 :             return false;
     538              : 
     539            0 :         pgaio_wref_wait(&iow);
     540              :     }
     541              : 
     542              :     /* Once we get here, there is definitely no I/O active on this buffer */
     543              : 
     544              :     /* Check if someone else already did the I/O: already valid for input, or no longer dirty for output */
     545        26865 :     buf_state = pg_atomic_read_u64(&bufHdr->state);
     546        26865 :     if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
     547              :     {
     548            2 :         return false;
     549              :     }
     550              : 
     551              :     /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
     552              : 
     553              :     /* local buffers don't track IO using resowners */
     554              : 
     555        26863 :     return true;
     556              : }
     557              : 
     558              : /*
     559              :  * Like TerminateBufferIO, but for local buffers.  Clears BM_IO_ERROR, optionally clears BM_DIRTY and drops the AIO pin, then ORs in set_flag_bits.
     560              :  */
     561              : void
     562        12071 : TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint64 set_flag_bits,
     563              :                        bool release_aio)
     564              : {
     565              :     /* Only need to adjust flags */
     566        12071 :     uint64      buf_state = pg_atomic_read_u64(&bufHdr->state);
     567              : 
     568              :     /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
     569              : 
     570              :     /* Clear earlier errors, if this IO failed, it'll be marked again */
     571        12071 :     buf_state &= ~BM_IO_ERROR;
     572              : 
     573        12071 :     if (clear_dirty)
     574         3634 :         buf_state &= ~BM_DIRTY;
     575              : 
     576        12071 :     if (release_aio)
     577              :     {
     578              :         /* release pin held by IO subsystem (kept in the state word's refcount), see also buffer_stage_common() */
     579              :         Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
     580         8409 :         buf_state -= BUF_REFCOUNT_ONE;
     581         8409 :         pgaio_wref_clear(&bufHdr->io_wref);
     582              :     }
     583              : 
     584        12071 :     buf_state |= set_flag_bits;
     585        12071 :     pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     586              : 
     587              :     /* local buffers don't track IO using resowners */
     588              : 
     589              :     /* local buffers don't use the IO CV, as no other process can see buffer */
     590              : 
     591              :     /* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
     592        12071 : }
     593              : 
     594              : /*
     595              :  * InvalidateLocalBuffer -- mark a local buffer invalid.
     596              :  *
     597              :  * If check_unreferenced is true, error out if the buffer is still
     598              :  * pinned. Passing false is appropriate when calling InvalidateLocalBuffer()
     599              :  * as part of changing the identity of a buffer, instead of just dropping the
     600              :  * buffer.
     601              :  *
     602              :  * See also InvalidateBuffer().
     603              :  */
     604              : void
     605        23169 : InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
     606              : {
     607        23169 :     Buffer      buffer = BufferDescriptorGetBuffer(bufHdr);
     608        23169 :     int         bufid = -buffer - 1;
     609              :     uint64      buf_state;
     610              :     LocalBufferLookupEnt *hresult;
     611              : 
     612              :     /*
     613              :      * It's possible that we started IO on this buffer before e.g. aborting
     614              :      * the transaction that created a table. We need to wait for that IO to
     615              :      * complete before removing / reusing the buffer.
     616              :      */
     617        23169 :     if (pgaio_wref_valid(&bufHdr->io_wref))
     618              :     {
     619            0 :         PgAioWaitRef iow = bufHdr->io_wref;
     620              : 
     621            0 :         pgaio_wref_wait(&iow);
     622              :         Assert(!pgaio_wref_valid(&bufHdr->io_wref));
     623              :     }
     624              : 
     625        23169 :     buf_state = pg_atomic_read_u64(&bufHdr->state);
     626              : 
     627              :     /*
     628              :      * We need to test not just LocalRefCount[bufid] but also the BufferDesc
     629              :      * itself, as the latter is used to represent a pin by the AIO subsystem.
     630              :      * This can happen if AIO is initiated and then the query errors out.
     631              :      */
     632        23169 :     if (check_unreferenced &&
     633        16760 :         (LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
     634            0 :         elog(ERROR, "block %u of %s is still referenced (local %d)",
     635              :              bufHdr->tag.blockNum,
     636              :              relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
     637              :                             MyProcNumber,
     638              :                             BufTagGetForkNum(&bufHdr->tag)).str,
     639              :              LocalRefCount[bufid]);
     640              : 
     641              :     /* Remove entry from hashtable */
     642              :     hresult = (LocalBufferLookupEnt *)
     643        23169 :         hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     644        23169 :     if (!hresult)               /* shouldn't happen */
     645            0 :         elog(ERROR, "local buffer hash table corrupted");
     646              :     /* Mark buffer invalid */
     647        23169 :     ClearBufferTag(&bufHdr->tag);
     648        23169 :     buf_state &= ~BUF_FLAG_MASK;
     649        23169 :     buf_state &= ~BUF_USAGECOUNT_MASK;
     650        23169 :     pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     651        23169 : }
     652              : 
     653              : /*
     654              :  * DropRelationLocalBuffers
     655              :  *      This function removes from the buffer pool all the pages of the
     656              :  *      specified relation that have block numbers >= firstDelBlock.
     657              :  *      (In particular, with firstDelBlock = 0, all pages are removed.)
     658              :  *      Dirty pages are simply dropped, without bothering to write them
     659              :  *      out first.  Therefore, this is NOT rollback-able, and so should be
     660              :  *      used only with extreme caution!
     661              :  *
     662              :  *      See DropRelationBuffers in bufmgr.c for more notes.
     663              :  */
     664              : void
     665          375 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber *forkNum,
     666              :                          int nforks, BlockNumber *firstDelBlock)
     667              : {
     668              :     int         i;
     669              :     int         j;
     670              : 
     671       309623 :     for (i = 0; i < NLocBuffer; i++)
     672              :     {
     673       309248 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     674              :         uint64      buf_state;
     675              : 
     676       309248 :         buf_state = pg_atomic_read_u64(&bufHdr->state);
     677              : 
     678       309248 :         if (!(buf_state & BM_TAG_VALID) ||
     679        28308 :             !BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
     680       308407 :             continue;
     681              : 
     682          965 :         for (j = 0; j < nforks; j++)
     683              :         {
     684          926 :             if (BufTagGetForkNum(&bufHdr->tag) == forkNum[j] &&
     685          834 :                 bufHdr->tag.blockNum >= firstDelBlock[j])
     686              :             {
     687          802 :                 InvalidateLocalBuffer(bufHdr, true);
     688          802 :                 break;
     689              :             }
     690              :         }
     691              :     }
     692          375 : }
     693              : 
     694              : /*
     695              :  * DropRelationAllLocalBuffers
     696              :  *      This function removes from the buffer pool all pages of all forks
     697              :  *      of the specified relation.
     698              :  *
     699              :  *      See DropRelationsAllBuffers in bufmgr.c for more notes.
     700              :  */
     701              : void
     702         3275 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
     703              : {
     704              :     int         i;
     705              : 
     706      3076355 :     for (i = 0; i < NLocBuffer; i++)
     707              :     {
     708      3073080 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     709              :         uint64      buf_state;
     710              : 
     711      3073080 :         buf_state = pg_atomic_read_u64(&bufHdr->state);
     712              : 
     713      3308272 :         if ((buf_state & BM_TAG_VALID) &&
     714       235192 :             BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
     715              :         {
     716        15908 :             InvalidateLocalBuffer(bufHdr, true);
     717              :         }
     718              :     }
     719         3275 : }
     720              : 
     721              : /*
     722              :  * InitLocalBuffers -
     723              :  *    init the local buffer cache. Since most queries (esp. multi-user ones)
     724              :  *    don't involve local buffers, we delay allocating actual memory for the
     725              :  *    buffers until we need them; just make the buffer headers here.
     726              :  */
     727              : static void
     728          268 : InitLocalBuffers(void)
     729              : {
     730          268 :     int         nbufs = num_temp_buffers;
     731              :     HASHCTL     info;
     732              :     int         i;
     733              : 
     734              :     /*
     735              :      * Parallel workers can't access data in temporary tables, because they
     736              :      * have no visibility into the local buffers of their leader.  This is a
     737              :      * convenient, low-cost place to provide a backstop check for that.  Note
     738              :      * that we don't wish to prevent a parallel worker from accessing catalog
     739              :      * metadata about a temp table, so checks at higher levels would be
     740              :      * inappropriate.
     741              :      */
     742          268 :     if (IsParallelWorker())
     743            0 :         ereport(ERROR,
     744              :                 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     745              :                  errmsg("cannot access temporary tables during a parallel operation")));
     746              : 
     747              :     /* Allocate and zero buffer headers and auxiliary arrays */
     748          268 :     LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
     749          268 :     LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
     750          268 :     LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
     751          268 :     if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
     752            0 :         ereport(FATAL,
     753              :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     754              :                  errmsg("out of memory")));
     755              : 
     756          268 :     nextFreeLocalBufId = 0;
     757              : 
     758              :     /* initialize fields that need to start off nonzero */
     759       259916 :     for (i = 0; i < nbufs; i++)
     760              :     {
     761       259648 :         BufferDesc *buf = GetLocalBufferDescriptor(i);
     762              : 
     763              :         /*
     764              :          * negative to indicate local buffer. This is tricky: shared buffers
     765              :          * start with 0. We have to start with -2. (Note that the routine
     766              :          * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
     767              :          * is -1.)
     768              :          */
     769       259648 :         buf->buf_id = -i - 2;
     770              : 
     771       259648 :         pgaio_wref_clear(&buf->io_wref);
     772              : 
     773              :         /*
     774              :          * Intentionally do not initialize the buffer's atomic variable
     775              :          * (besides zeroing the underlying memory above). That way we get
     776              :          * errors on platforms without atomics, if somebody (re-)introduces
     777              :          * atomic operations for local buffers.
     778              :          */
     779              :     }
     780              : 
     781              :     /* Create the lookup hash table */
     782          268 :     info.keysize = sizeof(BufferTag);
     783          268 :     info.entrysize = sizeof(LocalBufferLookupEnt);
     784              : 
     785          268 :     LocalBufHash = hash_create("Local Buffer Lookup Table",
     786              :                                nbufs,
     787              :                                &info,
     788              :                                HASH_ELEM | HASH_BLOBS);
     789              : 
     790          268 :     if (!LocalBufHash)
     791            0 :         elog(ERROR, "could not initialize local buffer hash table");
     792              : 
     793              :     /* Initialization done, mark buffers allocated */
     794          268 :     NLocBuffer = nbufs;
     795          268 : }
     796              : 
     797              : /*
     798              :  * XXX: We could have a slightly more efficient version of PinLocalBuffer()
     799              :  * that does not support adjusting the usagecount - but so far it does not
     800              :  * seem worth the trouble.
     801              :  *
     802              :  * Note that ResourceOwnerEnlarge() must have been done already.
     803              :  */
     804              : bool
     805      1295830 : PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
     806              : {
     807              :     uint64      buf_state;
     808      1295830 :     Buffer      buffer = BufferDescriptorGetBuffer(buf_hdr);
     809      1295830 :     int         bufid = -buffer - 1;
     810              : 
     811      1295830 :     buf_state = pg_atomic_read_u64(&buf_hdr->state);
     812              : 
     813      1295830 :     if (LocalRefCount[bufid] == 0)
     814              :     {
     815      1207753 :         NLocalPinnedBuffers++;
     816      1207753 :         buf_state += BUF_REFCOUNT_ONE;
     817      1207753 :         if (adjust_usagecount &&
     818      1184284 :             BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
     819              :         {
     820        64832 :             buf_state += BUF_USAGECOUNT_ONE;
     821              :         }
     822      1207753 :         pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
     823              : 
     824              :         /*
     825              :          * See comment in PinBuffer().
     826              :          *
     827              :          * If the buffer isn't allocated yet, it'll be marked as defined in
     828              :          * GetLocalBufferStorage().
     829              :          */
     830      1207753 :         if (LocalBufHdrGetBlock(buf_hdr) != NULL)
     831              :             VALGRIND_MAKE_MEM_DEFINED(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
     832              :     }
     833      1295830 :     LocalRefCount[bufid]++;
     834      1295830 :     ResourceOwnerRememberBuffer(CurrentResourceOwner,
     835              :                                 BufferDescriptorGetBuffer(buf_hdr));
     836              : 
     837      1295830 :     return buf_state & BM_VALID;
     838              : }
     839              : 
     840              : void
     841      1647629 : UnpinLocalBuffer(Buffer buffer)
     842              : {
     843      1647629 :     UnpinLocalBufferNoOwner(buffer);
     844      1647629 :     ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
     845      1647629 : }
     846              : 
     847              : void
     848      1650662 : UnpinLocalBufferNoOwner(Buffer buffer)
     849              : {
     850      1650662 :     int         buffid = -buffer - 1;
     851              : 
     852              :     Assert(BufferIsLocal(buffer));
     853              :     Assert(LocalRefCount[buffid] > 0);
     854              :     Assert(NLocalPinnedBuffers > 0);
     855              : 
     856      1650662 :     if (--LocalRefCount[buffid] == 0)
     857              :     {
     858      1207753 :         BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
     859              :         uint64      buf_state;
     860              : 
     861      1207753 :         NLocalPinnedBuffers--;
     862              : 
     863      1207753 :         buf_state = pg_atomic_read_u64(&buf_hdr->state);
     864              :         Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
     865      1207753 :         buf_state -= BUF_REFCOUNT_ONE;
     866      1207753 :         pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
     867              : 
     868              :         /* see comment in UnpinBufferNoOwner */
     869              :         VALGRIND_MAKE_MEM_NOACCESS(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
     870              :     }
     871      1650662 : }
     872              : 
     873              : /*
     874              :  * GUC check_hook for temp_buffers
     875              :  */
     876              : bool
     877         1201 : check_temp_buffers(int *newval, void **extra, GucSource source)
     878              : {
     879              :     /*
     880              :      * Once local buffers have been initialized, it's too late to change this.
     881              :      * However, if this is only a test call, allow it.
     882              :      */
     883         1201 :     if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
     884              :     {
     885            0 :         GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
     886            0 :         return false;
     887              :     }
     888         1201 :     return true;
     889              : }
     890              : 
     891              : /*
     892              :  * GetLocalBufferStorage - allocate memory for a local buffer
     893              :  *
     894              :  * The idea of this function is to aggregate our requests for storage
     895              :  * so that the memory manager doesn't see a whole lot of relatively small
     896              :  * requests.  Since we'll never give back a local buffer once it's created
     897              :  * within a particular process, no point in burdening memmgr with separately
     898              :  * managed chunks.
     899              :  */
     900              : static Block
     901        15765 : GetLocalBufferStorage(void)
     902              : {
     903              :     static char *cur_block = NULL;
     904              :     static int  next_buf_in_block = 0;
     905              :     static int  num_bufs_in_block = 0;
     906              :     static int  total_bufs_allocated = 0;
     907              :     static MemoryContext LocalBufferContext = NULL;
     908              : 
     909              :     char       *this_buf;
     910              : 
     911              :     Assert(total_bufs_allocated < NLocBuffer);
     912              : 
     913        15765 :     if (next_buf_in_block >= num_bufs_in_block)
     914              :     {
     915              :         /* Need to make a new request to memmgr */
     916              :         int         num_bufs;
     917              : 
     918              :         /*
     919              :          * We allocate local buffers in a context of their own, so that the
     920              :          * space eaten for them is easily recognizable in MemoryContextStats
     921              :          * output.  Create the context on first use.
     922              :          */
     923          430 :         if (LocalBufferContext == NULL)
     924          268 :             LocalBufferContext =
     925          268 :                 AllocSetContextCreate(TopMemoryContext,
     926              :                                       "LocalBufferContext",
     927              :                                       ALLOCSET_DEFAULT_SIZES);
     928              : 
     929              :         /* Start with a 16-buffer request; subsequent ones double each time */
     930          430 :         num_bufs = Max(num_bufs_in_block * 2, 16);
     931              :         /* But not more than what we need for all remaining local bufs */
     932          430 :         num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
     933              :         /* And don't overflow MaxAllocSize, either */
     934          430 :         num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
     935              : 
     936              :         /* Buffers should be I/O aligned. */
     937          860 :         cur_block = MemoryContextAllocAligned(LocalBufferContext,
     938          430 :                                               num_bufs * BLCKSZ,
     939              :                                               PG_IO_ALIGN_SIZE,
     940              :                                               0);
     941              : 
     942          430 :         next_buf_in_block = 0;
     943          430 :         num_bufs_in_block = num_bufs;
     944              :     }
     945              : 
     946              :     /* Allocate next buffer in current memory block */
     947        15765 :     this_buf = cur_block + next_buf_in_block * BLCKSZ;
     948        15765 :     next_buf_in_block++;
     949        15765 :     total_bufs_allocated++;
     950              : 
     951              :     /*
     952              :      * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it
     953              :      * here.  The block is actually undefined, but we want consistency with
     954              :      * the regular case of not needing to allocate memory.  This is
     955              :      * specifically needed when method_io_uring.c fills the block, because
     956              :      * Valgrind doesn't recognize io_uring reads causing undefined memory to
     957              :      * become defined.
     958              :      */
     959              :     VALGRIND_MAKE_MEM_DEFINED(this_buf, BLCKSZ);
     960              : 
     961        15765 :     return (Block) this_buf;
     962              : }
     963              : 
     /*
      * CheckForLocalBufferLeaks - verify that no local buffer pins are held
      *
      * The local-buffer counterpart of CheckForBufferLeaks().  In builds
      * without USE_ASSERT_CHECKING this does nothing; otherwise each buffer
      * with a nonzero local refcount is reported with a WARNING, and an
      * assertion fails if any were found.
      */
     static void
     CheckForLocalBufferLeaks(void)
     {
     #ifdef USE_ASSERT_CHECKING
         int         leaks = 0;

         /* Nothing to inspect if local buffers were never set up */
         if (LocalRefCount == NULL)
             return;

         for (int slot = 0; slot < NLocBuffer; slot++)
         {
             char       *desc;

             if (LocalRefCount[slot] == 0)
                 continue;

             /* slot N corresponds to buffer number -N - 1 */
             desc = DebugPrintBufferRefcount(-slot - 1);
             elog(WARNING, "local buffer refcount leak: %s", desc);
             pfree(desc);

             leaks++;
         }

         Assert(leaks == 0);
     #endif
     }
     996              : 
     /*
      * AtEOXact_LocalBuffers - end-of-transaction cleanup for local buffers.
      *
      * The local-buffer counterpart of AtEOXact_Buffers.  All it does is check
      * for leaked local buffer pins; isCommit is not consulted.
      */
     void
     AtEOXact_LocalBuffers(bool isCommit)
     {
         /* the same check applies to commit and abort */
         CheckForLocalBufferLeaks();
     }
    1007              : 
    /*
     * AtProcExit_LocalBuffers - verify at backend exit that all local buffer
     * pins have been dropped.
     *
     * The local-buffer counterpart of AtProcExit_Buffers.
     */
    void
    AtProcExit_LocalBuffers(void)
    {
        /*
         * No pins should remain at this point.  If some do and assertions are
         * not enabled, the failure shows up later instead, in
         * DropRelationBuffers while the temp rels are being dropped.
         */
        CheckForLocalBufferLeaks();
    }
        

Generated by: LCOV version 2.0-1