LCOV - code coverage report
Current view: top level - src/backend/storage/buffer - localbuf.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 247 276 89.5 %
Date: 2025-04-24 12:15:10 Functions: 23 23 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * localbuf.c
       4             :  *    local buffer manager. Fast buffer manager for temporary tables,
       5             :  *    which never need to be WAL-logged or checkpointed, etc.
       6             :  *
       7             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994-5, Regents of the University of California
       9             :  *
      10             :  *
      11             :  * IDENTIFICATION
      12             :  *    src/backend/storage/buffer/localbuf.c
      13             :  *
      14             :  *-------------------------------------------------------------------------
      15             :  */
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/parallel.h"
      19             : #include "executor/instrument.h"
      20             : #include "pgstat.h"
      21             : #include "storage/aio.h"
      22             : #include "storage/buf_internals.h"
      23             : #include "storage/bufmgr.h"
      24             : #include "storage/fd.h"
      25             : #include "utils/guc_hooks.h"
      26             : #include "utils/memdebug.h"
      27             : #include "utils/memutils.h"
      28             : #include "utils/resowner.h"
      29             : 
      30             : 
      31             : /*#define LBDEBUG*/
      32             : 
      33             : /*
                     :  * Entry for the local buffer lookup hashtable (LocalBufHash).
                     :  *
                     :  * Maps the tag of a disk page to the index of the local buffer assigned
                     :  * to it.  The stored id is the array index handed to
                     :  * GetLocalBufferDescriptor(), not a Buffer number; code in this file
                     :  * converts between the two with "-id - 1".
                     :  */
      34             : typedef struct
      35             : {
      36             :     BufferTag   key;            /* Tag of a disk page */
      37             :     int         id;             /* Associated local buffer's index */
      38             : } LocalBufferLookupEnt;
      39             : 
      40             : /*
                     :  * Access the storage pointer slot belonging to a local buffer header.
                     :  * The slot in LocalBufferBlockPointers is selected by -(buf_id + 2).  The
                     :  * expansion is an lvalue: this file both reads the slot and assigns to it
                     :  * through this macro.
                     :  *
                     :  * Note: this macro only works on local buffers, not shared ones!
                     :  */
      41             : #define LocalBufHdrGetBlock(bufHdr) \
      42             :     LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
      43             : 
      44             : int         NLocBuffer = 0;     /* number of local buffers; 0 until buffers are initialized */
      45             : 
      46             : BufferDesc *LocalBufferDescriptors = NULL;
      47             : Block      *LocalBufferBlockPointers = NULL;    /* per-buffer storage; a slot stays NULL until first use */
      48             : int32      *LocalRefCount = NULL;   /* per-buffer reference counts, indexed by local buffer index */
      49             : 
      50             : static int  nextFreeLocalBufId = 0; /* where GetLocalVictimBuffer() resumes its sweep */
      51             : 
      52             : static HTAB *LocalBufHash = NULL;   /* tag -> buffer index; NULL means not initialized yet */
      53             : 
      54             : /* number of local buffers pinned at least once */
      55             : static int  NLocalPinnedBuffers = 0;
      56             : 
      57             : 
      58             : static void InitLocalBuffers(void);
      59             : static Block GetLocalBufferStorage(void);
      60             : static Buffer GetLocalVictimBuffer(void);
      61             : 
      62             : 
      63             : /*
      64             :  * PrefetchLocalBuffer -
      65             :  *    initiate asynchronous read of a block of a relation
      66             :  *
      67             :  * Do PrefetchBuffer's work for temporary relations.
      68             :  * No-op if prefetching isn't compiled in.
                     :  *
                     :  * smgr, forkNum and blockNum identify the requested block.
                     :  *
                     :  * Returns a PrefetchBufferResult:
                     :  *    - If the block already has an entry in the local buffer hashtable,
                     :  *      recent_buffer is set to that buffer and no I/O is started.  This
                     :  *      lookup happens whether or not USE_PREFETCH is defined.
                     :  *    - Otherwise, when USE_PREFETCH is defined and IO_DIRECT_DATA is not
                     :  *      set in io_direct_flags, smgrprefetch() is called for one block and
                     :  *      initiated_io is set if it returns true.
                     :  *    - In every other case the result stays as initialized,
                     :  *      {InvalidBuffer, false}.
                     :  *
                     :  * This function does not pin the buffer it reports.
      69             :  */
      70             : PrefetchBufferResult
      71        1566 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
      72             :                     BlockNumber blockNum)
      73             : {
      74        1566 :     PrefetchBufferResult result = {InvalidBuffer, false};
      75             :     BufferTag   newTag;         /* identity of requested block */
      76             :     LocalBufferLookupEnt *hresult;
      77             : 
      78        1566 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
      79             : 
      80             :     /* Initialize local buffers if first request in this session */
      81        1566 :     if (LocalBufHash == NULL)
      82           0 :         InitLocalBuffers();
      83             : 
      84             :     /* See if the desired buffer already exists */
      85             :     hresult = (LocalBufferLookupEnt *)
      86        1566 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
      87             : 
      88        1566 :     if (hresult)
      89             :     {
      90             :         /*
                     :          * Yes, so nothing to do except report it; "-id - 1" turns the
                     :          * buffer index stored in the hashtable into its Buffer number.
                     :          */
      91        1566 :         result.recent_buffer = -hresult->id - 1;
      92             :     }
      93             :     else
      94             :     {
      95             : #ifdef USE_PREFETCH
      96             :         /* Not in buffers, so initiate prefetch */
      97           0 :         if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
      98           0 :             smgrprefetch(smgr, forkNum, blockNum, 1))
      99             :         {
     100           0 :             result.initiated_io = true;
     101             :         }
     102             : #endif                          /* USE_PREFETCH */
     103             :     }
     104             : 
     105        1566 :     return result;
     106             : }
     107             : 
     108             : 
     109             : /*
     110             :  * LocalBufferAlloc -
     111             :  *    Find or create a local buffer for the given page of the given relation.
     112             :  *
     113             :  * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
     114             :  * any locking since this is all local.  We support only default access
     115             :  * strategy (hence, usage_count is always advanced).
                     :  *
                     :  * smgr, forkNum and blockNum identify the requested page.
                     :  *
                     :  * Returns the descriptor of the buffer now associated with that page.
                     :  * If the page already has a hashtable entry, its buffer is pinned with
                     :  * PinLocalBuffer(bufHdr, true) and *foundPtr receives that call's result.
                     :  * Otherwise a victim from GetLocalVictimBuffer() (which returns it already
                     :  * pinned) is entered into the hashtable under the new tag and *foundPtr is
                     :  * set to false.
                     :  *
                     :  * GetLocalVictimBuffer() raises an ERROR when no buffer can be reused, so
                     :  * this function may not return.
     116             :  */
     117             : BufferDesc *
     118     2542228 : LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
     119             :                  bool *foundPtr)
     120             : {
     121             :     BufferTag   newTag;         /* identity of requested block */
     122             :     LocalBufferLookupEnt *hresult;
     123             :     BufferDesc *bufHdr;
     124             :     Buffer      victim_buffer;
     125             :     int         bufid;
     126             :     bool        found;
     127             : 
     128     2542228 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
     129             : 
     130             :     /* Initialize local buffers if first request in this session */
     131     2542228 :     if (LocalBufHash == NULL)
     132          26 :         InitLocalBuffers();
     133             : 
     134     2542228 :     ResourceOwnerEnlarge(CurrentResourceOwner);  /* NOTE(review): presumably reserves room to record the pin taken below; confirm */
     135             : 
     136             :     /* See if the desired buffer already exists */
     137             :     hresult = (LocalBufferLookupEnt *)
     138     2542228 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
     139             : 
     140     2542228 :     if (hresult)
     141             :     {
     142     2525488 :         bufid = hresult->id;
     143     2525488 :         bufHdr = GetLocalBufferDescriptor(bufid);
     144             :         Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
     145             : 
     146     2525488 :         *foundPtr = PinLocalBuffer(bufHdr, true);
     147             :     }
     148             :     else
     149             :     {
     150             :         uint32      buf_state;
     151             : 
     152       16740 :         victim_buffer = GetLocalVictimBuffer();
     153       16728 :         bufid = -victim_buffer - 1;  /* Buffer number back to buffer index */
     154       16728 :         bufHdr = GetLocalBufferDescriptor(bufid);
     155             : 
     156             :         hresult = (LocalBufferLookupEnt *)
     157       16728 :             hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
     158       16728 :         if (found)              /* shouldn't happen */
     159           0 :             elog(ERROR, "local buffer hash table corrupted");
     160       16728 :         hresult->id = bufid;
     161             : 
     162             :         /*
     163             :          * it's all ours now.
                     :          *
                     :          * Retag the buffer, then reset its state: every bit covered by
                     :          * BUF_FLAG_MASK and BUF_USAGECOUNT_MASK is cleared before
                     :          * BM_TAG_VALID and a usage count of one are set.  The remaining
                     :          * bits of the state word are written back unchanged.
     164             :          */
     165       16728 :         bufHdr->tag = newTag;
     166             : 
     167       16728 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     168       16728 :         buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
     169       16728 :         buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     170       16728 :         pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     171             : 
     172       16728 :         *foundPtr = false;
     173             :     }
     174             : 
     175     2542216 :     return bufHdr;
     176             : }
     177             : 
     178             : /*
     179             :  * Like FlushBuffer(), just for local buffers.
                     :  *
                     :  * Writes the page of bufHdr out through smgrwrite() and afterwards clears
                     :  * BM_DIRTY via TerminateLocalBufferIO().  If reln is NULL, the relation is
                     :  * opened with smgropen() using the locator from the buffer's tag.  The
                     :  * page checksum is set in place before the write, the write is reported
                     :  * to the I/O statistics as IOOP_WRITE on IOOBJECT_TEMP_RELATION, and
                     :  * pgBufferUsage.local_blks_written is incremented.
                     :  *
                     :  * The caller must hold a local pin on the buffer (asserted).  The buffer
                     :  * must also be dirty: StartLocalBufferIO() returns false for a buffer
                     :  * without BM_DIRTY, and that is turned into an ERROR here.
     180             :  */
     181             : void
     182        7268 : FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
     183             : {
     184             :     instr_time  io_start;
     185        7268 :     Page        localpage = (char *) LocalBufHdrGetBlock(bufHdr);
     186             : 
     187             :     Assert(LocalRefCount[-BufferDescriptorGetBuffer(bufHdr) - 1] > 0);
     188             : 
     189             :     /*
     190             :      * Try to start an I/O operation.  There currently are no reasons for
     191             :      * StartLocalBufferIO to return false, so we raise an error in that case.
                     :      *
                     :      * NOTE(review): as written in this file, StartLocalBufferIO(bufHdr,
                     :      * false, false) does return false when BM_DIRTY is not set, so the
                     :      * statement above relies on callers passing only dirty buffers.
     192             :      */
     193        7268 :     if (!StartLocalBufferIO(bufHdr, false, false))
     194           0 :         elog(ERROR, "failed to start write IO on local buffer");
     195             : 
     196             :     /* Find smgr relation for buffer */
     197        7268 :     if (reln == NULL)
     198        6668 :         reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
     199             :                         MyProcNumber);
     200             : 
     201        7268 :     PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
     202             : 
     203        7268 :     io_start = pgstat_prepare_io_time(track_io_timing);
     204             : 
     205             :     /* And write... */
     206        7268 :     smgrwrite(reln,
     207        7268 :               BufTagGetForkNum(&bufHdr->tag),
     208             :               bufHdr->tag.blockNum,
     209             :               localpage,
     210             :               false);
     211             : 
     212             :     /* Temporary table I/O does not use Buffer Access Strategies */
     213        7268 :     pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
     214             :                             IOOP_WRITE, io_start, 1, BLCKSZ);
     215             : 
     216             :     /* Mark not-dirty */
     217        7268 :     TerminateLocalBufferIO(bufHdr, true, 0, false);
     218             : 
     219        7268 :     pgBufferUsage.local_blks_written++;
     220        7268 : }
     221             : 
     222             : static Buffer
     223       46026 : GetLocalVictimBuffer(void)
     224             : {
     225             :     int         victim_bufid;
     226             :     int         trycounter;
     227             :     BufferDesc *bufHdr;
     228             : 
     229       46026 :     ResourceOwnerEnlarge(CurrentResourceOwner);
     230             : 
     231             :     /*
     232             :      * Need to get a new buffer.  We use a clock sweep algorithm (essentially
     233             :      * the same as what freelist.c does now...)
                     :      *
                     :      * Overview of this function: pick a reusable local buffer, pin it,
                     :      * make sure it has storage, write it out if dirty, drop its old
                     :      * identity, and return its Buffer number.
                     :      *
                     :      * Sweep rules, starting at nextFreeLocalBufId and wrapping around at
                     :      * NLocBuffer:
                     :      *    - LocalRefCount != 0: skip the buffer and decrement trycounter;
                     :      *      when trycounter reaches zero, fail with "no empty local buffer
                     :      *      available".
                     :      *    - LocalRefCount == 0 and usage count > 0: decrement the usage
                     :      *      count and reset trycounter to NLocBuffer.
                     :      *    - LocalRefCount == 0, usage count 0, but a reference count in the
                     :      *      state word: skip the buffer.
                     :      *    - otherwise: pin the buffer and stop.
     234             :      */
     235       46026 :     trycounter = NLocBuffer;
     236             :     for (;;)
     237             :     {
     238      206100 :         victim_bufid = nextFreeLocalBufId;
     239             : 
     240      206100 :         if (++nextFreeLocalBufId >= NLocBuffer)
     241        1734 :             nextFreeLocalBufId = 0;
     242             : 
     243      206100 :         bufHdr = GetLocalBufferDescriptor(victim_bufid);
     244             : 
     245      206100 :         if (LocalRefCount[victim_bufid] == 0)
     246             :         {
     247       84768 :             uint32      buf_state = pg_atomic_read_u32(&bufHdr->state);
     248             : 
     249       84768 :             if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
     250             :             {
     251       38754 :                 buf_state -= BUF_USAGECOUNT_ONE;
     252       38754 :                 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     253       38754 :                 trycounter = NLocBuffer;
     254             :             }
     255       46014 :             else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
     256             :             {
     257             :                 /*
     258             :                  * This can be reached if the backend initiated AIO for this
     259             :                  * buffer and then errored out.
                     :                  *
                     :                  * Nothing is changed in this case: neither the buffer state
                     :                  * nor trycounter is touched, the sweep just moves on to the
                     :                  * next buffer.
     260             :                  */
     261             :             }
     262             :             else
     263             :             {
     264             :                 /* Found a usable buffer */
     265       46014 :                 PinLocalBuffer(bufHdr, false);
     266       46014 :                 break;
     267             :             }
     268             :         }
     269      121332 :         else if (--trycounter == 0)
     270          12 :             ereport(ERROR,
     271             :                     (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     272             :                      errmsg("no empty local buffer available")));
     273             :     }
     274             : 
     275             :     /*
     276             :      * lazy memory allocation: allocate space on first use of a buffer.
     277             :      */
     278       46014 :     if (LocalBufHdrGetBlock(bufHdr) == NULL)
     279             :     {
     280             :         /* Set pointer for use by BufferGetBlock() macro */
     281       31206 :         LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
     282             :     }
     283             : 
     284             :     /*
     285             :      * this buffer is not referenced but it might still be dirty. if that's
     286             :      * the case, write it out before reusing it!
     287             :      */
     288       46014 :     if (pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY)
     289        6624 :         FlushLocalBuffer(bufHdr, NULL);
     290             : 
     291             :     /*
     292             :      * Remove the victim buffer from the hashtable and mark as invalid.
                     :      *
                     :      * check_unreferenced is passed as false because the buffer's identity
                     :      * is being changed while we hold the pin taken above.  The removal is
                     :      * counted as an IOOP_EVICT for temporary relations.
     293             :      */
     294       46014 :     if (pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID)
     295             :     {
     296       12818 :         InvalidateLocalBuffer(bufHdr, false);
     297             : 
     298       12818 :         pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT, 1, 0);
     299             :     }
     300             : 
     301       46014 :     return BufferDescriptorGetBuffer(bufHdr);
     302             : }
     303             : 
     304             : /*
                     :  * see GetPinLimit()
                     :  *
                     :  * Local-buffer variant: returns num_temp_buffers, independent of how many
                     :  * local buffers are currently pinned.
                     :  */
     305             : uint32
     306       13454 : GetLocalPinLimit(void)
     307             : {
     308             :     /* Every backend has its own temporary buffers, and can pin them all. */
     309       13454 :     return num_temp_buffers;
     310             : }
     311             : 
     312             : /*
                     :  * see GetAdditionalPinLimit()
                     :  *
                     :  * Local-buffer variant: returns num_temp_buffers minus
                     :  * NLocalPinnedBuffers.  The subtraction is asserted not to go negative.
                     :  */
     313             : uint32
     314       47336 : GetAdditionalLocalPinLimit(void)
     315             : {
     316             :     Assert(NLocalPinnedBuffers <= num_temp_buffers);
     317       47336 :     return num_temp_buffers - NLocalPinnedBuffers;
     318             : }
     319             : 
     320             : /*
                     :  * see LimitAdditionalPins()
                     :  *
                     :  * Local-buffer variant: caps *additional_pins at
                     :  * num_temp_buffers - NLocalPinnedBuffers.  A request for zero or one pin
                     :  * is returned unchanged without looking at that limit.
                     :  */
     321             : void
     322       22722 : LimitAdditionalLocalPins(uint32 *additional_pins)
     323             : {
     324             :     uint32      max_pins;
     325             : 
     326       22722 :     if (*additional_pins <= 1)
     327       22076 :         return;
     328             : 
     329             :     /*
     330             :      * In contrast to LimitAdditionalPins() other backends don't play a role
     331             :      * here. We can allow up to NLocBuffer pins in total, but it might not be
     332             :      * initialized yet so read num_temp_buffers.
     333             :      */
     334         646 :     max_pins = (num_temp_buffers - NLocalPinnedBuffers);
     335             : 
     336         646 :     if (*additional_pins >= max_pins)
     337           0 :         *additional_pins = max_pins;
     338             : }
     339             : 
     340             : /*
     341             :  * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
     342             :  * temporary buffers.
                     :  *
                     :  * bmr.smgr and fork identify the relation fork to extend.  flags is not
                     :  * examined by this function.  extend_by is the number of blocks wanted
                     :  * and may be reduced by LimitAdditionalLocalPins().  extend_upto is only
                     :  * consulted in assertions, and only when it is not InvalidBlockNumber.
                     :  *
                     :  * Steps:
                     :  *    1. Acquire extend_by victim buffers and zero-fill their pages.
                     :  *    2. Read the current size with smgrnblocks(); that becomes the first
                     :  *       new block number.  Raise an ERROR if the extension would reach
                     :  *       MaxBlockNumber.
                     :  *    3. Enter a hashtable entry for each new block.  If an entry already
                     :  *       exists, the victim is unpinned and the existing buffer is pinned
                     :  *       and used instead, with its BM_VALID bit cleared.
                     :  *    4. Extend the file with smgrzeroextend() and report the I/O.
                     :  *    5. Mark every returned buffer BM_VALID.
                     :  *
                     :  * On return, buffers[] holds one pinned buffer per new block, the count is
                     :  * stored in the location extended_by points to, and
                     :  * pgBufferUsage.local_blks_written has been advanced by the same count.
                     :  * The return value is the block number of the first new block.
     343             :  */
     344             : BlockNumber
     345       22722 : ExtendBufferedRelLocal(BufferManagerRelation bmr,
     346             :                        ForkNumber fork,
     347             :                        uint32 flags,
     348             :                        uint32 extend_by,
     349             :                        BlockNumber extend_upto,
     350             :                        Buffer *buffers,
     351             :                        uint32 *extended_by)
     352             : {
     353             :     BlockNumber first_block;
     354             :     instr_time  io_start;
     355             : 
     356             :     /* Initialize local buffers if first request in this session */
     357       22722 :     if (LocalBufHash == NULL)
     358         502 :         InitLocalBuffers();
     359             : 
     360       22722 :     LimitAdditionalLocalPins(&extend_by);
     361             : 
     362       52008 :     for (uint32 i = 0; i < extend_by; i++)
     363             :     {
     364             :         BufferDesc *buf_hdr;
     365             :         Block       buf_block;
     366             : 
     367       29286 :         buffers[i] = GetLocalVictimBuffer();
     368       29286 :         buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
     369       29286 :         buf_block = LocalBufHdrGetBlock(buf_hdr);
     370             : 
     371             :         /* new buffers are zero-filled */
     372       29286 :         MemSet(buf_block, 0, BLCKSZ);
     373             :     }
     374             : 
     375       22722 :     first_block = smgrnblocks(bmr.smgr, fork);
     376             : 
     377             :     if (extend_upto != InvalidBlockNumber)
     378             :     {
     379             :         /*
     380             :          * In contrast to shared relations, nothing could change the relation
     381             :          * size concurrently. Thus we shouldn't end up finding that we don't
     382             :          * need to do anything.
     383             :          */
     384             :         Assert(first_block <= extend_upto);
     385             : 
     386             :         Assert((uint64) first_block + extend_by <= extend_upto);
     387             :     }
     388             : 
     389             :     /*
                     :      * Fail if relation is already at maximum possible length, that is, if
                     :      * first_block + extend_by (computed in 64 bits) would reach or pass
                     :      * MaxBlockNumber.
                     :      */
     390       22722 :     if ((uint64) first_block + extend_by >= MaxBlockNumber)
     391           0 :         ereport(ERROR,
     392             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     393             :                  errmsg("cannot extend relation %s beyond %u blocks",
     394             :                         relpath(bmr.smgr->smgr_rlocator, fork).str,
     395             :                         MaxBlockNumber)));
     396             : 
     397       52008 :     for (uint32 i = 0; i < extend_by; i++)
     398             :     {
     399             :         int         victim_buf_id;
     400             :         BufferDesc *victim_buf_hdr;
     401             :         BufferTag   tag;
     402             :         LocalBufferLookupEnt *hresult;
     403             :         bool        found;
     404             : 
     405       29286 :         victim_buf_id = -buffers[i] - 1;
     406       29286 :         victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
     407             : 
     408             :         /* in case we need to pin an existing buffer below */
     409       29286 :         ResourceOwnerEnlarge(CurrentResourceOwner);
     410             : 
     411       29286 :         InitBufferTag(&tag, &bmr.smgr->smgr_rlocator.locator, fork, first_block + i);
     412             : 
     413             :         hresult = (LocalBufferLookupEnt *)
     414       29286 :             hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
     415       29286 :         if (found)
     416             :         {
     417             :             BufferDesc *existing_hdr;
     418             :             uint32      buf_state;
     419             : 
     420           0 :             UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
     421             : 
     422           0 :             existing_hdr = GetLocalBufferDescriptor(hresult->id);
     423           0 :             PinLocalBuffer(existing_hdr, false);
     424           0 :             buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
     425             : 
     426             :             /*
     427             :              * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
     428             :              */
     429           0 :             buf_state = pg_atomic_read_u32(&existing_hdr->state);
     430             :             Assert(buf_state & BM_TAG_VALID);
     431             :             Assert(!(buf_state & BM_DIRTY));
     432           0 :             buf_state &= ~BM_VALID;
     433           0 :             pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
     434             : 
     435             :             /* no need to loop for local buffers */
     436           0 :             StartLocalBufferIO(existing_hdr, true, false);
     437             :         }
     438             :         else
     439             :         {
     440       29286 :             uint32      buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
     441             : 
     442             :             Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
     443             : 
     444       29286 :             victim_buf_hdr->tag = tag;
     445             : 
     446       29286 :             buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     447             : 
     448       29286 :             pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
     449             : 
     450       29286 :             hresult->id = victim_buf_id;
     451             : 
     452       29286 :             StartLocalBufferIO(victim_buf_hdr, true, false);
     453             :         }
     454             :     }
     455             : 
     456       22722 :     io_start = pgstat_prepare_io_time(track_io_timing);
     457             : 
     458             :     /* actually extend relation */
     459       22722 :     smgrzeroextend(bmr.smgr, fork, first_block, extend_by, false);
     460             : 
     461       22722 :     pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
     462       22722 :                             io_start, 1, extend_by * BLCKSZ);
     463             : 
     464       52008 :     for (uint32 i = 0; i < extend_by; i++)
     465             :     {
     466       29286 :         Buffer      buf = buffers[i];
     467             :         BufferDesc *buf_hdr;
     468             :         uint32      buf_state;
     469             : 
     470       29286 :         buf_hdr = GetLocalBufferDescriptor(-buf - 1);
     471             : 
     472       29286 :         buf_state = pg_atomic_read_u32(&buf_hdr->state);
     473       29286 :         buf_state |= BM_VALID;
     474       29286 :         pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     475             :     }
     476             : 
     477       22722 :     *extended_by = extend_by;
     478             : 
     479       22722 :     pgBufferUsage.local_blks_written += extend_by;
     480             : 
     481       22722 :     return first_block;
     482             : }
     483             : 
     484             : /*
     485             :  * MarkLocalBufferDirty -
     486             :  *    mark a local buffer dirty
                     :  *
                     :  * buffer must be a local buffer on which the caller holds a pin; both
                     :  * conditions are asserted.  BM_DIRTY is set in the buffer's state word.
                     :  * pgBufferUsage.local_blks_dirtied is incremented only if the buffer was
                     :  * not already dirty.
     487             :  */
     488             : void
     489     3697628 : MarkLocalBufferDirty(Buffer buffer)
     490             : {
     491             :     int         bufid;
     492             :     BufferDesc *bufHdr;
     493             :     uint32      buf_state;
     494             : 
     495             :     Assert(BufferIsLocal(buffer));
     496             : 
     497             : #ifdef LBDEBUG
     498             :     fprintf(stderr, "LB DIRTY %d\n", buffer);
     499             : #endif
     500             : 
     501     3697628 :     bufid = -buffer - 1;
     502             : 
     503             :     Assert(LocalRefCount[bufid] > 0);
     504             : 
     505     3697628 :     bufHdr = GetLocalBufferDescriptor(bufid);
     506             : 
     507     3697628 :     buf_state = pg_atomic_read_u32(&bufHdr->state);
     508             : 
     509     3697628 :     if (!(buf_state & BM_DIRTY))
     510       29634 :         pgBufferUsage.local_blks_dirtied++;
     511             : 
     512     3697628 :     buf_state |= BM_DIRTY;
     513             : 
     514     3697628 :     pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     515     3697628 : }
     516             : 
     517             : /*
     518             :  * Like StartBufferIO, but for local buffers
                     :  *
                     :  * forInput selects which condition counts as "I/O already done": a set
                     :  * BM_VALID bit when true, a cleared BM_DIRTY bit when false.
                     :  *
                     :  * Returns true if the caller should go ahead with the I/O.  Returns
                     :  * false if
                     :  *    - the buffer has a valid io_wref and nowait is true, or
                     :  *    - forInput is true and the buffer is already BM_VALID, or
                     :  *    - forInput is false and the buffer is not BM_DIRTY.
                     :  * When nowait is false, a valid io_wref is waited for before the state
                     :  * checks are made.
                     :  *
                     :  * The buffer's state word is only read here, never modified.
     519             :  */
     520             : bool
     521       53406 : StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
     522             : {
     523             :     uint32      buf_state;
     524             : 
     525             :     /*
     526             :      * With AIO the buffer could have IO in progress, e.g. when there are two
     527             :      * scans of the same relation. Either wait for the other IO or return
     528             :      * false.
     529             :      */
     530       53406 :     if (pgaio_wref_valid(&bufHdr->io_wref))
     531             :     {
     532           0 :         PgAioWaitRef iow = bufHdr->io_wref;
     533             : 
     534           0 :         if (nowait)
     535           0 :             return false;
     536             : 
     537           0 :         pgaio_wref_wait(&iow);
     538             :     }
     539             : 
     540             :     /* Once we get here, there is definitely no I/O active on this buffer */
     541             : 
     542             :     /* Check if someone else already did the I/O */
     543       53406 :     buf_state = pg_atomic_read_u32(&bufHdr->state);
     544       53406 :     if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
     545             :     {
     546           4 :         return false;
     547             :     }
     548             : 
     549             :     /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
     550             : 
     551             :     /* local buffers don't track IO using resowners */
     552             : 
     553       53402 :     return true;
     554             : }
     555             : 
     556             : /*
     557             :  * Like TerminateBufferIO, but for local buffers
                     :  *
                     :  * Updates the buffer's state word in a single read-modify-write:
                     :  *    - BM_IO_ERROR is always cleared;
                     :  *    - BM_DIRTY is cleared if clear_dirty is true;
                     :  *    - if release_aio is true, one reference count is subtracted from the
                     :  *      state word (asserted to be present) and the buffer's io_wref is
                     :  *      cleared;
                     :  *    - set_flag_bits is ORed in last, so it can set BM_IO_ERROR again.
     558             :  */
     559             : void
     560       24112 : TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits,
     561             :                        bool release_aio)
     562             : {
     563             :     /* Only need to adjust flags */
     564       24112 :     uint32      buf_state = pg_atomic_read_u32(&bufHdr->state);
     565             : 
     566             :     /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
     567             : 
     568             :     /* Clear earlier errors, if this IO failed, it'll be marked again */
     569       24112 :     buf_state &= ~BM_IO_ERROR;
     570             : 
     571       24112 :     if (clear_dirty)
     572        7268 :         buf_state &= ~BM_DIRTY;
     573             : 
     574       24112 :     if (release_aio)
     575             :     {
     576             :         /* release pin held by IO subsystem, see also buffer_stage_common() */
     577             :         Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
     578       16788 :         buf_state -= BUF_REFCOUNT_ONE;
     579       16788 :         pgaio_wref_clear(&bufHdr->io_wref);
     580             :     }
     581             : 
     582       24112 :     buf_state |= set_flag_bits;
     583       24112 :     pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     584             : 
     585             :     /* local buffers don't track IO using resowners */
     586             : 
     587             :     /* local buffers don't use the IO CV, as no other process can see buffer */
     588             : 
     589             :     /* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
     590       24112 : }
     591             : 
     592             : /*
     593             :  * InvalidateLocalBuffer -- mark a local buffer invalid: wait for any IO
     594             :  * still referenced by the header, remove its tag from LocalBufHash, clear
     595             :  * the tag, and reset the flag and usage-count bits of the state word.
     596             :  *
     597             :  * If check_unreferenced is true, error out if the buffer is still
     598             :  * pinned. Passing false is appropriate when calling InvalidateLocalBuffer()
     599             :  * as part of changing the identity of a buffer, instead of just dropping the
     600             :  * buffer.  See also InvalidateBuffer().
     601             :  */
     602             : void
     603       46014 : InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
     604             : {
     605       46014 :     Buffer      buffer = BufferDescriptorGetBuffer(bufHdr);
     606       46014 :     int         bufid = -buffer - 1;
     607             :     uint32      buf_state;
     608             :     LocalBufferLookupEnt *hresult;
     609             : 
     610             :     /*
     611             :      * It's possible that we started IO on this buffer before e.g. aborting
     612             :      * the transaction that created a table. We need to wait for that IO to
     613             :      * complete before removing / reusing the buffer.
     614             :      */
     615       46014 :     if (pgaio_wref_valid(&bufHdr->io_wref))
     616             :     {
     617           0 :         PgAioWaitRef iow = bufHdr->io_wref;
     618             :         /* wait on a copy; the Assert below expects the header's io_wref to be cleared by then */
     619           0 :         pgaio_wref_wait(&iow);
     620             :         Assert(!pgaio_wref_valid(&bufHdr->io_wref));
     621             :     }
     622             : 
     623       46014 :     buf_state = pg_atomic_read_u32(&bufHdr->state);
     624             : 
     625             :     /*
     626             :      * We need to test not just LocalRefCount[bufid] but also the BufferDesc
     627             :      * itself, as the latter is used to represent a pin by the AIO subsystem.
     628             :      * This can happen if AIO is initiated and then the query errors out.
     629             :      */
     630       46014 :     if (check_unreferenced &&
     631       33196 :         (LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
     632           0 :         elog(ERROR, "block %u of %s is still referenced (local %u)",
     633             :              bufHdr->tag.blockNum,
     634             :              relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
     635             :                             MyProcNumber,
     636             :                             BufTagGetForkNum(&bufHdr->tag)).str,
     637             :              LocalRefCount[bufid]);
     638             :     /* NOTE(review): LocalRefCount[] is allocated as int32 but is formatted with %u above */
     639             :     /* Remove entry from hashtable */
     640             :     hresult = (LocalBufferLookupEnt *)
     641       46014 :         hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     642       46014 :     if (!hresult)               /* shouldn't happen */
     643           0 :         elog(ERROR, "local buffer hash table corrupted");
     644             :     /* Mark buffer invalid: clear tag, flag bits and usage count; refcount bits are kept */
     645       46014 :     ClearBufferTag(&bufHdr->tag);
     646       46014 :     buf_state &= ~BUF_FLAG_MASK;
     647       46014 :     buf_state &= ~BUF_USAGECOUNT_MASK;
     648       46014 :     pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     649       46014 : }
     650             : 
     651             : /*
     652             :  * DropRelationLocalBuffers
     653             :  *      This function removes from the local buffer pool all the pages of
     654             :  *      the given fork of the specified relation that have block numbers >=
     655             :  *      firstDelBlock.  (With firstDelBlock = 0, all of that fork's pages go.)
     656             :  *      Dirty pages are simply dropped, without bothering to write them
     657             :  *      out first.  Therefore, this is NOT rollback-able, and so should be
     658             :  *      used only with extreme caution!  A buffer that is still pinned makes
     659             :  *      InvalidateLocalBuffer() raise an error.
     660             :  *      See DropRelationBuffers in bufmgr.c for more notes.
     661             :  */
     662             : void
     663         794 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum,
     664             :                          BlockNumber firstDelBlock)
     665             : {
     666             :     int         i;
     667             : 
     668      664346 :     for (i = 0; i < NLocBuffer; i++)
     669             :     {
     670      663552 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     671             :         uint32      buf_state;
     672             : 
     673      663552 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     674             :         /* drop only buffers with a valid tag for this relation fork at or past firstDelBlock */
     675      723942 :         if ((buf_state & BM_TAG_VALID) &&
     676       62232 :             BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
     677        1842 :             BufTagGetForkNum(&bufHdr->tag) == forkNum &&
     678        1666 :             bufHdr->tag.blockNum >= firstDelBlock)
     679             :         {
     680        1602 :             InvalidateLocalBuffer(bufHdr, true);
     681             :         }
     682             :     }
     683         794 : }
     684             : 
     685             : /*
     686             :  * DropRelationAllLocalBuffers
     687             :  *      This function removes from the local buffer pool all pages of all
     688             :  *      forks of the specified relation, by invalidating every buffer with
     689             :  *      a valid tag matching rlocator; a still-pinned buffer raises an error.
     690             :  *      See DropRelationsAllBuffers in bufmgr.c for more notes.
     691             :  */
     692             : void
     693        6194 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
     694             : {
     695             :     int         i;
     696             : 
     697     5892258 :     for (i = 0; i < NLocBuffer; i++)
     698             :     {
     699     5886064 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     700             :         uint32      buf_state;
     701             : 
     702     5886064 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     703             :         /* no fork or block-number filter here: any valid tag of this relation qualifies */
     704     6311184 :         if ((buf_state & BM_TAG_VALID) &&
     705      425120 :             BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
     706             :         {
     707       31494 :             InvalidateLocalBuffer(bufHdr, true);
     708             :         }
     709             :     }
     710        6194 : }
     711             : 
     712             : /*
     713             :  * InitLocalBuffers -
     714             :  *    init the local buffer cache. Since most queries (esp. multi-user ones)
     715             :  *    don't involve local buffers, we delay allocating actual memory for the
     716             :  *    buffers until we need them; just make the buffer headers, the auxiliary
     717             :  *    arrays and the lookup hash table here, sized by num_temp_buffers.
     718             :  */
     719             : static void
     720         528 : InitLocalBuffers(void)
     721             : {
     722         528 :     int         nbufs = num_temp_buffers;
     723             :     HASHCTL     info;
     724             :     int         i;
     725             : 
     726             :     /*
     727             :      * Parallel workers can't access data in temporary tables, because they
     728             :      * have no visibility into the local buffers of their leader.  This is a
     729             :      * convenient, low-cost place to provide a backstop check for that.  Note
     730             :      * that we don't wish to prevent a parallel worker from accessing catalog
     731             :      * metadata about a temp table, so checks at higher levels would be
     732             :      * inappropriate.
     733             :      */
     734         528 :     if (IsParallelWorker())
     735           0 :         ereport(ERROR,
     736             :                 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     737             :                  errmsg("cannot access temporary tables during a parallel operation")));
     738             : 
     739             :     /* Allocate and zero buffer headers and auxiliary arrays */
     740         528 :     LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
     741         528 :     LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
     742         528 :     LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
     743         528 :     if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
     744           0 :         ereport(FATAL,
     745             :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     746             :                  errmsg("out of memory")));
     747             :     /* the failure path above reports FATAL and does not free arrays that were allocated */
     748         528 :     nextFreeLocalBufId = 0;
     749             : 
     750             :     /* initialize fields that need to start off nonzero */
     751      511632 :     for (i = 0; i < nbufs; i++)
     752             :     {
     753      511104 :         BufferDesc *buf = GetLocalBufferDescriptor(i);
     754             : 
     755             :         /*
     756             :          * negative to indicate local buffer. This is tricky: shared buffers
     757             :          * start with 0. We have to start with -2. (Note that the routine
     758             :          * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
     759             :          * is -1.)
     760             :          */
     761      511104 :         buf->buf_id = -i - 2;
     762             : 
     763      511104 :         pgaio_wref_clear(&buf->io_wref);
     764             : 
     765             :         /*
     766             :          * Intentionally do not initialize the buffer's atomic variable
     767             :          * (besides zeroing the underlying memory above). That way we get
     768             :          * errors on platforms without atomics, if somebody (re-)introduces
     769             :          * atomic operations for local buffers.
     770             :          */
     771             :     }
     772             : 
     773             :     /* Create the lookup hash table, keyed by BufferTag and sized for nbufs entries */
     774         528 :     info.keysize = sizeof(BufferTag);
     775         528 :     info.entrysize = sizeof(LocalBufferLookupEnt);
     776             : 
     777         528 :     LocalBufHash = hash_create("Local Buffer Lookup Table",
     778             :                                nbufs,
     779             :                                &info,
     780             :                                HASH_ELEM | HASH_BLOBS);
     781             : 
     782         528 :     if (!LocalBufHash)
     783           0 :         elog(ERROR, "could not initialize local buffer hash table");
     784             : 
     785             :     /* Initialization done, mark buffers allocated (check_temp_buffers() tests NLocBuffer) */
     786         528 :     NLocBuffer = nbufs;
     787         528 : }
     787             : 
     788             : /*
     789             :  * PinLocalBuffer -- take a pin on a local buffer; the result tells whether
     790             :  * BM_VALID is set in its state.  XXX: We could have a slightly more efficient
     791             :  * version that does not support adjusting the usagecount - but so far it
     792             :  * does not seem worth the trouble.
     793             :  * Note that ResourceOwnerEnlarge() must have been done already.
     794             :  */
     795             : bool
     796     2572166 : PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
     797             : {
     798             :     uint32      buf_state;
     799     2572166 :     Buffer      buffer = BufferDescriptorGetBuffer(buf_hdr);
     800     2572166 :     int         bufid = -buffer - 1;
     801             : 
     802     2572166 :     buf_state = pg_atomic_read_u32(&buf_hdr->state);
     803             :     /* only the first pin updates the header state and NLocalPinnedBuffers; later pins just bump LocalRefCount[bufid] */
     804     2572166 :     if (LocalRefCount[bufid] == 0)
     805             :     {
     806     2400184 :         NLocalPinnedBuffers++;
     807     2400184 :         buf_state += BUF_REFCOUNT_ONE;
     808     2400184 :         if (adjust_usagecount &&
     809     2353570 :             BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
     810             :         {
     811      128836 :             buf_state += BUF_USAGECOUNT_ONE;
     812             :         }
     813     2400184 :         pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     814             : 
     815             :         /*
     816             :          * See comment in PinBuffer().
     817             :          *
     818             :          * If the buffer isn't allocated yet, it'll be marked as defined in
     819             :          * GetLocalBufferStorage().
     820             :          */
     821     2400184 :         if (LocalBufHdrGetBlock(buf_hdr) != NULL)
     822             :             VALGRIND_MAKE_MEM_DEFINED(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
     823             :     }
     824     2572166 :     LocalRefCount[bufid]++;
     825     2572166 :     ResourceOwnerRememberBuffer(CurrentResourceOwner,
     826             :                                 BufferDescriptorGetBuffer(buf_hdr));
     827             : 
     828     2572166 :     return buf_state & BM_VALID;
     829             : }
     830             : /* UnpinLocalBuffer -- drop one pin via UnpinLocalBufferNoOwner(), then forget it in CurrentResourceOwner */
     831             : void
     832     3275240 : UnpinLocalBuffer(Buffer buffer)
     833             : {
     834     3275240 :     UnpinLocalBufferNoOwner(buffer);
     835     3275240 :     ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
     836     3275240 : }
     837             : /* UnpinLocalBufferNoOwner -- drop one local pin without touching any resource owner */
     838             : void
     839     3281210 : UnpinLocalBufferNoOwner(Buffer buffer)
     840             : {
     841     3281210 :     int         buffid = -buffer - 1;
     842             : 
     843             :     Assert(BufferIsLocal(buffer));
     844             :     Assert(LocalRefCount[buffid] > 0);
     845             :     Assert(NLocalPinnedBuffers > 0);
     846             :     /* header refcount and NLocalPinnedBuffers change only when the last local pin is released */
     847     3281210 :     if (--LocalRefCount[buffid] == 0)
     848             :     {
     849     2400184 :         BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
     850             :         uint32      buf_state;
     851             : 
     852     2400184 :         NLocalPinnedBuffers--;
     853             : 
     854     2400184 :         buf_state = pg_atomic_read_u32(&buf_hdr->state);
     855             :         Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
     856     2400184 :         buf_state -= BUF_REFCOUNT_ONE;
     857     2400184 :         pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     858             : 
     859             :         /* see comment in UnpinBufferNoOwner */
     860             :         VALGRIND_MAKE_MEM_NOACCESS(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
     861             :     }
     862     3281210 : }
     863             : 
     864             : /*
     865             :  * GUC check_hook for temp_buffers; returns false to reject the new value.
     866             :  */
     867             : bool
     868        2186 : check_temp_buffers(int *newval, void **extra, GucSource source)
     869             : {
     870             :     /*
     871             :      * Once local buffers have been initialized (NLocBuffer != 0), it's too
     872             :      * late to change the value.  A PGC_S_TEST call or an unchanged value is OK.
     873             :      */
     874        2186 :     if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
     875             :     {
     876           0 :         GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
     877           0 :         return false;
     878             :     }
     879        2186 :     return true;
     880             : }
     881             : 
     882             : /*
     883             :  * GetLocalBufferStorage - allocate memory for a local buffer
     884             :  *
     885             :  * The idea of this function is to aggregate our requests for storage
     886             :  * so that the memory manager doesn't see a whole lot of relatively small
     887             :  * requests.  Since we'll never give back a local buffer once it's created
     888             :  * within a particular process, no point in burdening memmgr with separately
     889             :  * managed chunks.  Returns a BLCKSZ-sized slice of the current chunk.
     890             :  */
     891             : static Block
     892       31206 : GetLocalBufferStorage(void)
     893             : {
     894             :     static char *cur_block = NULL;
     895             :     static int  next_buf_in_block = 0;
     896             :     static int  num_bufs_in_block = 0;
     897             :     static int  total_bufs_allocated = 0;
     898             :     static MemoryContext LocalBufferContext = NULL;
     899             :     /* the statics above persist across calls: current chunk, cursor within it, chunk size, running total */
     900             :     char       *this_buf;
     901             : 
     902             :     Assert(total_bufs_allocated < NLocBuffer);
     903             : 
     904       31206 :     if (next_buf_in_block >= num_bufs_in_block)
     905             :     {
     906             :         /* Need to make a new request to memmgr */
     907             :         int         num_bufs;
     908             : 
     909             :         /*
     910             :          * We allocate local buffers in a context of their own, so that the
     911             :          * space eaten for them is easily recognizable in MemoryContextStats
     912             :          * output.  Create the context on first use.
     913             :          */
     914         850 :         if (LocalBufferContext == NULL)
     915         528 :             LocalBufferContext =
     916         528 :                 AllocSetContextCreate(TopMemoryContext,
     917             :                                       "LocalBufferContext",
     918             :                                       ALLOCSET_DEFAULT_SIZES);
     919             : 
     920             :         /* Start with a 16-buffer request; subsequent ones double each time */
     921         850 :         num_bufs = Max(num_bufs_in_block * 2, 16);
     922             :         /* But not more than what we need for all remaining local bufs */
     923         850 :         num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
     924             :         /* And don't overflow MaxAllocSize, either (NOTE(review): this cap ignores the PG_IO_ALIGN_SIZE padding added below) */
     925         850 :         num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
     926             : 
     927             :         /* Buffers should be I/O aligned; PG_IO_ALIGN_SIZE extra bytes leave room for the round-up. */
     928         850 :         cur_block = (char *)
     929         850 :             TYPEALIGN(PG_IO_ALIGN_SIZE,
     930             :                       MemoryContextAlloc(LocalBufferContext,
     931             :                                          num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE));
     932         850 :         next_buf_in_block = 0;
     933         850 :         num_bufs_in_block = num_bufs;
     934             :     }
     935             : 
     936             :     /* Allocate next buffer in current memory block */
     937       31206 :     this_buf = cur_block + next_buf_in_block * BLCKSZ;
     938       31206 :     next_buf_in_block++;
     939       31206 :     total_bufs_allocated++;
     940             : 
     941             :     /*
     942             :      * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it
     943             :      * here.  The block is actually undefined, but we want consistency with
     944             :      * the regular case of not needing to allocate memory.  This is
     945             :      * specifically needed when method_io_uring.c fills the block, because
     946             :      * Valgrind doesn't recognize io_uring reads causing undefined memory to
     947             :      * become defined.
     948             :      */
     949             :     VALGRIND_MAKE_MEM_DEFINED(this_buf, BLCKSZ);
     950             : 
     951       31206 :     return (Block) this_buf;
     952             : }
     953             : 
     954             : /*
     955             :  * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
     956             :  *
     957             :  * Like CheckForBufferLeaks(), but for local buffers.  With USE_ASSERT_CHECKING
     958             :  * it warns for each nonzero LocalRefCount[] entry, then asserts; else no-op.
     959             :  */
     960             : static void
     961      905394 : CheckForLocalBufferLeaks(void)
     962             : {
     963             : #ifdef USE_ASSERT_CHECKING
     964             :     if (LocalRefCount)
     965             :     {
     966             :         int         RefCountErrors = 0;
     967             :         int         i;
     968             : 
     969             :         for (i = 0; i < NLocBuffer; i++)
     970             :         {
     971             :             if (LocalRefCount[i] != 0)
     972             :             {
     973             :                 Buffer      b = -i - 1;
     974             :                 char       *s;
     975             : 
     976             :                 s = DebugPrintBufferRefcount(b);
     977             :                 elog(WARNING, "local buffer refcount leak: %s", s);
     978             :                 pfree(s);
     979             : 
     980             :                 RefCountErrors++;
     981             :             }
     982             :         }
     983             :         Assert(RefCountErrors == 0);
     984             :     }
     985             : #endif
     986      905394 : }
     986             : 
     987             : /*
     988             :  * AtEOXact_LocalBuffers - clean up at end of transaction.
     989             :  * This is just like AtEOXact_Buffers, but for local buffers: it only runs
     990             :  * CheckForLocalBufferLeaks(), and isCommit is not examined.
     991             :  */
     992             : void
     993      864572 : AtEOXact_LocalBuffers(bool isCommit)
     994             : {
     995      864572 :     CheckForLocalBufferLeaks();
     996      864572 : }
     997             : 
     998             : /*
     999             :  * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
    1000             :  * This is just like AtProcExit_Buffers, but for local buffers; the only
    1001             :  * work done here is the CheckForLocalBufferLeaks() call.
    1002             :  */
    1003             : void
    1004       40822 : AtProcExit_LocalBuffers(void)
    1005             : {
    1006             :     /*
    1007             :      * We shouldn't be holding any remaining pins; if we are, and assertions
    1008             :      * aren't enabled, we'll fail later in DropRelationBuffers while trying to
    1009             :      * drop the temp rels.
    1010             :      */
    1011       40822 :     CheckForLocalBufferLeaks();
    1012       40822 : }

Generated by: LCOV version 1.14