LCOV - code coverage report
Current view: top level - src/backend/storage/buffer - localbuf.c (source / functions)
Test: PostgreSQL 19devel
Date: 2025-11-07 21:17:33
Coverage: Lines: 250 of 279 hit (89.6 %), Functions: 23 of 23 hit (100.0 %)

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * localbuf.c
       4             :  *    local buffer manager. Fast buffer manager for temporary tables,
       5             :  *    which never need to be WAL-logged or checkpointed, etc.
       6             :  *
       7             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994-5, Regents of the University of California
       9             :  *
      10             :  *
      11             :  * IDENTIFICATION
      12             :  *    src/backend/storage/buffer/localbuf.c
      13             :  *
      14             :  *-------------------------------------------------------------------------
      15             :  */
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/parallel.h"
      19             : #include "executor/instrument.h"
      20             : #include "pgstat.h"
      21             : #include "storage/aio.h"
      22             : #include "storage/buf_internals.h"
      23             : #include "storage/bufmgr.h"
      24             : #include "storage/fd.h"
      25             : #include "utils/guc_hooks.h"
      26             : #include "utils/memdebug.h"
      27             : #include "utils/memutils.h"
      28             : #include "utils/rel.h"
      29             : #include "utils/resowner.h"
      30             : 
      31             : 
      32             : /*#define LBDEBUG*/
      33             : 
      34             : /* entry for buffer lookup hashtable */
      35             : typedef struct
      36             : {
      37             :     BufferTag   key;            /* Tag of a disk page */
      38             :     int         id;             /* Associated local buffer's index */
      39             : } LocalBufferLookupEnt;
      40             : 
      41             : /* Note: this macro only works on local buffers, not shared ones! */
      42             : #define LocalBufHdrGetBlock(bufHdr) \
      43             :     LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
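
/*
 * Worked example of the index arithmetic above (illustrative): the local
 * buffer at array index i is set up with buf_id = -i - 2 (see
 * InitLocalBuffers() below), and BufferDescriptorGetBuffer() adds 1, so its
 * Buffer value is -i - 1.  For i = 0 that gives buf_id = -2 and Buffer = -1,
 * and LocalBufHdrGetBlock() resolves to
 * LocalBufferBlockPointers[-(-2 + 2)] = LocalBufferBlockPointers[0].
 * Conversely, code throughout this file recovers the array index from a
 * local Buffer as -buffer - 1.
 */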
      44             : 
      45             : int         NLocBuffer = 0;     /* until buffers are initialized */
      46             : 
      47             : BufferDesc *LocalBufferDescriptors = NULL;
      48             : Block      *LocalBufferBlockPointers = NULL;
      49             : int32      *LocalRefCount = NULL;
      50             : 
      51             : static int  nextFreeLocalBufId = 0;
      52             : 
      53             : static HTAB *LocalBufHash = NULL;
      54             : 
       55             : /* number of local buffers currently pinned (i.e., with LocalRefCount > 0) */
      56             : static int  NLocalPinnedBuffers = 0;
      57             : 
      58             : 
      59             : static void InitLocalBuffers(void);
      60             : static Block GetLocalBufferStorage(void);
      61             : static Buffer GetLocalVictimBuffer(void);
      62             : 
      63             : 
      64             : /*
      65             :  * PrefetchLocalBuffer -
      66             :  *    initiate asynchronous read of a block of a relation
      67             :  *
      68             :  * Do PrefetchBuffer's work for temporary relations.
      69             :  * No-op if prefetching isn't compiled in.
      70             :  */
      71             : PrefetchBufferResult
      72        1566 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
      73             :                     BlockNumber blockNum)
      74             : {
      75        1566 :     PrefetchBufferResult result = {InvalidBuffer, false};
      76             :     BufferTag   newTag;         /* identity of requested block */
      77             :     LocalBufferLookupEnt *hresult;
      78             : 
      79        1566 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
      80             : 
      81             :     /* Initialize local buffers if first request in this session */
      82        1566 :     if (LocalBufHash == NULL)
      83           0 :         InitLocalBuffers();
      84             : 
      85             :     /* See if the desired buffer already exists */
      86             :     hresult = (LocalBufferLookupEnt *)
      87        1566 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
      88             : 
      89        1566 :     if (hresult)
      90             :     {
      91             :         /* Yes, so nothing to do */
      92        1566 :         result.recent_buffer = -hresult->id - 1;
      93             :     }
      94             :     else
      95             :     {
      96             : #ifdef USE_PREFETCH
      97             :         /* Not in buffers, so initiate prefetch */
      98           0 :         if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
      99           0 :             smgrprefetch(smgr, forkNum, blockNum, 1))
     100             :         {
     101           0 :             result.initiated_io = true;
     102             :         }
     103             : #endif                          /* USE_PREFETCH */
     104             :     }
     105             : 
     106        1566 :     return result;
     107             : }
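
/*
 * A sketch of how a caller might consume the result (hedged; per the header
 * comment, the usual entry point is PrefetchBuffer() in bufmgr.c, for which
 * this function handles temporary relations):
 *
 *     PrefetchBufferResult r = PrefetchLocalBuffer(smgr, forkNum, blockNum);
 *
 *     if (BufferIsValid(r.recent_buffer))
 *         ...   block is already resident in a local buffer
 *     else if (r.initiated_io)
 *         ...   an asynchronous read was started via smgrprefetch()
 *     else
 *         ...   prefetch was not possible (e.g. direct I/O for data, or
 *               prefetching not compiled in)
 */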
     108             : 
     109             : 
     110             : /*
     111             :  * LocalBufferAlloc -
     112             :  *    Find or create a local buffer for the given page of the given relation.
     113             :  *
     114             :  * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
     115             :  * any locking since this is all local.  We support only default access
     116             :  * strategy (hence, usage_count is always advanced).
     117             :  */
     118             : BufferDesc *
     119     2554406 : LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
     120             :                  bool *foundPtr)
     121             : {
     122             :     BufferTag   newTag;         /* identity of requested block */
     123             :     LocalBufferLookupEnt *hresult;
     124             :     BufferDesc *bufHdr;
     125             :     Buffer      victim_buffer;
     126             :     int         bufid;
     127             :     bool        found;
     128             : 
     129     2554406 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
     130             : 
     131             :     /* Initialize local buffers if first request in this session */
     132     2554406 :     if (LocalBufHash == NULL)
     133          26 :         InitLocalBuffers();
     134             : 
     135     2554406 :     ResourceOwnerEnlarge(CurrentResourceOwner);
     136             : 
     137             :     /* See if the desired buffer already exists */
     138             :     hresult = (LocalBufferLookupEnt *)
     139     2554406 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
     140             : 
     141     2554406 :     if (hresult)
     142             :     {
     143     2537636 :         bufid = hresult->id;
     144     2537636 :         bufHdr = GetLocalBufferDescriptor(bufid);
     145             :         Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
     146             : 
     147     2537636 :         *foundPtr = PinLocalBuffer(bufHdr, true);
     148             :     }
     149             :     else
     150             :     {
     151             :         uint32      buf_state;
     152             : 
     153       16770 :         victim_buffer = GetLocalVictimBuffer();
     154       16758 :         bufid = -victim_buffer - 1;
     155       16758 :         bufHdr = GetLocalBufferDescriptor(bufid);
     156             : 
     157             :         hresult = (LocalBufferLookupEnt *)
     158       16758 :             hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
     159       16758 :         if (found)              /* shouldn't happen */
     160           0 :             elog(ERROR, "local buffer hash table corrupted");
     161       16758 :         hresult->id = bufid;
     162             : 
     163             :         /*
     164             :          * it's all ours now.
     165             :          */
     166       16758 :         bufHdr->tag = newTag;
     167             : 
     168       16758 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     169       16758 :         buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
     170       16758 :         buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     171       16758 :         pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     172             : 
     173       16758 :         *foundPtr = false;
     174             :     }
     175             : 
     176     2554394 :     return bufHdr;
     177             : }
     178             : 
     179             : /*
     180             :  * Like FlushBuffer(), just for local buffers.
     181             :  */
     182             : void
     183        7268 : FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
     184             : {
     185             :     instr_time  io_start;
     186        7268 :     Page        localpage = (char *) LocalBufHdrGetBlock(bufHdr);
     187             : 
     188             :     Assert(LocalRefCount[-BufferDescriptorGetBuffer(bufHdr) - 1] > 0);
     189             : 
     190             :     /*
     191             :      * Try to start an I/O operation.  There currently are no reasons for
     192             :      * StartLocalBufferIO to return false, so we raise an error in that case.
     193             :      */
     194        7268 :     if (!StartLocalBufferIO(bufHdr, false, false))
     195           0 :         elog(ERROR, "failed to start write IO on local buffer");
     196             : 
     197             :     /* Find smgr relation for buffer */
     198        7268 :     if (reln == NULL)
     199        6668 :         reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
     200             :                         MyProcNumber);
     201             : 
     202        7268 :     PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
     203             : 
     204        7268 :     io_start = pgstat_prepare_io_time(track_io_timing);
     205             : 
     206             :     /* And write... */
     207        7268 :     smgrwrite(reln,
     208        7268 :               BufTagGetForkNum(&bufHdr->tag),
     209             :               bufHdr->tag.blockNum,
     210             :               localpage,
     211             :               false);
     212             : 
     213             :     /* Temporary table I/O does not use Buffer Access Strategies */
     214        7268 :     pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
     215             :                             IOOP_WRITE, io_start, 1, BLCKSZ);
     216             : 
     217             :     /* Mark not-dirty */
     218        7268 :     TerminateLocalBufferIO(bufHdr, true, 0, false);
     219             : 
     220        7268 :     pgBufferUsage.local_blks_written++;
     221        7268 : }
     222             : 
     223             : static Buffer
     224       46162 : GetLocalVictimBuffer(void)
     225             : {
     226             :     int         victim_bufid;
     227             :     int         trycounter;
     228             :     BufferDesc *bufHdr;
     229             : 
     230       46162 :     ResourceOwnerEnlarge(CurrentResourceOwner);
     231             : 
     232             :     /*
     233             :      * Need to get a new buffer.  We use a clock-sweep algorithm (essentially
     234             :      * the same as what freelist.c does now...)
     235             :      */
     236       46162 :     trycounter = NLocBuffer;
     237             :     for (;;)
     238             :     {
     239      206236 :         victim_bufid = nextFreeLocalBufId;
     240             : 
     241      206236 :         if (++nextFreeLocalBufId >= NLocBuffer)
     242        1734 :             nextFreeLocalBufId = 0;
     243             : 
     244      206236 :         bufHdr = GetLocalBufferDescriptor(victim_bufid);
     245             : 
     246      206236 :         if (LocalRefCount[victim_bufid] == 0)
     247             :         {
     248       84904 :             uint32      buf_state = pg_atomic_read_u32(&bufHdr->state);
     249             : 
     250       84904 :             if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
     251             :             {
     252       38754 :                 buf_state -= BUF_USAGECOUNT_ONE;
     253       38754 :                 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     254       38754 :                 trycounter = NLocBuffer;
     255             :             }
     256       46150 :             else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
     257             :             {
     258             :                 /*
     259             :                  * This can be reached if the backend initiated AIO for this
     260             :                  * buffer and then errored out.
     261             :                  */
     262             :             }
     263             :             else
     264             :             {
     265             :                 /* Found a usable buffer */
     266       46150 :                 PinLocalBuffer(bufHdr, false);
     267       46150 :                 break;
     268             :             }
     269             :         }
     270      121332 :         else if (--trycounter == 0)
     271          12 :             ereport(ERROR,
     272             :                     (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     273             :                      errmsg("no empty local buffer available")));
     274             :     }
     275             : 
     276             :     /*
     277             :      * lazy memory allocation: allocate space on first use of a buffer.
     278             :      */
     279       46150 :     if (LocalBufHdrGetBlock(bufHdr) == NULL)
     280             :     {
     281             :         /* Set pointer for use by BufferGetBlock() macro */
     282       31342 :         LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
     283             :     }
     284             : 
     285             :     /*
     286             :      * this buffer is not referenced but it might still be dirty. if that's
     287             :      * the case, write it out before reusing it!
     288             :      */
     289       46150 :     if (pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY)
     290        6624 :         FlushLocalBuffer(bufHdr, NULL);
     291             : 
     292             :     /*
     293             :      * Remove the victim buffer from the hashtable and mark as invalid.
     294             :      */
     295       46150 :     if (pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID)
     296             :     {
     297       12818 :         InvalidateLocalBuffer(bufHdr, false);
     298             : 
     299       12818 :         pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT, 1, 0);
     300             :     }
     301             : 
     302       46150 :     return BufferDescriptorGetBuffer(bufHdr);
     303             : }
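
/*
 * Illustrative walk-through of the clock sweep above (made-up numbers): with
 * NLocBuffer = 4, nextFreeLocalBufId = 0, all LocalRefCount[] entries zero,
 * and usage counts {1, 0, 0, 0}, the first iteration decrements buffer 0's
 * usage count to 0 (its "second chance") and resets trycounter; the second
 * iteration finds buffer 1 with zero usage count and zero refcount, pins it,
 * and it becomes the victim.  The "no empty local buffer available" error is
 * raised only after NLocBuffer consecutive pinned buffers are seen with no
 * usage-count decrement in between.
 */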
     304             : 
     305             : /* see GetPinLimit() */
     306             : uint32
     307       13762 : GetLocalPinLimit(void)
     308             : {
     309             :     /* Every backend has its own temporary buffers, and can pin them all. */
     310       13762 :     return num_temp_buffers;
     311             : }
     312             : 
     313             : /* see GetAdditionalPinLimit() */
     314             : uint32
     315       47888 : GetAdditionalLocalPinLimit(void)
     316             : {
     317             :     Assert(NLocalPinnedBuffers <= num_temp_buffers);
     318       47888 :     return num_temp_buffers - NLocalPinnedBuffers;
     319             : }
     320             : 
     321             : /* see LimitAdditionalPins() */
     322             : void
     323       22812 : LimitAdditionalLocalPins(uint32 *additional_pins)
     324             : {
     325             :     uint32      max_pins;
     326             : 
     327       22812 :     if (*additional_pins <= 1)
     328       22158 :         return;
     329             : 
     330             :     /*
      331             :      * In contrast to LimitAdditionalPins(), other backends don't play a role
      332             :      * here.  We can allow up to NLocBuffer pins in total, but NLocBuffer
      333             :      * might not be initialized yet, so read num_temp_buffers instead.
     334             :      */
     335         654 :     max_pins = (num_temp_buffers - NLocalPinnedBuffers);
     336             : 
     337         654 :     if (*additional_pins >= max_pins)
     338           0 :         *additional_pins = max_pins;
     339             : }
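
/*
 * Worked example (illustrative numbers): with num_temp_buffers = 1024 and
 * NLocalPinnedBuffers = 1000, max_pins is 24, so a request for 64 additional
 * pins is clamped to 24, while a request for 0 or 1 additional pins is
 * returned unchanged via the early exit above.
 */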
     340             : 
     341             : /*
     342             :  * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
     343             :  * temporary buffers.
     344             :  */
     345             : BlockNumber
     346       22812 : ExtendBufferedRelLocal(BufferManagerRelation bmr,
     347             :                        ForkNumber fork,
     348             :                        uint32 flags,
     349             :                        uint32 extend_by,
     350             :                        BlockNumber extend_upto,
     351             :                        Buffer *buffers,
     352             :                        uint32 *extended_by)
     353             : {
     354             :     BlockNumber first_block;
     355             :     instr_time  io_start;
     356             : 
     357             :     /* Initialize local buffers if first request in this session */
     358       22812 :     if (LocalBufHash == NULL)
     359         504 :         InitLocalBuffers();
     360             : 
     361       22812 :     LimitAdditionalLocalPins(&extend_by);
     362             : 
     363       52204 :     for (uint32 i = 0; i < extend_by; i++)
     364             :     {
     365             :         BufferDesc *buf_hdr;
     366             :         Block       buf_block;
     367             : 
     368       29392 :         buffers[i] = GetLocalVictimBuffer();
     369       29392 :         buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
     370       29392 :         buf_block = LocalBufHdrGetBlock(buf_hdr);
     371             : 
     372             :         /* new buffers are zero-filled */
     373       29392 :         MemSet(buf_block, 0, BLCKSZ);
     374             :     }
     375             : 
     376       22812 :     first_block = smgrnblocks(BMR_GET_SMGR(bmr), fork);
     377             : 
     378             :     if (extend_upto != InvalidBlockNumber)
     379             :     {
     380             :         /*
     381             :          * In contrast to shared relations, nothing could change the relation
     382             :          * size concurrently. Thus we shouldn't end up finding that we don't
     383             :          * need to do anything.
     384             :          */
     385             :         Assert(first_block <= extend_upto);
     386             : 
     387             :         Assert((uint64) first_block + extend_by <= extend_upto);
     388             :     }
     389             : 
     390             :     /* Fail if relation is already at maximum possible length */
     391       22812 :     if ((uint64) first_block + extend_by >= MaxBlockNumber)
     392           0 :         ereport(ERROR,
     393             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     394             :                  errmsg("cannot extend relation %s beyond %u blocks",
     395             :                         relpath(BMR_GET_SMGR(bmr)->smgr_rlocator, fork).str,
     396             :                         MaxBlockNumber)));
     397             : 
     398       52204 :     for (uint32 i = 0; i < extend_by; i++)
     399             :     {
     400             :         int         victim_buf_id;
     401             :         BufferDesc *victim_buf_hdr;
     402             :         BufferTag   tag;
     403             :         LocalBufferLookupEnt *hresult;
     404             :         bool        found;
     405             : 
     406       29392 :         victim_buf_id = -buffers[i] - 1;
     407       29392 :         victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
     408             : 
     409             :         /* in case we need to pin an existing buffer below */
     410       29392 :         ResourceOwnerEnlarge(CurrentResourceOwner);
     411             : 
     412       29392 :         InitBufferTag(&tag, &BMR_GET_SMGR(bmr)->smgr_rlocator.locator, fork,
     413             :                       first_block + i);
     414             : 
     415             :         hresult = (LocalBufferLookupEnt *)
     416       29392 :             hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
     417       29392 :         if (found)
     418             :         {
     419             :             BufferDesc *existing_hdr;
     420             :             uint32      buf_state;
     421             : 
     422           0 :             UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
     423             : 
     424           0 :             existing_hdr = GetLocalBufferDescriptor(hresult->id);
     425           0 :             PinLocalBuffer(existing_hdr, false);
     426           0 :             buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
     427             : 
     428             :             /*
     429             :              * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
     430             :              */
     431           0 :             buf_state = pg_atomic_read_u32(&existing_hdr->state);
     432             :             Assert(buf_state & BM_TAG_VALID);
     433             :             Assert(!(buf_state & BM_DIRTY));
     434           0 :             buf_state &= ~BM_VALID;
     435           0 :             pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
     436             : 
     437             :             /* no need to loop for local buffers */
     438           0 :             StartLocalBufferIO(existing_hdr, true, false);
     439             :         }
     440             :         else
     441             :         {
     442       29392 :             uint32      buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
     443             : 
     444             :             Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
     445             : 
     446       29392 :             victim_buf_hdr->tag = tag;
     447             : 
     448       29392 :             buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     449             : 
     450       29392 :             pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
     451             : 
     452       29392 :             hresult->id = victim_buf_id;
     453             : 
     454       29392 :             StartLocalBufferIO(victim_buf_hdr, true, false);
     455             :         }
     456             :     }
     457             : 
     458       22812 :     io_start = pgstat_prepare_io_time(track_io_timing);
     459             : 
     460             :     /* actually extend relation */
     461       22812 :     smgrzeroextend(BMR_GET_SMGR(bmr), fork, first_block, extend_by, false);
     462             : 
     463       22812 :     pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
     464       22812 :                             io_start, 1, extend_by * BLCKSZ);
     465             : 
     466       52204 :     for (uint32 i = 0; i < extend_by; i++)
     467             :     {
     468       29392 :         Buffer      buf = buffers[i];
     469             :         BufferDesc *buf_hdr;
     470             :         uint32      buf_state;
     471             : 
     472       29392 :         buf_hdr = GetLocalBufferDescriptor(-buf - 1);
     473             : 
     474       29392 :         buf_state = pg_atomic_read_u32(&buf_hdr->state);
     475       29392 :         buf_state |= BM_VALID;
     476       29392 :         pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     477             :     }
     478             : 
     479       22812 :     *extended_by = extend_by;
     480             : 
     481       22812 :     pgBufferUsage.local_blks_written += extend_by;
     482             : 
     483       22812 :     return first_block;
     484             : }
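
/*
 * Illustrative flow (made-up numbers, assuming the pin limit does not reduce
 * extend_by): extending a temporary relation whose current smgrnblocks() is
 * 100 by extend_by = 8 reserves eight victim buffers, tags them with block
 * numbers 100..107, calls smgrzeroextend() once for the whole range, sets
 * BM_VALID on all eight buffers, and returns first_block = 100.
 */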
     485             : 
     486             : /*
     487             :  * MarkLocalBufferDirty -
     488             :  *    mark a local buffer dirty
     489             :  */
     490             : void
     491     3714804 : MarkLocalBufferDirty(Buffer buffer)
     492             : {
     493             :     int         bufid;
     494             :     BufferDesc *bufHdr;
     495             :     uint32      buf_state;
     496             : 
     497             :     Assert(BufferIsLocal(buffer));
     498             : 
     499             : #ifdef LBDEBUG
     500             :     fprintf(stderr, "LB DIRTY %d\n", buffer);
     501             : #endif
     502             : 
     503     3714804 :     bufid = -buffer - 1;
     504             : 
     505             :     Assert(LocalRefCount[bufid] > 0);
     506             : 
     507     3714804 :     bufHdr = GetLocalBufferDescriptor(bufid);
     508             : 
     509     3714804 :     buf_state = pg_atomic_read_u32(&bufHdr->state);
     510             : 
     511     3714804 :     if (!(buf_state & BM_DIRTY))
     512       29726 :         pgBufferUsage.local_blks_dirtied++;
     513             : 
     514     3714804 :     buf_state |= BM_DIRTY;
     515             : 
     516     3714804 :     pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     517     3714804 : }
     518             : 
     519             : /*
     520             :  * Like StartBufferIO, but for local buffers
     521             :  */
     522             : bool
     523       53542 : StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
     524             : {
     525             :     uint32      buf_state;
     526             : 
     527             :     /*
     528             :      * With AIO the buffer could have IO in progress, e.g. when there are two
     529             :      * scans of the same relation. Either wait for the other IO or return
     530             :      * false.
     531             :      */
     532       53542 :     if (pgaio_wref_valid(&bufHdr->io_wref))
     533             :     {
     534           0 :         PgAioWaitRef iow = bufHdr->io_wref;
     535             : 
     536           0 :         if (nowait)
     537           0 :             return false;
     538             : 
     539           0 :         pgaio_wref_wait(&iow);
     540             :     }
     541             : 
     542             :     /* Once we get here, there is definitely no I/O active on this buffer */
     543             : 
     544             :     /* Check if someone else already did the I/O */
     545       53542 :     buf_state = pg_atomic_read_u32(&bufHdr->state);
     546       53542 :     if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
     547             :     {
     548           4 :         return false;
     549             :     }
     550             : 
     551             :     /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
     552             : 
     553             :     /* local buffers don't track IO using resowners */
     554             : 
     555       53538 :     return true;
     556             : }
     557             : 
     558             : /*
     559             :  * Like TerminateBufferIO, but for local buffers
     560             :  */
     561             : void
     562       24142 : TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits,
     563             :                        bool release_aio)
     564             : {
     565             :     /* Only need to adjust flags */
     566       24142 :     uint32      buf_state = pg_atomic_read_u32(&bufHdr->state);
     567             : 
     568             :     /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
     569             : 
      570             :     /* Clear earlier errors; if this IO failed, it'll be marked again */
     571       24142 :     buf_state &= ~BM_IO_ERROR;
     572             : 
     573       24142 :     if (clear_dirty)
     574        7268 :         buf_state &= ~BM_DIRTY;
     575             : 
     576       24142 :     if (release_aio)
     577             :     {
     578             :         /* release pin held by IO subsystem, see also buffer_stage_common() */
     579             :         Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
     580       16818 :         buf_state -= BUF_REFCOUNT_ONE;
     581       16818 :         pgaio_wref_clear(&bufHdr->io_wref);
     582             :     }
     583             : 
     584       24142 :     buf_state |= set_flag_bits;
     585       24142 :     pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     586             : 
     587             :     /* local buffers don't track IO using resowners */
     588             : 
     589             :     /* local buffers don't use the IO CV, as no other process can see buffer */
     590             : 
     591             :     /* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
     592       24142 : }
     593             : 
     594             : /*
     595             :  * InvalidateLocalBuffer -- mark a local buffer invalid.
     596             :  *
     597             :  * If check_unreferenced is true, error out if the buffer is still
     598             :  * pinned. Passing false is appropriate when calling InvalidateLocalBuffer()
     599             :  * as part of changing the identity of a buffer, instead of just dropping the
     600             :  * buffer.
     601             :  *
     602             :  * See also InvalidateBuffer().
     603             :  */
     604             : void
     605       46150 : InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
     606             : {
     607       46150 :     Buffer      buffer = BufferDescriptorGetBuffer(bufHdr);
     608       46150 :     int         bufid = -buffer - 1;
     609             :     uint32      buf_state;
     610             :     LocalBufferLookupEnt *hresult;
     611             : 
     612             :     /*
     613             :      * It's possible that we started IO on this buffer before e.g. aborting
     614             :      * the transaction that created a table. We need to wait for that IO to
     615             :      * complete before removing / reusing the buffer.
     616             :      */
     617       46150 :     if (pgaio_wref_valid(&bufHdr->io_wref))
     618             :     {
     619           0 :         PgAioWaitRef iow = bufHdr->io_wref;
     620             : 
     621           0 :         pgaio_wref_wait(&iow);
     622             :         Assert(!pgaio_wref_valid(&bufHdr->io_wref));
     623             :     }
     624             : 
     625       46150 :     buf_state = pg_atomic_read_u32(&bufHdr->state);
     626             : 
     627             :     /*
     628             :      * We need to test not just LocalRefCount[bufid] but also the BufferDesc
     629             :      * itself, as the latter is used to represent a pin by the AIO subsystem.
     630             :      * This can happen if AIO is initiated and then the query errors out.
     631             :      */
     632       46150 :     if (check_unreferenced &&
     633       33332 :         (LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
     634           0 :         elog(ERROR, "block %u of %s is still referenced (local %d)",
     635             :              bufHdr->tag.blockNum,
     636             :              relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
     637             :                             MyProcNumber,
     638             :                             BufTagGetForkNum(&bufHdr->tag)).str,
     639             :              LocalRefCount[bufid]);
     640             : 
     641             :     /* Remove entry from hashtable */
     642             :     hresult = (LocalBufferLookupEnt *)
     643       46150 :         hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     644       46150 :     if (!hresult)               /* shouldn't happen */
     645           0 :         elog(ERROR, "local buffer hash table corrupted");
     646             :     /* Mark buffer invalid */
     647       46150 :     ClearBufferTag(&bufHdr->tag);
     648       46150 :     buf_state &= ~BUF_FLAG_MASK;
     649       46150 :     buf_state &= ~BUF_USAGECOUNT_MASK;
     650       46150 :     pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     651       46150 : }
     652             : 
     653             : /*
     654             :  * DropRelationLocalBuffers
     655             :  *      This function removes from the buffer pool all the pages of the
     656             :  *      specified relation that have block numbers >= firstDelBlock.
     657             :  *      (In particular, with firstDelBlock = 0, all pages are removed.)
     658             :  *      Dirty pages are simply dropped, without bothering to write them
     659             :  *      out first.  Therefore, this is NOT rollback-able, and so should be
     660             :  *      used only with extreme caution!
     661             :  *
     662             :  *      See DropRelationBuffers in bufmgr.c for more notes.
     663             :  */
     664             : void
     665         748 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber *forkNum,
     666             :                          int nforks, BlockNumber *firstDelBlock)
     667             : {
     668             :     int         i;
     669             :     int         j;
     670             : 
     671      617196 :     for (i = 0; i < NLocBuffer; i++)
     672             :     {
     673      616448 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     674             :         uint32      buf_state;
     675             : 
     676      616448 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     677             : 
     678      616448 :         if (!(buf_state & BM_TAG_VALID) ||
     679       56606 :             !BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
     680      614776 :             continue;
     681             : 
     682        1912 :         for (j = 0; j < nforks; j++)
     683             :         {
     684        1842 :             if (BufTagGetForkNum(&bufHdr->tag) == forkNum[j] &&
     685        1666 :                 bufHdr->tag.blockNum >= firstDelBlock[j])
     686             :             {
     687        1602 :                 InvalidateLocalBuffer(bufHdr, true);
     688        1602 :                 break;
     689             :             }
     690             :         }
     691             :     }
     692         748 : }
     693             : 
     694             : /*
     695             :  * DropRelationAllLocalBuffers
     696             :  *      This function removes from the buffer pool all pages of all forks
     697             :  *      of the specified relation.
     698             :  *
     699             :  *      See DropRelationsAllBuffers in bufmgr.c for more notes.
     700             :  */
     701             : void
     702        6348 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
     703             : {
     704             :     int         i;
     705             : 
     706     6031676 :     for (i = 0; i < NLocBuffer; i++)
     707             :     {
     708     6025328 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     709             :         uint32      buf_state;
     710             : 
     711     6025328 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     712             : 
     713     6481398 :         if ((buf_state & BM_TAG_VALID) &&
     714      456070 :             BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
     715             :         {
     716       31630 :             InvalidateLocalBuffer(bufHdr, true);
     717             :         }
     718             :     }
     719        6348 : }
     720             : 
     721             : /*
     722             :  * InitLocalBuffers -
     723             :  *    init the local buffer cache. Since most queries (esp. multi-user ones)
     724             :  *    don't involve local buffers, we delay allocating actual memory for the
     725             :  *    buffers until we need them; just make the buffer headers here.
     726             :  */
     727             : static void
     728         530 : InitLocalBuffers(void)
     729             : {
     730         530 :     int         nbufs = num_temp_buffers;
     731             :     HASHCTL     info;
     732             :     int         i;
     733             : 
     734             :     /*
     735             :      * Parallel workers can't access data in temporary tables, because they
     736             :      * have no visibility into the local buffers of their leader.  This is a
     737             :      * convenient, low-cost place to provide a backstop check for that.  Note
     738             :      * that we don't wish to prevent a parallel worker from accessing catalog
     739             :      * metadata about a temp table, so checks at higher levels would be
     740             :      * inappropriate.
     741             :      */
     742         530 :     if (IsParallelWorker())
     743           0 :         ereport(ERROR,
     744             :                 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     745             :                  errmsg("cannot access temporary tables during a parallel operation")));
     746             : 
     747             :     /* Allocate and zero buffer headers and auxiliary arrays */
     748         530 :     LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
     749         530 :     LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
     750         530 :     LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
     751         530 :     if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
     752           0 :         ereport(FATAL,
     753             :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     754             :                  errmsg("out of memory")));
     755             : 
     756         530 :     nextFreeLocalBufId = 0;
     757             : 
     758             :     /* initialize fields that need to start off nonzero */
     759      513682 :     for (i = 0; i < nbufs; i++)
     760             :     {
     761      513152 :         BufferDesc *buf = GetLocalBufferDescriptor(i);
     762             : 
     763             :         /*
     764             :          * negative to indicate local buffer. This is tricky: shared buffers
     765             :          * start with 0. We have to start with -2. (Note that the routine
     766             :          * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
     767             :          * is -1.)
     768             :          */
     769      513152 :         buf->buf_id = -i - 2;
     770             : 
     771      513152 :         pgaio_wref_clear(&buf->io_wref);
     772             : 
     773             :         /*
     774             :          * Intentionally do not initialize the buffer's atomic variable
     775             :          * (besides zeroing the underlying memory above). That way we get
     776             :          * errors on platforms without atomics, if somebody (re-)introduces
     777             :          * atomic operations for local buffers.
     778             :          */
     779             :     }
     780             : 
     781             :     /* Create the lookup hash table */
     782         530 :     info.keysize = sizeof(BufferTag);
     783         530 :     info.entrysize = sizeof(LocalBufferLookupEnt);
     784             : 
     785         530 :     LocalBufHash = hash_create("Local Buffer Lookup Table",
     786             :                                nbufs,
     787             :                                &info,
     788             :                                HASH_ELEM | HASH_BLOBS);
     789             : 
     790         530 :     if (!LocalBufHash)
     791           0 :         elog(ERROR, "could not initialize local buffer hash table");
     792             : 
     793             :     /* Initialization done, mark buffers allocated */
     794         530 :     NLocBuffer = nbufs;
     795         530 : }
     796             : 
     797             : /*
     798             :  * XXX: We could have a slightly more efficient version of PinLocalBuffer()
     799             :  * that does not support adjusting the usagecount - but so far it does not
     800             :  * seem worth the trouble.
     801             :  *
     802             :  * Note that ResourceOwnerEnlarge() must have been done already.
     803             :  */
     804             : bool
     805     2584450 : PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
     806             : {
     807             :     uint32      buf_state;
     808     2584450 :     Buffer      buffer = BufferDescriptorGetBuffer(buf_hdr);
     809     2584450 :     int         bufid = -buffer - 1;
     810             : 
     811     2584450 :     buf_state = pg_atomic_read_u32(&buf_hdr->state);
     812             : 
     813     2584450 :     if (LocalRefCount[bufid] == 0)
     814             :     {
     815     2408380 :         NLocalPinnedBuffers++;
     816     2408380 :         buf_state += BUF_REFCOUNT_ONE;
     817     2408380 :         if (adjust_usagecount &&
     818     2361630 :             BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
     819             :         {
     820      129102 :             buf_state += BUF_USAGECOUNT_ONE;
     821             :         }
     822     2408380 :         pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     823             : 
     824             :         /*
     825             :          * See comment in PinBuffer().
     826             :          *
     827             :          * If the buffer isn't allocated yet, it'll be marked as defined in
     828             :          * GetLocalBufferStorage().
     829             :          */
     830     2408380 :         if (LocalBufHdrGetBlock(buf_hdr) != NULL)
     831             :             VALGRIND_MAKE_MEM_DEFINED(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
     832             :     }
     833     2584450 :     LocalRefCount[bufid]++;
     834     2584450 :     ResourceOwnerRememberBuffer(CurrentResourceOwner,
     835             :                                 BufferDescriptorGetBuffer(buf_hdr));
     836             : 
     837     2584450 :     return buf_state & BM_VALID;
     838             : }
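
/*
 * A sketch of the expected pairing (hedged; LocalBufferAlloc() above follows
 * this pattern):
 *
 *     ResourceOwnerEnlarge(CurrentResourceOwner);
 *     if (PinLocalBuffer(bufHdr, true))
 *         ...   pinned, and the buffer contents are valid (BM_VALID)
 *     else
 *         ...   pinned, but the contents still have to be read in
 *     ...
 *     UnpinLocalBuffer(BufferDescriptorGetBuffer(bufHdr));
 */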
     839             : 
     840             : void
     841     3287756 : UnpinLocalBuffer(Buffer buffer)
     842             : {
     843     3287756 :     UnpinLocalBufferNoOwner(buffer);
     844     3287756 :     ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
     845     3287756 : }
     846             : 
     847             : void
     848     3293822 : UnpinLocalBufferNoOwner(Buffer buffer)
     849             : {
     850     3293822 :     int         buffid = -buffer - 1;
     851             : 
     852             :     Assert(BufferIsLocal(buffer));
     853             :     Assert(LocalRefCount[buffid] > 0);
     854             :     Assert(NLocalPinnedBuffers > 0);
     855             : 
     856     3293822 :     if (--LocalRefCount[buffid] == 0)
     857             :     {
     858     2408380 :         BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
     859             :         uint32      buf_state;
     860             : 
     861     2408380 :         NLocalPinnedBuffers--;
     862             : 
     863     2408380 :         buf_state = pg_atomic_read_u32(&buf_hdr->state);
     864             :         Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
     865     2408380 :         buf_state -= BUF_REFCOUNT_ONE;
     866     2408380 :         pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     867             : 
     868             :         /* see comment in UnpinBufferNoOwner */
     869             :         VALGRIND_MAKE_MEM_NOACCESS(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
     870             :     }
     871     3293822 : }
     872             : 
     873             : /*
     874             :  * GUC check_hook for temp_buffers
     875             :  */
     876             : bool
     877        2292 : check_temp_buffers(int *newval, void **extra, GucSource source)
     878             : {
     879             :     /*
     880             :      * Once local buffers have been initialized, it's too late to change this.
     881             :      * However, if this is only a test call, allow it.
     882             :      */
     883        2292 :     if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
     884             :     {
     885           0 :         GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
     886           0 :         return false;
     887             :     }
     888        2292 :     return true;
     889             : }
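
/*
 * Illustrative session behavior implied by the check above (a sketch; sizes
 * assume the default 8 kB block size):
 *
 *     SET temp_buffers = '32MB';    -- OK: NLocBuffer is still 0
 *     CREATE TEMP TABLE t (x int);
 *     INSERT INTO t VALUES (1);     -- first temp-table page access
 *                                   -- initializes the local buffers
 *     SET temp_buffers = '64MB';    -- rejected with the errdetail above
 *     SET temp_buffers = '32MB';    -- still OK: the value is unchanged
 */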
     890             : 
     891             : /*
     892             :  * GetLocalBufferStorage - allocate memory for a local buffer
     893             :  *
     894             :  * The idea of this function is to aggregate our requests for storage
     895             :  * so that the memory manager doesn't see a whole lot of relatively small
     896             :  * requests.  Since we'll never give back a local buffer once it's created
     897             :  * within a particular process, no point in burdening memmgr with separately
     898             :  * managed chunks.
     899             :  */
     900             : static Block
     901       31342 : GetLocalBufferStorage(void)
     902             : {
     903             :     static char *cur_block = NULL;
     904             :     static int  next_buf_in_block = 0;
     905             :     static int  num_bufs_in_block = 0;
     906             :     static int  total_bufs_allocated = 0;
     907             :     static MemoryContext LocalBufferContext = NULL;
     908             : 
     909             :     char       *this_buf;
     910             : 
     911             :     Assert(total_bufs_allocated < NLocBuffer);
     912             : 
     913       31342 :     if (next_buf_in_block >= num_bufs_in_block)
     914             :     {
     915             :         /* Need to make a new request to memmgr */
     916             :         int         num_bufs;
     917             : 
     918             :         /*
     919             :          * We allocate local buffers in a context of their own, so that the
     920             :          * space eaten for them is easily recognizable in MemoryContextStats
     921             :          * output.  Create the context on first use.
     922             :          */
     923         854 :         if (LocalBufferContext == NULL)
     924         530 :             LocalBufferContext =
     925         530 :                 AllocSetContextCreate(TopMemoryContext,
     926             :                                       "LocalBufferContext",
     927             :                                       ALLOCSET_DEFAULT_SIZES);
     928             : 
     929             :         /* Start with a 16-buffer request; subsequent ones double each time */
     930         854 :         num_bufs = Max(num_bufs_in_block * 2, 16);
     931             :         /* But not more than what we need for all remaining local bufs */
     932         854 :         num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
     933             :         /* And don't overflow MaxAllocSize, either */
     934         854 :         num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
     935             : 
     936             :         /* Buffers should be I/O aligned. */
     937        1708 :         cur_block = MemoryContextAllocAligned(LocalBufferContext,
     938         854 :                                               num_bufs * BLCKSZ,
     939             :                                               PG_IO_ALIGN_SIZE,
     940             :                                               0);
     941             : 
     942         854 :         next_buf_in_block = 0;
     943         854 :         num_bufs_in_block = num_bufs;
     944             :     }
     945             : 
     946             :     /* Allocate next buffer in current memory block */
     947       31342 :     this_buf = cur_block + next_buf_in_block * BLCKSZ;
     948       31342 :     next_buf_in_block++;
     949       31342 :     total_bufs_allocated++;
     950             : 
     951             :     /*
     952             :      * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it
     953             :      * here.  The block is actually undefined, but we want consistency with
     954             :      * the regular case of not needing to allocate memory.  This is
     955             :      * specifically needed when method_io_uring.c fills the block, because
     956             :      * Valgrind doesn't recognize io_uring reads causing undefined memory to
     957             :      * become defined.
     958             :      */
     959             :     VALGRIND_MAKE_MEM_DEFINED(this_buf, BLCKSZ);
     960             : 
     961       31342 :     return (Block) this_buf;
     962             : }
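
/*
 * Worked example (assuming the default temp_buffers of 1024 buffers):
 * successive requests to the memory manager are for 16, 32, 64, 128, 256,
 * and 512 buffers, after which only 1024 - 1008 = 16 buffers remain, so the
 * final request is capped at 16.  All of these allocations live in
 * LocalBufferContext and are PG_IO_ALIGN_SIZE-aligned.
 */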
     963             : 
     964             : /*
     965             :  * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
     966             :  *
     967             :  * This is just like CheckForBufferLeaks(), but for local buffers.
     968             :  */
     969             : static void
     970     1140786 : CheckForLocalBufferLeaks(void)
     971             : {
     972             : #ifdef USE_ASSERT_CHECKING
     973             :     if (LocalRefCount)
     974             :     {
     975             :         int         RefCountErrors = 0;
     976             :         int         i;
     977             : 
     978             :         for (i = 0; i < NLocBuffer; i++)
     979             :         {
     980             :             if (LocalRefCount[i] != 0)
     981             :             {
     982             :                 Buffer      b = -i - 1;
     983             :                 char       *s;
     984             : 
     985             :                 s = DebugPrintBufferRefcount(b);
     986             :                 elog(WARNING, "local buffer refcount leak: %s", s);
     987             :                 pfree(s);
     988             : 
     989             :                 RefCountErrors++;
     990             :             }
     991             :         }
     992             :         Assert(RefCountErrors == 0);
     993             :     }
     994             : #endif
     995     1140786 : }
     996             : 
     997             : /*
     998             :  * AtEOXact_LocalBuffers - clean up at end of transaction.
     999             :  *
    1000             :  * This is just like AtEOXact_Buffers, but for local buffers.
    1001             :  */
    1002             : void
    1003     1096068 : AtEOXact_LocalBuffers(bool isCommit)
    1004             : {
    1005     1096068 :     CheckForLocalBufferLeaks();
    1006     1096068 : }
    1007             : 
    1008             : /*
    1009             :  * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
    1010             :  *
    1011             :  * This is just like AtProcExit_Buffers, but for local buffers.
    1012             :  */
    1013             : void
    1014       44718 : AtProcExit_LocalBuffers(void)
    1015             : {
    1016             :     /*
    1017             :      * We shouldn't be holding any remaining pins; if we are, and assertions
    1018             :      * aren't enabled, we'll fail later in DropRelationBuffers while trying to
    1019             :      * drop the temp rels.
    1020             :      */
    1021       44718 :     CheckForLocalBufferLeaks();
    1022       44718 : }

Generated by: LCOV version 1.16