LCOV - code coverage report
Current view: top level - src/backend/storage/buffer - localbuf.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 245 275 89.1 %
Date: 2025-04-01 14:15:22 Functions: 23 23 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * localbuf.c
       4             :  *    local buffer manager. Fast buffer manager for temporary tables,
       5             :  *    which never need to be WAL-logged or checkpointed, etc.
       6             :  *
       7             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994-5, Regents of the University of California
       9             :  *
      10             :  *
      11             :  * IDENTIFICATION
      12             :  *    src/backend/storage/buffer/localbuf.c
      13             :  *
      14             :  *-------------------------------------------------------------------------
      15             :  */
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/parallel.h"
      19             : #include "executor/instrument.h"
      20             : #include "pgstat.h"
      21             : #include "storage/aio.h"
      22             : #include "storage/buf_internals.h"
      23             : #include "storage/bufmgr.h"
      24             : #include "storage/fd.h"
      25             : #include "utils/guc_hooks.h"
      26             : #include "utils/memutils.h"
      27             : #include "utils/resowner.h"
      28             : 
      29             : 
/*#define LBDEBUG*/

/* entry for buffer lookup hashtable */
typedef struct
{
	BufferTag	key;			/* Tag of a disk page */
	int			id;				/* Associated local buffer's index */
} LocalBufferLookupEnt;

/*
 * Note: this macro only works on local buffers, not shared ones!
 *
 * It maps a local buffer descriptor's (negative) buf_id back to the
 * corresponding slot in LocalBufferBlockPointers via -(buf_id + 2).
 */
#define LocalBufHdrGetBlock(bufHdr) \
	LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]

int			NLocBuffer = 0;		/* until buffers are initialized */

/* Arrays describing this backend's local buffers, allocated lazily */
BufferDesc *LocalBufferDescriptors = NULL;
Block	   *LocalBufferBlockPointers = NULL;
int32	   *LocalRefCount = NULL;

/* clock-sweep hand for victim selection */
static int	nextFreeLocalBufId = 0;

/* maps BufferTag -> local buffer index (LocalBufferLookupEnt entries) */
static HTAB *LocalBufHash = NULL;

/* number of local buffers pinned at least once */
static int	NLocalPinnedBuffers = 0;


static void InitLocalBuffers(void);
static Block GetLocalBufferStorage(void);
static Buffer GetLocalVictimBuffer(void);
static void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced);
      61             : 
      62             : 
      63             : /*
      64             :  * PrefetchLocalBuffer -
      65             :  *    initiate asynchronous read of a block of a relation
      66             :  *
      67             :  * Do PrefetchBuffer's work for temporary relations.
      68             :  * No-op if prefetching isn't compiled in.
      69             :  */
      70             : PrefetchBufferResult
      71        1502 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
      72             :                     BlockNumber blockNum)
      73             : {
      74        1502 :     PrefetchBufferResult result = {InvalidBuffer, false};
      75             :     BufferTag   newTag;         /* identity of requested block */
      76             :     LocalBufferLookupEnt *hresult;
      77             : 
      78        1502 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
      79             : 
      80             :     /* Initialize local buffers if first request in this session */
      81        1502 :     if (LocalBufHash == NULL)
      82           0 :         InitLocalBuffers();
      83             : 
      84             :     /* See if the desired buffer already exists */
      85             :     hresult = (LocalBufferLookupEnt *)
      86        1502 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
      87             : 
      88        1502 :     if (hresult)
      89             :     {
      90             :         /* Yes, so nothing to do */
      91        1502 :         result.recent_buffer = -hresult->id - 1;
      92             :     }
      93             :     else
      94             :     {
      95             : #ifdef USE_PREFETCH
      96             :         /* Not in buffers, so initiate prefetch */
      97           0 :         if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
      98           0 :             smgrprefetch(smgr, forkNum, blockNum, 1))
      99             :         {
     100           0 :             result.initiated_io = true;
     101             :         }
     102             : #endif                          /* USE_PREFETCH */
     103             :     }
     104             : 
     105        1502 :     return result;
     106             : }
     107             : 
     108             : 
     109             : /*
     110             :  * LocalBufferAlloc -
     111             :  *    Find or create a local buffer for the given page of the given relation.
     112             :  *
     113             :  * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
     114             :  * any locking since this is all local.  We support only default access
     115             :  * strategy (hence, usage_count is always advanced).
     116             :  */
     117             : BufferDesc *
     118     2319138 : LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
     119             :                  bool *foundPtr)
     120             : {
     121             :     BufferTag   newTag;         /* identity of requested block */
     122             :     LocalBufferLookupEnt *hresult;
     123             :     BufferDesc *bufHdr;
     124             :     Buffer      victim_buffer;
     125             :     int         bufid;
     126             :     bool        found;
     127             : 
     128     2319138 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
     129             : 
     130             :     /* Initialize local buffers if first request in this session */
     131     2319138 :     if (LocalBufHash == NULL)
     132          26 :         InitLocalBuffers();
     133             : 
     134     2319138 :     ResourceOwnerEnlarge(CurrentResourceOwner);
     135             : 
     136             :     /* See if the desired buffer already exists */
     137             :     hresult = (LocalBufferLookupEnt *)
     138     2319138 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
     139             : 
     140     2319138 :     if (hresult)
     141             :     {
     142     2302512 :         bufid = hresult->id;
     143     2302512 :         bufHdr = GetLocalBufferDescriptor(bufid);
     144             :         Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
     145             : 
     146     2302512 :         *foundPtr = PinLocalBuffer(bufHdr, true);
     147             :     }
     148             :     else
     149             :     {
     150             :         uint32      buf_state;
     151             : 
     152       16626 :         victim_buffer = GetLocalVictimBuffer();
     153       16614 :         bufid = -victim_buffer - 1;
     154       16614 :         bufHdr = GetLocalBufferDescriptor(bufid);
     155             : 
     156             :         hresult = (LocalBufferLookupEnt *)
     157       16614 :             hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
     158       16614 :         if (found)              /* shouldn't happen */
     159           0 :             elog(ERROR, "local buffer hash table corrupted");
     160       16614 :         hresult->id = bufid;
     161             : 
     162             :         /*
     163             :          * it's all ours now.
     164             :          */
     165       16614 :         bufHdr->tag = newTag;
     166             : 
     167       16614 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     168       16614 :         buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
     169       16614 :         buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     170       16614 :         pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     171             : 
     172       16614 :         *foundPtr = false;
     173             :     }
     174             : 
     175     2319126 :     return bufHdr;
     176             : }
     177             : 
/*
 * Like FlushBuffer(), just for local buffers.
 *
 * Writes the buffer's page out through smgr and clears its dirty flag.
 * 'reln' may be NULL, in which case the relation is looked up from the
 * buffer's tag.
 */
void
FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
{
	instr_time	io_start;
	Page		localpage = (char *) LocalBufHdrGetBlock(bufHdr);

	/*
	 * Try to start an I/O operation.  There currently are no reasons for
	 * StartLocalBufferIO to return false, so we raise an error in that case.
	 */
	if (!StartLocalBufferIO(bufHdr, false, false))
		elog(ERROR, "failed to start write IO on local buffer");

	/* Find smgr relation for buffer */
	if (reln == NULL)
		reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
						MyProcNumber);

	/* compute page checksum before writing (in-place for local buffers) */
	PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);

	io_start = pgstat_prepare_io_time(track_io_timing);

	/* And write... */
	smgrwrite(reln,
			  BufTagGetForkNum(&bufHdr->tag),
			  bufHdr->tag.blockNum,
			  localpage,
			  false);

	/* Temporary table I/O does not use Buffer Access Strategies */
	pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
							IOOP_WRITE, io_start, 1, BLCKSZ);

	/* Mark not-dirty */
	TerminateLocalBufferIO(bufHdr, true, 0, false);

	pgBufferUsage.local_blks_written++;
}
     219             : 
/*
 * GetLocalVictimBuffer
 *		Select a reusable local buffer via clock sweep, pin it, and return it.
 *
 * On return the buffer is pinned, has backing storage allocated, has been
 * flushed if it was dirty, and has been removed from the lookup hash table
 * (its tag is invalid).  Errors out if every local buffer is pinned.
 */
static Buffer
GetLocalVictimBuffer(void)
{
	int			victim_bufid;
	int			trycounter;
	BufferDesc *bufHdr;

	/* make sure the resource owner can record the upcoming pin */
	ResourceOwnerEnlarge(CurrentResourceOwner);

	/*
	 * Need to get a new buffer.  We use a clock sweep algorithm (essentially
	 * the same as what freelist.c does now...)
	 */
	trycounter = NLocBuffer;
	for (;;)
	{
		victim_bufid = nextFreeLocalBufId;

		/* advance the clock hand, wrapping around at the end of the array */
		if (++nextFreeLocalBufId >= NLocBuffer)
			nextFreeLocalBufId = 0;

		bufHdr = GetLocalBufferDescriptor(victim_bufid);

		if (LocalRefCount[victim_bufid] == 0)
		{
			uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);

			if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
			{
				/* recently used: decrement usage count and keep sweeping */
				buf_state -= BUF_USAGECOUNT_ONE;
				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
				trycounter = NLocBuffer;
			}
			else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
			{
				/*
				 * This can be reached if the backend initiated AIO for this
				 * buffer and then errored out.
				 */
			}
			else
			{
				/* Found a usable buffer */
				PinLocalBuffer(bufHdr, false);
				break;
			}
		}
		else if (--trycounter == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
					 errmsg("no empty local buffer available")));
	}

	/*
	 * lazy memory allocation: allocate space on first use of a buffer.
	 */
	if (LocalBufHdrGetBlock(bufHdr) == NULL)
	{
		/* Set pointer for use by BufferGetBlock() macro */
		LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
	}

	/*
	 * this buffer is not referenced but it might still be dirty. if that's
	 * the case, write it out before reusing it!
	 */
	if (pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY)
		FlushLocalBuffer(bufHdr, NULL);

	/*
	 * Remove the victim buffer from the hashtable and mark as invalid.
	 */
	if (pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID)
	{
		InvalidateLocalBuffer(bufHdr, false);

		/* count the eviction of a previously-valid buffer in IO stats */
		pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT, 1, 0);
	}

	return BufferDescriptorGetBuffer(bufHdr);
}
     301             : 
/*
 * see GetPinLimit()
 *
 * Returns the maximum number of local buffers this backend may pin.
 */
uint32
GetLocalPinLimit(void)
{
	/* Every backend has its own temporary buffers, and can pin them all. */
	return num_temp_buffers;
}
     309             : 
/*
 * see GetAdditionalPinLimit()
 *
 * Returns how many more local buffers this backend could pin right now,
 * given the pins it already holds.
 */
uint32
GetAdditionalLocalPinLimit(void)
{
	Assert(NLocalPinnedBuffers <= num_temp_buffers);
	return num_temp_buffers - NLocalPinnedBuffers;
}
     317             : 
     318             : /* see LimitAdditionalPins() */
     319             : void
     320       19700 : LimitAdditionalLocalPins(uint32 *additional_pins)
     321             : {
     322             :     uint32      max_pins;
     323             : 
     324       19700 :     if (*additional_pins <= 1)
     325       19078 :         return;
     326             : 
     327             :     /*
     328             :      * In contrast to LimitAdditionalPins() other backends don't play a role
     329             :      * here. We can allow up to NLocBuffer pins in total, but it might not be
     330             :      * initialized yet so read num_temp_buffers.
     331             :      */
     332         622 :     max_pins = (num_temp_buffers - NLocalPinnedBuffers);
     333             : 
     334         622 :     if (*additional_pins >= max_pins)
     335           0 :         *additional_pins = max_pins;
     336             : }
     337             : 
/*
 * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
 * temporary buffers.
 *
 * Extends 'fork' of bmr's relation by up to 'extend_by' zero-filled blocks.
 * Returns the block number of the first new block; the actual number of
 * blocks added is reported via *extended_by, and the new buffers (pinned
 * and valid) are returned in 'buffers'.
 */
BlockNumber
ExtendBufferedRelLocal(BufferManagerRelation bmr,
					   ForkNumber fork,
					   uint32 flags,
					   uint32 extend_by,
					   BlockNumber extend_upto,
					   Buffer *buffers,
					   uint32 *extended_by)
{
	BlockNumber first_block;
	instr_time	io_start;

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* don't try to pin more buffers than we're allowed to */
	LimitAdditionalLocalPins(&extend_by);

	/* first pass: acquire and zero a victim buffer for each new block */
	for (uint32 i = 0; i < extend_by; i++)
	{
		BufferDesc *buf_hdr;
		Block		buf_block;

		buffers[i] = GetLocalVictimBuffer();
		buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
		buf_block = LocalBufHdrGetBlock(buf_hdr);

		/* new buffers are zero-filled */
		MemSet(buf_block, 0, BLCKSZ);
	}

	first_block = smgrnblocks(bmr.smgr, fork);

	if (extend_upto != InvalidBlockNumber)
	{
		/*
		 * In contrast to shared relations, nothing could change the relation
		 * size concurrently. Thus we shouldn't end up finding that we don't
		 * need to do anything.
		 */
		Assert(first_block <= extend_upto);

		Assert((uint64) first_block + extend_by <= extend_upto);
	}

	/* Fail if relation is already at maximum possible length */
	if ((uint64) first_block + extend_by >= MaxBlockNumber)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("cannot extend relation %s beyond %u blocks",
						relpath(bmr.smgr->smgr_rlocator, fork).str,
						MaxBlockNumber)));

	/* second pass: assign each new block's tag to its buffer */
	for (uint32 i = 0; i < extend_by; i++)
	{
		int			victim_buf_id;
		BufferDesc *victim_buf_hdr;
		BufferTag	tag;
		LocalBufferLookupEnt *hresult;
		bool		found;

		victim_buf_id = -buffers[i] - 1;
		victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);

		/* in case we need to pin an existing buffer below */
		ResourceOwnerEnlarge(CurrentResourceOwner);

		InitBufferTag(&tag, &bmr.smgr->smgr_rlocator.locator, fork, first_block + i);

		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
		if (found)
		{
			/* a buffer for this block already exists: reuse it instead */
			BufferDesc *existing_hdr;
			uint32		buf_state;

			UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));

			existing_hdr = GetLocalBufferDescriptor(hresult->id);
			PinLocalBuffer(existing_hdr, false);
			buffers[i] = BufferDescriptorGetBuffer(existing_hdr);

			/*
			 * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
			 */
			buf_state = pg_atomic_read_u32(&existing_hdr->state);
			Assert(buf_state & BM_TAG_VALID);
			Assert(!(buf_state & BM_DIRTY));
			buf_state &= ~BM_VALID;
			pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);

			/* no need to loop for local buffers */
			StartLocalBufferIO(existing_hdr, true, false);
		}
		else
		{
			uint32		buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);

			Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));

			victim_buf_hdr->tag = tag;

			buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;

			pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);

			hresult->id = victim_buf_id;

			StartLocalBufferIO(victim_buf_hdr, true, false);
		}
	}

	io_start = pgstat_prepare_io_time(track_io_timing);

	/* actually extend relation */
	smgrzeroextend(bmr.smgr, fork, first_block, extend_by, false);

	pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
							io_start, 1, extend_by * BLCKSZ);

	/* third pass: now that the blocks exist on disk, mark buffers valid */
	for (uint32 i = 0; i < extend_by; i++)
	{
		Buffer		buf = buffers[i];
		BufferDesc *buf_hdr;
		uint32		buf_state;

		buf_hdr = GetLocalBufferDescriptor(-buf - 1);

		buf_state = pg_atomic_read_u32(&buf_hdr->state);
		buf_state |= BM_VALID;
		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
	}

	*extended_by = extend_by;

	pgBufferUsage.local_blks_written += extend_by;

	return first_block;
}
     481             : 
     482             : /*
     483             :  * MarkLocalBufferDirty -
     484             :  *    mark a local buffer dirty
     485             :  */
     486             : void
     487     3439154 : MarkLocalBufferDirty(Buffer buffer)
     488             : {
     489             :     int         bufid;
     490             :     BufferDesc *bufHdr;
     491             :     uint32      buf_state;
     492             : 
     493             :     Assert(BufferIsLocal(buffer));
     494             : 
     495             : #ifdef LBDEBUG
     496             :     fprintf(stderr, "LB DIRTY %d\n", buffer);
     497             : #endif
     498             : 
     499     3439154 :     bufid = -buffer - 1;
     500             : 
     501             :     Assert(LocalRefCount[bufid] > 0);
     502             : 
     503     3439154 :     bufHdr = GetLocalBufferDescriptor(bufid);
     504             : 
     505     3439154 :     buf_state = pg_atomic_read_u32(&bufHdr->state);
     506             : 
     507     3439154 :     if (!(buf_state & BM_DIRTY))
     508       26584 :         pgBufferUsage.local_blks_dirtied++;
     509             : 
     510     3439154 :     buf_state |= BM_DIRTY;
     511             : 
     512     3439154 :     pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     513     3439154 : }
     514             : 
     515             : /*
     516             :  * Like StartBufferIO, but for local buffers
     517             :  */
     518             : bool
     519       48470 : StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
     520             : {
     521             :     uint32      buf_state;
     522             : 
     523             :     /*
     524             :      * With AIO the buffer could have IO in progress, e.g. when there are two
     525             :      * scans of the same relation. Either wait for the other IO or return
     526             :      * false.
     527             :      */
     528       48470 :     if (pgaio_wref_valid(&bufHdr->io_wref))
     529             :     {
     530           0 :         PgAioWaitRef iow = bufHdr->io_wref;
     531             : 
     532           0 :         if (nowait)
     533           0 :             return false;
     534             : 
     535           0 :         pgaio_wref_wait(&iow);
     536             :     }
     537             : 
     538             :     /* Once we get here, there is definitely no I/O active on this buffer */
     539             : 
     540             :     /* Check if someone else already did the I/O */
     541       48470 :     buf_state = pg_atomic_read_u32(&bufHdr->state);
     542       48470 :     if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
     543             :     {
     544           0 :         return false;
     545             :     }
     546             : 
     547             :     /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
     548             : 
     549             :     /* local buffers don't track IO using resowners */
     550             : 
     551       48470 :     return true;
     552             : }
     553             : 
/*
 * Like TerminateBufferIO, but for local buffers
 *
 * clear_dirty: clear BM_DIRTY (set after a successful write).
 * set_flag_bits: additional state flags to OR in (e.g. BM_VALID or
 * BM_IO_ERROR).
 * release_aio: drop the pin held by the AIO subsystem and clear the
 * buffer's IO wait reference.
 */
void
TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits,
					   bool release_aio)
{
	/* Only need to adjust flags */
	uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);

	/* BM_IO_IN_PROGRESS isn't currently used for local buffers */

	/* Clear earlier errors, if this IO failed, it'll be marked again */
	buf_state &= ~BM_IO_ERROR;

	if (clear_dirty)
		buf_state &= ~BM_DIRTY;

	if (release_aio)
	{
		/* release pin held by IO subsystem, see also buffer_stage_common() */
		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
		buf_state -= BUF_REFCOUNT_ONE;
		pgaio_wref_clear(&bufHdr->io_wref);
	}

	buf_state |= set_flag_bits;
	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);

	/* local buffers don't track IO using resowners */

	/* local buffers don't use the IO CV, as no other process can see buffer */

	/* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
}
     589             : 
     590             : /*
     591             :  * InvalidateLocalBuffer -- mark a local buffer invalid.
     592             :  *
     593             :  * If check_unreferenced is true, error out if the buffer is still
     594             :  * pinned. Passing false is appropriate when calling InvalidateLocalBuffer()
     595             :  * as part of changing the identity of a buffer, instead of just dropping the
     596             :  * buffer.
     597             :  *
     598             :  * See also InvalidateBuffer().
     599             :  */
     600             : static void
     601       42830 : InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
     602             : {
     603       42830 :     Buffer      buffer = BufferDescriptorGetBuffer(bufHdr);
     604       42830 :     int         bufid = -buffer - 1;
     605             :     uint32      buf_state;
     606             :     LocalBufferLookupEnt *hresult;
     607             : 
     608             :     /*
     609             :      * It's possible that we started IO on this buffer before e.g. aborting
     610             :      * the transaction that created a table. We need to wait for that IO to
     611             :      * complete before removing / reusing the buffer.
     612             :      */
     613       42830 :     if (pgaio_wref_valid(&bufHdr->io_wref))
     614             :     {
     615           0 :         PgAioWaitRef iow = bufHdr->io_wref;
     616             : 
     617           0 :         pgaio_wref_wait(&iow);
     618             :         Assert(!pgaio_wref_valid(&bufHdr->io_wref));
     619             :     }
     620             : 
     621       42830 :     buf_state = pg_atomic_read_u32(&bufHdr->state);
     622             : 
     623             :     /*
     624             :      * We need to test not just LocalRefCount[bufid] but also the BufferDesc
     625             :      * itself, as the latter is used to represent a pin by the AIO subsystem.
     626             :      * This can happen if AIO is initiated and then the query errors out.
     627             :      */
     628       42830 :     if (check_unreferenced &&
     629       31604 :         (LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
     630           0 :         elog(ERROR, "block %u of %s is still referenced (local %u)",
     631             :              bufHdr->tag.blockNum,
     632             :              relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
     633             :                             MyProcNumber,
     634             :                             BufTagGetForkNum(&bufHdr->tag)).str,
     635             :              LocalRefCount[bufid]);
     636             : 
     637             :     /* Remove entry from hashtable */
     638             :     hresult = (LocalBufferLookupEnt *)
     639       42830 :         hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     640       42830 :     if (!hresult)               /* shouldn't happen */
     641           0 :         elog(ERROR, "local buffer hash table corrupted");
     642             :     /* Mark buffer invalid */
     643       42830 :     ClearBufferTag(&bufHdr->tag);
     644       42830 :     buf_state &= ~BUF_FLAG_MASK;
     645       42830 :     buf_state &= ~BUF_USAGECOUNT_MASK;
     646       42830 :     pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     647       42830 : }
     648             : 
     649             : /*
     650             :  * DropRelationLocalBuffers
     651             :  *      This function removes from the buffer pool all the pages of the
     652             :  *      specified relation that have block numbers >= firstDelBlock.
     653             :  *      (In particular, with firstDelBlock = 0, all pages are removed.)
     654             :  *      Dirty pages are simply dropped, without bothering to write them
     655             :  *      out first.  Therefore, this is NOT rollback-able, and so should be
     656             :  *      used only with extreme caution!
     657             :  *
     658             :  *      See DropRelationBuffers in bufmgr.c for more notes.
     659             :  */
     660             : void
     661         710 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum,
     662             :                          BlockNumber firstDelBlock)
     663             : {
     664             :     int         i;
     665             : 
     666      664262 :     for (i = 0; i < NLocBuffer; i++)
     667             :     {
     668      663552 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     669             :         uint32      buf_state;
     670             : 
     671      663552 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     672             : 
     673      723942 :         if ((buf_state & BM_TAG_VALID) &&
     674       62232 :             BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
     675        1842 :             BufTagGetForkNum(&bufHdr->tag) == forkNum &&
     676        1666 :             bufHdr->tag.blockNum >= firstDelBlock)
     677             :         {
     678        1602 :             InvalidateLocalBuffer(bufHdr, true);
     679             :         }
     680             :     }
     681         710 : }
     682             : 
     683             : /*
     684             :  * DropRelationAllLocalBuffers
     685             :  *      This function removes from the buffer pool all pages of all forks
     686             :  *      of the specified relation.
     687             :  *
     688             :  *      See DropRelationsAllBuffers in bufmgr.c for more notes.
     689             :  */
     690             : void
     691        6128 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
     692             : {
     693             :     int         i;
     694             : 
     695     5888192 :     for (i = 0; i < NLocBuffer; i++)
     696             :     {
     697     5882064 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     698             :         uint32      buf_state;
     699             : 
     700     5882064 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     701             : 
     702     6304864 :         if ((buf_state & BM_TAG_VALID) &&
     703      422800 :             BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
     704             :         {
     705       30002 :             InvalidateLocalBuffer(bufHdr, true);
     706             :         }
     707             :     }
     708        6128 : }
     709             : 
     710             : /*
     711             :  * InitLocalBuffers -
     712             :  *    init the local buffer cache. Since most queries (esp. multi-user ones)
     713             :  *    don't involve local buffers, we delay allocating actual memory for the
     714             :  *    buffers until we need them; just make the buffer headers here.
     715             :  */
     716             : static void
     717         508 : InitLocalBuffers(void)
     718             : {
     719         508 :     int         nbufs = num_temp_buffers;
     720             :     HASHCTL     info;
     721             :     int         i;
     722             : 
     723             :     /*
     724             :      * Parallel workers can't access data in temporary tables, because they
     725             :      * have no visibility into the local buffers of their leader.  This is a
     726             :      * convenient, low-cost place to provide a backstop check for that.  Note
     727             :      * that we don't wish to prevent a parallel worker from accessing catalog
     728             :      * metadata about a temp table, so checks at higher levels would be
     729             :      * inappropriate.
     730             :      */
     731         508 :     if (IsParallelWorker())
     732           0 :         ereport(ERROR,
     733             :                 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     734             :                  errmsg("cannot access temporary tables during a parallel operation")));
     735             : 
     736             :     /* Allocate and zero buffer headers and auxiliary arrays */
     737         508 :     LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
     738         508 :     LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
     739         508 :     LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
     740         508 :     if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
     741           0 :         ereport(FATAL,
     742             :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     743             :                  errmsg("out of memory")));
     744             : 
     745         508 :     nextFreeLocalBufId = 0;
     746             : 
     747             :     /* initialize fields that need to start off nonzero */
     748      509612 :     for (i = 0; i < nbufs; i++)
     749             :     {
     750      509104 :         BufferDesc *buf = GetLocalBufferDescriptor(i);
     751             : 
     752             :         /*
     753             :          * negative to indicate local buffer. This is tricky: shared buffers
     754             :          * start with 0. We have to start with -2. (Note that the routine
     755             :          * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
     756             :          * is -1.)
     757             :          */
     758      509104 :         buf->buf_id = -i - 2;
     759             : 
     760      509104 :         pgaio_wref_clear(&buf->io_wref);
     761             : 
     762             :         /*
     763             :          * Intentionally do not initialize the buffer's atomic variable
     764             :          * (besides zeroing the underlying memory above). That way we get
     765             :          * errors on platforms without atomics, if somebody (re-)introduces
     766             :          * atomic operations for local buffers.
     767             :          */
     768             :     }
     769             : 
     770             :     /* Create the lookup hash table */
     771         508 :     info.keysize = sizeof(BufferTag);
     772         508 :     info.entrysize = sizeof(LocalBufferLookupEnt);
     773             : 
     774         508 :     LocalBufHash = hash_create("Local Buffer Lookup Table",
     775             :                                nbufs,
     776             :                                &info,
     777             :                                HASH_ELEM | HASH_BLOBS);
     778             : 
     779         508 :     if (!LocalBufHash)
     780           0 :         elog(ERROR, "could not initialize local buffer hash table");
     781             : 
     782             :     /* Initialization done, mark buffers allocated */
     783         508 :     NLocBuffer = nbufs;
     784         508 : }
     785             : 
     786             : /*
     787             :  * XXX: We could have a slightly more efficient version of PinLocalBuffer()
     788             :  * that does not support adjusting the usagecount - but so far it does not
     789             :  * seem worth the trouble.
     790             :  *
     791             :  * Note that ResourceOwnerEnlarge() must have been done already.
     792             :  */
     793             : bool
     794     2345342 : PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
     795             : {
     796             :     uint32      buf_state;
     797     2345342 :     Buffer      buffer = BufferDescriptorGetBuffer(buf_hdr);
     798     2345342 :     int         bufid = -buffer - 1;
     799             : 
     800     2345342 :     buf_state = pg_atomic_read_u32(&buf_hdr->state);
     801             : 
     802     2345342 :     if (LocalRefCount[bufid] == 0)
     803             :     {
     804     2173396 :         NLocalPinnedBuffers++;
     805     2173396 :         buf_state += BUF_REFCOUNT_ONE;
     806     2173396 :         if (adjust_usagecount &&
     807     2130566 :             BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
     808             :         {
     809      116390 :             buf_state += BUF_USAGECOUNT_ONE;
     810             :         }
     811     2173396 :         pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     812             :     }
     813     2345342 :     LocalRefCount[bufid]++;
     814     2345342 :     ResourceOwnerRememberBuffer(CurrentResourceOwner,
     815             :                                 BufferDescriptorGetBuffer(buf_hdr));
     816             : 
     817     2345342 :     return buf_state & BM_VALID;
     818             : }
     819             : 
     820             : void
     821     3048254 : UnpinLocalBuffer(Buffer buffer)
     822             : {
     823     3048254 :     UnpinLocalBufferNoOwner(buffer);
     824     3048254 :     ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
     825     3048254 : }
     826             : 
     827             : void
     828     3054204 : UnpinLocalBufferNoOwner(Buffer buffer)
     829             : {
     830     3054204 :     int         buffid = -buffer - 1;
     831             : 
     832             :     Assert(BufferIsLocal(buffer));
     833             :     Assert(LocalRefCount[buffid] > 0);
     834             :     Assert(NLocalPinnedBuffers > 0);
     835             : 
     836     3054204 :     if (--LocalRefCount[buffid] == 0)
     837             :     {
     838     2173396 :         BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
     839             :         uint32      buf_state;
     840             : 
     841     2173396 :         NLocalPinnedBuffers--;
     842             : 
     843     2173396 :         buf_state = pg_atomic_read_u32(&buf_hdr->state);
     844             :         Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
     845     2173396 :         buf_state -= BUF_REFCOUNT_ONE;
     846     2173396 :         pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     847             :     }
     848     3054204 : }
     849             : 
     850             : /*
     851             :  * GUC check_hook for temp_buffers
     852             :  */
     853             : bool
     854        2112 : check_temp_buffers(int *newval, void **extra, GucSource source)
     855             : {
     856             :     /*
     857             :      * Once local buffers have been initialized, it's too late to change this.
     858             :      * However, if this is only a test call, allow it.
     859             :      */
     860        2112 :     if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
     861             :     {
     862           0 :         GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
     863           0 :         return false;
     864             :     }
     865        2112 :     return true;
     866             : }
     867             : 
     868             : /*
     869             :  * GetLocalBufferStorage - allocate memory for a local buffer
     870             :  *
     871             :  * The idea of this function is to aggregate our requests for storage
     872             :  * so that the memory manager doesn't see a whole lot of relatively small
     873             :  * requests.  Since we'll never give back a local buffer once it's created
     874             :  * within a particular process, no point in burdening memmgr with separately
     875             :  * managed chunks.
     876             :  */
     877             : static Block
     878       30014 : GetLocalBufferStorage(void)
     879             : {
     880             :     static char *cur_block = NULL;
     881             :     static int  next_buf_in_block = 0;
     882             :     static int  num_bufs_in_block = 0;
     883             :     static int  total_bufs_allocated = 0;
     884             :     static MemoryContext LocalBufferContext = NULL;
     885             : 
     886             :     char       *this_buf;
     887             : 
     888             :     Assert(total_bufs_allocated < NLocBuffer);
     889             : 
     890       30014 :     if (next_buf_in_block >= num_bufs_in_block)
     891             :     {
     892             :         /* Need to make a new request to memmgr */
     893             :         int         num_bufs;
     894             : 
     895             :         /*
     896             :          * We allocate local buffers in a context of their own, so that the
     897             :          * space eaten for them is easily recognizable in MemoryContextStats
     898             :          * output.  Create the context on first use.
     899             :          */
     900         798 :         if (LocalBufferContext == NULL)
     901         508 :             LocalBufferContext =
     902         508 :                 AllocSetContextCreate(TopMemoryContext,
     903             :                                       "LocalBufferContext",
     904             :                                       ALLOCSET_DEFAULT_SIZES);
     905             : 
     906             :         /* Start with a 16-buffer request; subsequent ones double each time */
     907         798 :         num_bufs = Max(num_bufs_in_block * 2, 16);
     908             :         /* But not more than what we need for all remaining local bufs */
     909         798 :         num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
     910             :         /* And don't overflow MaxAllocSize, either */
     911         798 :         num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
     912             : 
     913             :         /* Buffers should be I/O aligned. */
     914         798 :         cur_block = (char *)
     915         798 :             TYPEALIGN(PG_IO_ALIGN_SIZE,
     916             :                       MemoryContextAlloc(LocalBufferContext,
     917             :                                          num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE));
     918         798 :         next_buf_in_block = 0;
     919         798 :         num_bufs_in_block = num_bufs;
     920             :     }
     921             : 
     922             :     /* Allocate next buffer in current memory block */
     923       30014 :     this_buf = cur_block + next_buf_in_block * BLCKSZ;
     924       30014 :     next_buf_in_block++;
     925       30014 :     total_bufs_allocated++;
     926             : 
     927       30014 :     return (Block) this_buf;
     928             : }
     929             : 
     930             : /*
     931             :  * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
     932             :  *
     933             :  * This is just like CheckForBufferLeaks(), but for local buffers.
     934             :  */
     935             : static void
     936      861480 : CheckForLocalBufferLeaks(void)
     937             : {
     938             : #ifdef USE_ASSERT_CHECKING
     939             :     if (LocalRefCount)
     940             :     {
     941             :         int         RefCountErrors = 0;
     942             :         int         i;
     943             : 
     944             :         for (i = 0; i < NLocBuffer; i++)
     945             :         {
     946             :             if (LocalRefCount[i] != 0)
     947             :             {
     948             :                 Buffer      b = -i - 1;
     949             :                 char       *s;
     950             : 
     951             :                 s = DebugPrintBufferRefcount(b);
     952             :                 elog(WARNING, "local buffer refcount leak: %s", s);
     953             :                 pfree(s);
     954             : 
     955             :                 RefCountErrors++;
     956             :             }
     957             :         }
     958             :         Assert(RefCountErrors == 0);
     959             :     }
     960             : #endif
     961      861480 : }
     962             : 
     963             : /*
     964             :  * AtEOXact_LocalBuffers - clean up at end of transaction.
     965             :  *
     966             :  * This is just like AtEOXact_Buffers, but for local buffers.
     967             :  */
     968             : void
     969      819202 : AtEOXact_LocalBuffers(bool isCommit)
     970             : {
     971      819202 :     CheckForLocalBufferLeaks();
     972      819202 : }
     973             : 
     974             : /*
     975             :  * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
     976             :  *
     977             :  * This is just like AtProcExit_Buffers, but for local buffers.
     978             :  */
     979             : void
     980       42278 : AtProcExit_LocalBuffers(void)
     981             : {
     982             :     /*
     983             :      * We shouldn't be holding any remaining pins; if we are, and assertions
     984             :      * aren't enabled, we'll fail later in DropRelationBuffers while trying to
     985             :      * drop the temp rels.
     986             :      */
     987       42278 :     CheckForLocalBufferLeaks();
     988       42278 : }

Generated by: LCOV version 1.14