LCOV - code coverage report
Current view: top level - src/backend/storage/buffer - localbuf.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 89.8 % 285 256
Test Date: 2026-05-01 20:16:42 Functions: 100.0 % 23 23
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * localbuf.c
       4              :  *    local buffer manager. Fast buffer manager for temporary tables,
       5              :  *    which never need to be WAL-logged or checkpointed, etc.
       6              :  *
       7              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       8              :  * Portions Copyright (c) 1994-5, Regents of the University of California
       9              :  *
      10              :  *
      11              :  * IDENTIFICATION
      12              :  *    src/backend/storage/buffer/localbuf.c
      13              :  *
      14              :  *-------------------------------------------------------------------------
      15              :  */
      16              : #include "postgres.h"
      17              : 
      18              : #include "access/parallel.h"
      19              : #include "executor/instrument.h"
      20              : #include "pgstat.h"
      21              : #include "storage/aio.h"
      22              : #include "storage/buf_internals.h"
      23              : #include "storage/bufmgr.h"
      24              : #include "storage/fd.h"
      25              : #include "utils/guc_hooks.h"
      26              : #include "utils/memdebug.h"
      27              : #include "utils/memutils.h"
      28              : #include "utils/rel.h"
      29              : #include "utils/resowner.h"
      30              : 
      31              : 
      32              : /*#define LBDEBUG*/
      33              : 
      34              : /*
                      :  * Entry in the local buffer lookup hashtable (LocalBufHash).
                      :  *
                      :  * Maps the tag of a disk page to the index of the local buffer holding it.
                      :  * "id" is an index as accepted by GetLocalBufferDescriptor(); the matching
                      :  * Buffer number is -id - 1.
                      :  */
      35              : typedef struct
      36              : {
      37              :     BufferTag   key;            /* Tag of a disk page */
      38              :     int         id;             /* Associated local buffer's index */
      39              : } LocalBufferLookupEnt;
      40              : 
      41              : /*
                      :  * LocalBufHdrGetBlock -
                      :  *    the LocalBufferBlockPointers slot belonging to a local buffer header
                      :  *
                      :  * The slot index is computed as -(buf_id + 2), so the header's buf_id must
                      :  * be <= -2 for the index to be valid.  The expansion is an lvalue:
                      :  * GetLocalVictimBuffer() both compares it against NULL and assigns newly
                      :  * obtained storage through it.
                      :  *
                      :  * Note: this macro only works on local buffers, not shared ones!
                      :  */
      42              : #define LocalBufHdrGetBlock(bufHdr) \
      43              :     LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
      44              : 
      45              : int         NLocBuffer = 0;     /* until buffers are initialized */
      46              : 
      47              : BufferDesc *LocalBufferDescriptors = NULL;
      48              : Block      *LocalBufferBlockPointers = NULL;    /* an entry is NULL until that buffer is first used */
      49              : int32      *LocalRefCount = NULL;   /* per-buffer reference count, indexed by buffer index */
      50              : 
      51              : static int  nextFreeLocalBufId = 0; /* clock-sweep position used by GetLocalVictimBuffer() */
      52              : 
      53              : static HTAB *LocalBufHash = NULL;   /* tag lookup table; NULL until local buffers are initialized */
      54              : 
      55              : /* number of local buffers pinned at least once */
      56              : static int  NLocalPinnedBuffers = 0;
      57              : 
      58              : 
      59              : static void InitLocalBuffers(void);     /* called on first use, while LocalBufHash is NULL */
      60              : static Block GetLocalBufferStorage(void);   /* provides the storage for one buffer's block */
      61              : static Buffer GetLocalVictimBuffer(void);   /* picks and pins a buffer to reuse */
      62              : 
      63              : 
      64              : /*
      65              :  * PrefetchLocalBuffer -
      66              :  *    initiate asynchronous read of a block of a relation
      67              :  *
      68              :  * Do PrefetchBuffer's work for temporary relations.
      69              :  * No-op if prefetching isn't compiled in.
                      :  *
                      :  * The result starts out as {InvalidBuffer, false}.  If the block is found in
                      :  * the lookup hashtable, recent_buffer is set to its Buffer number and no I/O
                      :  * is requested; the buffer is not pinned here.  Otherwise, when USE_PREFETCH
                      :  * is defined, IO_DIRECT_DATA is not set in io_direct_flags and
                      :  * smgrprefetch() returns true, initiated_io is set.
                      :  *
                      :  * The hashtable lookup, and the initialization of local buffers on first
                      :  * use, are performed whether or not USE_PREFETCH is defined.
      70              :  */
      71              : PrefetchBufferResult
      72         1357 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
      73              :                     BlockNumber blockNum)
      74              : {
      75         1357 :     PrefetchBufferResult result = {InvalidBuffer, false};
      76              :     BufferTag   newTag;         /* identity of requested block */
      77              :     LocalBufferLookupEnt *hresult;
      78              : 
      79         1357 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
      80              : 
      81              :     /* Initialize local buffers if first request in this session */
      82         1357 :     if (LocalBufHash == NULL)
      83            0 :         InitLocalBuffers();
      84              : 
      85              :     /* See if the desired buffer already exists */
      86              :     hresult = (LocalBufferLookupEnt *)
      87         1357 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
      88              : 
      89         1357 :     if (hresult)
      90              :     {
      91              :         /* Yes, so nothing to prefetch; report its Buffer number (-id - 1) */
      92         1141 :         result.recent_buffer = -hresult->id - 1;
      93              :     }
      94              :     else
      95              :     {
      96              : #ifdef USE_PREFETCH
      97              :         /* Not in buffers, so initiate prefetch (skipped with direct I/O) */
      98          432 :         if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
      99          216 :             smgrprefetch(smgr, forkNum, blockNum, 1))
     100              :         {
     101          216 :             result.initiated_io = true;
     102              :         }
     103              : #endif                          /* USE_PREFETCH */
     104              :     }
     105              : 
     106         1357 :     return result;
     107              : }
     108              : 
     109              : 
     110              : /*
     111              :  * LocalBufferAlloc -
     112              :  *    Find or create a local buffer for the given page of the given relation.
     113              :  *
     114              :  * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
     115              :  * any locking since this is all local.  We support only default access
     116              :  * strategy (hence, usage_count is always advanced).
                      :  *
                      :  * The returned buffer header is pinned in both cases.  If the block is
                      :  * already in the lookup hashtable, *foundPtr is whatever PinLocalBuffer()
                      :  * returns for it.  Otherwise a victim buffer is obtained from
                      :  * GetLocalVictimBuffer(), entered into the hashtable under the new tag and
                      :  * given that tag; the bits covered by BUF_FLAG_MASK and BUF_USAGECOUNT_MASK
                      :  * are reset to just BM_TAG_VALID with a usage count of one, and *foundPtr is
                      :  * set to false.
                      :  *
                      :  * An ERROR raised by GetLocalVictimBuffer() propagates to the caller.
     117              :  */
     118              : BufferDesc *
     119      1648475 : LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
     120              :                  bool *foundPtr)
     121              : {
     122              :     BufferTag   newTag;         /* identity of requested block */
     123              :     LocalBufferLookupEnt *hresult;
     124              :     BufferDesc *bufHdr;
     125              :     Buffer      victim_buffer;
     126              :     int         bufid;
     127              :     bool        found;
     128              : 
     129      1648475 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
     130              : 
     131              :     /* Initialize local buffers if first request in this session */
     132      1648475 :     if (LocalBufHash == NULL)
     133           17 :         InitLocalBuffers();
     134              : 
     135      1648475 :     ResourceOwnerEnlarge(CurrentResourceOwner);
     136              : 
     137              :     /* See if the desired buffer already exists */
     138              :     hresult = (LocalBufferLookupEnt *)
     139      1648475 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
     140              : 
     141      1648475 :     if (hresult)
     142              :     {
     143      1637442 :         bufid = hresult->id;
     144      1637442 :         bufHdr = GetLocalBufferDescriptor(bufid);
     145              :         Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
     146              : 
     147      1637442 :         *foundPtr = PinLocalBuffer(bufHdr, true);
     148              :     }
     149              :     else
     150              :     {
     151              :         uint64      buf_state;
     152              : 
     153        11033 :         victim_buffer = GetLocalVictimBuffer();
     154        11025 :         bufid = -victim_buffer - 1; /* Buffer number -> buffer index */
     155        11025 :         bufHdr = GetLocalBufferDescriptor(bufid);
     156              : 
     157              :         hresult = (LocalBufferLookupEnt *)
     158        11025 :             hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
     159        11025 :         if (found)              /* shouldn't happen */
     160            0 :             elog(ERROR, "local buffer hash table corrupted");
     161        11025 :         hresult->id = bufid;
     162              : 
     163              :         /*
     164              :          * it's all ours now.
     165              :          */
     166        11025 :         bufHdr->tag = newTag;
     167              : 
     168        11025 :         buf_state = pg_atomic_read_u64(&bufHdr->state);
     169        11025 :         buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
     170        11025 :         buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     171        11025 :         pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     172              : 
     173        11025 :         *foundPtr = false;
     174              :     }
     175              : 
     176      1648467 :     return bufHdr;
     177              : }
     178              : 
     179              : /*
     180              :  * Like FlushBuffer(), just for local buffers.
                      :  *
                      :  * Writes the page of bufHdr out via smgrwrite() and then clears BM_DIRTY
                      :  * through TerminateLocalBufferIO().  The page checksum is set just before
                      :  * the write.
                      :  *
                      :  * The buffer must have a nonzero LocalRefCount (asserted).  It is also
                      :  * expected to be dirty: StartLocalBufferIO() reports BUFFER_IO_ALREADY_DONE
                      :  * for a write on a buffer without BM_DIRTY, and any result other than
                      :  * BUFFER_IO_READY_FOR_IO is turned into an ERROR here.
                      :  *
                      :  * reln may be NULL, in which case the smgr relation is opened from the
                      :  * buffer's tag.
                      :  *
                      :  * The write is counted in the IO statistics (IOOBJECT_TEMP_RELATION,
                      :  * IOOP_WRITE) and in pgBufferUsage.local_blks_written.
     181              :  */
     182              : void
     183         4430 : FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
     184              : {
     185              :     instr_time  io_start;
     186         4430 :     Page        localpage = (char *) LocalBufHdrGetBlock(bufHdr);
     187              : 
     188              :     Assert(LocalRefCount[-BufferDescriptorGetBuffer(bufHdr) - 1] > 0);
     189              : 
     190              :     /*
     191              :      * Try to start an I/O operation.  There currently are no reasons for
     192              :      * StartLocalBufferIO to return anything other than
     193              :      * BUFFER_IO_READY_FOR_IO, so we raise an error in that case.
     194              :      */
     195         4430 :     if (StartLocalBufferIO(bufHdr, false, true, NULL) != BUFFER_IO_READY_FOR_IO)
     196            0 :         elog(ERROR, "failed to start write IO on local buffer");
     197              : 
     198              :     /* Find smgr relation for buffer */
     199         4430 :     if (reln == NULL)
     200         4038 :         reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
     201              :                         MyProcNumber);
     202              : 
     203         4430 :     PageSetChecksum(localpage, bufHdr->tag.blockNum);
     204              : 
     205         4430 :     io_start = pgstat_prepare_io_time(track_io_timing);
     206              : 
     207              :     /* And write... */
     208         4430 :     smgrwrite(reln,
     209         4430 :               BufTagGetForkNum(&bufHdr->tag),
     210              :               bufHdr->tag.blockNum,
     211              :               localpage,
     212              :               false);
     213              : 
     214              :     /* Temporary table I/O does not use Buffer Access Strategies */
     215         4430 :     pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
     216              :                             IOOP_WRITE, io_start, 1, BLCKSZ);
     217              : 
     218              :     /* Mark not-dirty (clear_dirty = true, no extra flags, no AIO pin to release) */
     219         4430 :     TerminateLocalBufferIO(bufHdr, true, 0, false);
     220              : 
     221         4430 :     pgBufferUsage.local_blks_written++;
     222         4430 : }
     223              : /*
                      :  * GetLocalVictimBuffer -
                      :  *    choose a local buffer to reuse and return its Buffer number
                      :  *
                      :  * Buffers are examined in clock-sweep order starting at nextFreeLocalBufId.
                      :  * A buffer whose LocalRefCount is zero has its usage count decremented if
                      :  * that is nonzero; it is taken as the victim once both its usage count and
                      :  * the refcount in its state word are zero.
                      :  *
                      :  * On return the victim is pinned, has block storage assigned, has been
                      :  * written out if it was dirty, and has been passed to
                      :  * InvalidateLocalBuffer() if its tag was valid (which is also counted as an
                      :  * IOOP_EVICT).
                      :  *
                      :  * Raises ERROR "no empty local buffer available" once NLocBuffer buffers
                      :  * with a nonzero LocalRefCount have been seen since the sweep started or
                      :  * last decremented a usage count.
                      :  */
     224              : static Buffer
     225        30327 : GetLocalVictimBuffer(void)
     226              : {
     227              :     int         victim_bufid;
     228              :     int         trycounter;
     229              :     BufferDesc *bufHdr;
     230              : 
     231        30327 :     ResourceOwnerEnlarge(CurrentResourceOwner);
     232              : 
     233              :     /*
     234              :      * Need to get a new buffer.  We use a clock-sweep algorithm (essentially
     235              :      * the same as what freelist.c does now...)
     236              :      */
     237        30327 :     trycounter = NLocBuffer;
     238              :     for (;;)
     239              :     {
     240       167063 :         victim_bufid = nextFreeLocalBufId;
     241              : 
     242       167063 :         if (++nextFreeLocalBufId >= NLocBuffer)
     243         1448 :             nextFreeLocalBufId = 0;
     244              : 
     245       167063 :         bufHdr = GetLocalBufferDescriptor(victim_bufid);
     246              : 
     247       167063 :         if (LocalRefCount[victim_bufid] == 0)
     248              :         {
     249        54091 :             uint64      buf_state = pg_atomic_read_u64(&bufHdr->state);
     250              : 
     251        54091 :             if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
     252              :             {
     253        23772 :                 buf_state -= BUF_USAGECOUNT_ONE;
     254        23772 :                 pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     255        23772 :                 trycounter = NLocBuffer;
     256              :             }
     257        30319 :             else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
     258              :             {
     259              :                 /*
     260              :                  * This can be reached if the backend initiated AIO for this
     261              :                  * buffer and then errored out.
                      :                  *
                      :                  * The buffer is simply skipped.  NOTE(review): trycounter is
                      :                  * neither reset nor decremented on this path -- confirm the
                      :                  * sweep cannot keep revisiting only such buffers.
     262              :                  */
     263              :             }
     264              :             else
     265              :             {
     266              :                 /* Found a usable buffer */
     267        30319 :                 PinLocalBuffer(bufHdr, false);
     268        30319 :                 break;
     269              :             }
     270              :         }
     271       112972 :         else if (--trycounter == 0)
     272            8 :             ereport(ERROR,
     273              :                     (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     274              :                      errmsg("no empty local buffer available")));
     275              :     }
     276              : 
     277              :     /*
     278              :      * lazy memory allocation: allocate space on first use of a buffer.
     279              :      */
     280        30319 :     if (LocalBufHdrGetBlock(bufHdr) == NULL)
     281              :     {
     282              :         /* Set pointer for use by BufferGetBlock() macro */
     283        20951 :         LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
     284              :     }
     285              : 
     286              :     /*
     287              :      * this buffer is not referenced but it might still be dirty. if that's
     288              :      * the case, write it out before reusing it!
     289              :      */
     290        30319 :     if (pg_atomic_read_u64(&bufHdr->state) & BM_DIRTY)
     291         3964 :         FlushLocalBuffer(bufHdr, NULL);
     292              : 
     293              :     /*
     294              :      * Remove the victim buffer from the hashtable and mark as invalid.
     295              :      */
     296        30319 :     if (pg_atomic_read_u64(&bufHdr->state) & BM_TAG_VALID)
     297              :     {
     298         8100 :         InvalidateLocalBuffer(bufHdr, false);
     299              : 
     300         8100 :         pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT, 1, 0);
     301              :     }
     302              : 
     303        30319 :     return BufferDescriptorGetBuffer(bufHdr);
     304              : }
     305              : 
     306              : /*
                      :  * GetLocalPinLimit -
                      :  *    limit on the number of local buffers to pin; see GetPinLimit()
                      :  *
                      :  * Returns num_temp_buffers / 4, using integer division.
                      :  */
     307              : uint32
     308        42041 : GetLocalPinLimit(void)
     309              : {
     310              :     /*
     311              :      * Every backend has its own temporary buffers, but we leave headroom for
     312              :      * concurrent pin-holders -- like multiple scans in the same query.
     313              :      */
     314        42041 :     return num_temp_buffers / 4;
     315              : }
     316              : 
     317              : /*
                      :  * GetAdditionalLocalPinLimit -
                      :  *    how many more local buffers may be pinned; see GetAdditionalPinLimit()
                      :  *
                      :  * Returns GetLocalPinLimit() - NLocalPinnedBuffers, or 0 if
                      :  * NLocalPinnedBuffers has already reached that limit (which keeps the
                      :  * unsigned subtraction from wrapping around).
                      :  */
     318              : uint32
     319        32758 : GetAdditionalLocalPinLimit(void)
     320              : {
     321        32758 :     uint32      total = GetLocalPinLimit();
     322              : 
     323              :     Assert(NLocalPinnedBuffers <= num_temp_buffers);
     324              : 
     325        32758 :     if (NLocalPinnedBuffers >= total)
     326         5216 :         return 0;
     327        27542 :     return total - NLocalPinnedBuffers;
     328              : }
     329              : 
     330              : /*
                      :  * LimitAdditionalLocalPins -
                      :  *    clamp a request for additional local buffer pins; see
                      :  *    LimitAdditionalPins()
                      :  *
                      :  * *additional_pins is updated in place.  Requests for 0 or 1 pins are never
                      :  * reduced.  Larger requests are capped at
                      :  * num_temp_buffers - NLocalPinnedBuffers; note that this bound is based on
                      :  * num_temp_buffers itself, not on the value of GetLocalPinLimit().
                      :  */
     331              : void
     332        14925 : LimitAdditionalLocalPins(uint32 *additional_pins)
     333              : {
     334              :     uint32      max_pins;
     335              : 
     336        14925 :     if (*additional_pins <= 1)
     337        14490 :         return;
     338              : 
     339              :     /*
     340              :      * In contrast to LimitAdditionalPins() other backends don't play a role
     341              :      * here. We can allow up to NLocBuffer pins in total, but it might not be
     342              :      * initialized yet so read num_temp_buffers.
     343              :      */
     344          435 :     max_pins = (num_temp_buffers - NLocalPinnedBuffers);
     345              : 
     346          435 :     if (*additional_pins >= max_pins)
     347            0 :         *additional_pins = max_pins;
     348              : }
     349              : 
     350              : /*
     351              :  * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
     352              :  * temporary buffers.
                      :  *
                      :  * bmr/fork identify the relation fork to extend.  extend_by is the number of
                      :  * blocks requested; it may be reduced by LimitAdditionalLocalPins().  If
                      :  * extend_upto is not InvalidBlockNumber it is only checked by assertions.
                      :  * flags is not used in this function.
                      :  *
                      :  * The steps are: obtain extend_by pinned victim buffers and zero their
                      :  * blocks; read the current relation size with smgrnblocks(); enter a tag
                      :  * for each new block into the lookup hashtable; extend the relation with
                      :  * smgrzeroextend(); finally set BM_VALID on every buffer.
                      :  *
                      :  * buffers[] receives one Buffer per added block and *extended_by the number
                      :  * of blocks added.  The return value is the block number of the first new
                      :  * block, i.e. the relation size before extending.
                      :  *
                      :  * Raises ERROR if first_block + extend_by >= MaxBlockNumber.
     353              :  */
     354              : BlockNumber
     355        14925 : ExtendBufferedRelLocal(BufferManagerRelation bmr,
     356              :                        ForkNumber fork,
     357              :                        uint32 flags,
     358              :                        uint32 extend_by,
     359              :                        BlockNumber extend_upto,
     360              :                        Buffer *buffers,
     361              :                        uint32 *extended_by)
     362              : {
     363              :     BlockNumber first_block;
     364              :     instr_time  io_start;
     365              : 
     366              :     /* Initialize local buffers if first request in this session */
     367        14925 :     if (LocalBufHash == NULL)
     368          337 :         InitLocalBuffers();
     369              : 
     370        14925 :     LimitAdditionalLocalPins(&extend_by);
     371              : 
     372        34219 :     for (uint32 i = 0; i < extend_by; i++)
     373              :     {
     374              :         BufferDesc *buf_hdr;
     375              :         Block       buf_block;
     376              : 
     377        19294 :         buffers[i] = GetLocalVictimBuffer();
     378        19294 :         buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
     379        19294 :         buf_block = LocalBufHdrGetBlock(buf_hdr);
     380              : 
     381              :         /* new buffers are zero-filled */
     382        19294 :         MemSet(buf_block, 0, BLCKSZ);
     383              :     }
     384              : 
     385        14925 :     first_block = smgrnblocks(BMR_GET_SMGR(bmr), fork);
     386              : 
     387              :     if (extend_upto != InvalidBlockNumber)
     388              :     {
     389              :         /*
     390              :          * In contrast to shared relations, nothing could change the relation
     391              :          * size concurrently. Thus we shouldn't end up finding that we don't
     392              :          * need to do anything.
     393              :          */
     394              :         Assert(first_block <= extend_upto);
     395              : 
     396              :         Assert((uint64) first_block + extend_by <= extend_upto);
     397              :     }
     398              : 
     399              :     /* Fail if relation is already at maximum possible length */
     400        14925 :     if ((uint64) first_block + extend_by >= MaxBlockNumber)
     401            0 :         ereport(ERROR,
     402              :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     403              :                  errmsg("cannot extend relation %s beyond %u blocks",
     404              :                         relpath(BMR_GET_SMGR(bmr)->smgr_rlocator, fork).str,
     405              :                         MaxBlockNumber)));
     406              : 
     407        34219 :     for (uint32 i = 0; i < extend_by; i++)
     408              :     {
     409              :         int         victim_buf_id;
     410              :         BufferDesc *victim_buf_hdr;
     411              :         BufferTag   tag;
     412              :         LocalBufferLookupEnt *hresult;
     413              :         bool        found;
     414              : 
     415        19294 :         victim_buf_id = -buffers[i] - 1;
     416        19294 :         victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
     417              : 
     418              :         /* in case we need to pin an existing buffer below */
     419        19294 :         ResourceOwnerEnlarge(CurrentResourceOwner);
     420              : 
     421        19294 :         InitBufferTag(&tag, &BMR_GET_SMGR(bmr)->smgr_rlocator.locator, fork,
     422              :                       first_block + i);
     423              : 
     424              :         hresult = (LocalBufferLookupEnt *)
     425        19294 :             hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
     426        19294 :         if (found)              /* tag already present: use that buffer, drop the victim */
     427              :         {
     428              :             BufferDesc *existing_hdr;
     429              :             uint64      buf_state;
     430              : 
     431            0 :             UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
     432              : 
     433            0 :             existing_hdr = GetLocalBufferDescriptor(hresult->id);
     434            0 :             PinLocalBuffer(existing_hdr, false);
     435            0 :             buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
     436              : 
     437              :             /*
     438              :              * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
     439              :              */
     440            0 :             buf_state = pg_atomic_read_u64(&existing_hdr->state);
     441              :             Assert(buf_state & BM_TAG_VALID);
     442              :             Assert(!(buf_state & BM_DIRTY));
     443            0 :             buf_state &= ~BM_VALID;
     444            0 :             pg_atomic_unlocked_write_u64(&existing_hdr->state, buf_state);
     445              : 
     446              :             /* no need to loop for local buffers */
     447            0 :             StartLocalBufferIO(existing_hdr, true, true, NULL);
     448              :         }
     449              :         else
     450              :         {
     451        19294 :             uint64      buf_state = pg_atomic_read_u64(&victim_buf_hdr->state);
     452              : 
     453              :             Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY)));
     454              : 
     455        19294 :             victim_buf_hdr->tag = tag;
     456              : 
     457        19294 :             buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     458              : 
     459        19294 :             pg_atomic_unlocked_write_u64(&victim_buf_hdr->state, buf_state);
     460              : 
     461        19294 :             hresult->id = victim_buf_id;
     462              : 
     463        19294 :             StartLocalBufferIO(victim_buf_hdr, true, true, NULL);
     464              :         }
     465              :     }
     466              : 
     467        14925 :     io_start = pgstat_prepare_io_time(track_io_timing);
     468              : 
     469              :     /* actually extend relation */
     470        14925 :     smgrzeroextend(BMR_GET_SMGR(bmr), fork, first_block, extend_by, false);
     471              : 
     472        14925 :     pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
     473        14925 :                             io_start, 1, extend_by * BLCKSZ);
     474              : 
     475        34219 :     for (uint32 i = 0; i < extend_by; i++)
     476              :     {
     477        19294 :         Buffer      buf = buffers[i];
     478              :         BufferDesc *buf_hdr;
     479              :         uint64      buf_state;
     480              : 
     481        19294 :         buf_hdr = GetLocalBufferDescriptor(-buf - 1);
     482              : 
     483        19294 :         buf_state = pg_atomic_read_u64(&buf_hdr->state);
     484        19294 :         buf_state |= BM_VALID;
     485        19294 :         pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
     486              :     }
     487              : 
     488        14925 :     *extended_by = extend_by;
     489              : 
     490        14925 :     pgBufferUsage.local_blks_written += extend_by;
     491              : 
     492        14925 :     return first_block;
     493              : }
     494              : 
     495              : /*
     496              :  * MarkLocalBufferDirty -
     497              :  *    mark a local buffer dirty
                      :  *
                      :  * buffer must be a local buffer with a nonzero LocalRefCount (both are
                      :  * asserted).  BM_DIRTY is set unconditionally;
                      :  * pgBufferUsage.local_blks_dirtied is incremented only if the flag was not
                      :  * already set.
     498              :  */
     499              : void
     500      2405346 : MarkLocalBufferDirty(Buffer buffer)
     501              : {
     502              :     int         bufid;
     503              :     BufferDesc *bufHdr;
     504              :     uint64      buf_state;
     505              : 
     506              :     Assert(BufferIsLocal(buffer));
     507              : 
     508              : #ifdef LBDEBUG
     509              :     fprintf(stderr, "LB DIRTY %d\n", buffer);
     510              : #endif
     511              : 
     512      2405346 :     bufid = -buffer - 1;    /* Buffer number -> buffer index */
     513              : 
     514              :     Assert(LocalRefCount[bufid] > 0);
     515              : 
     516      2405346 :     bufHdr = GetLocalBufferDescriptor(bufid);
     517              : 
     518      2405346 :     buf_state = pg_atomic_read_u64(&bufHdr->state);
     519              : 
     520      2405346 :     if (!(buf_state & BM_DIRTY))
     521        19168 :         pgBufferUsage.local_blks_dirtied++;
     522              : 
     523      2405346 :     buf_state |= BM_DIRTY;
     524              : 
     525      2405346 :     pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     526      2405346 : }
     527              : 
     528              : /*
     529              :  * Like StartSharedBufferIO, but for local buffers
                      :  *
                      :  * forInput selects the "already done" test: BM_VALID set for input,
                      :  * BM_DIRTY clear for output.
                      :  *
                      :  * If bufHdr->io_wref is valid, i.e. asynchronous IO is in progress:
                      :  *  - if the io_wref argument is not NULL, the buffer's wait reference is
                      :  *    copied to *io_wref and BUFFER_IO_IN_PROGRESS is returned;
                      :  *  - otherwise, if wait is false, BUFFER_IO_IN_PROGRESS is returned;
                      :  *  - otherwise the IO is waited for and processing continues below.
                      :  *
                      :  * Returns BUFFER_IO_ALREADY_DONE if the "already done" test holds, else
                      :  * BUFFER_IO_READY_FOR_IO.  This function itself writes nothing to the
                      :  * buffer header.
     530              :  */
     531              : StartBufferIOResult
     532        34819 : StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool wait, PgAioWaitRef *io_wref)
     533              : {
     534              :     uint64      buf_state;
     535              : 
     536              :     /*
     537              :      * With AIO the buffer could have IO in progress, e.g. when there are two
     538              :      * scans of the same relation.  Either wait for the other IO (if wait =
     539              :      * true and io_wref == NULL) or return BUFFER_IO_IN_PROGRESS.
     540              :      */
     541        34819 :     if (pgaio_wref_valid(&bufHdr->io_wref))
     542              :     {
     543            0 :         PgAioWaitRef buf_wref = bufHdr->io_wref;
     544              : 
     545            0 :         if (io_wref != NULL)
     546              :         {
     547              :             /* We've already asynchronously started this IO, so join it */
     548            0 :             *io_wref = buf_wref;
     549            0 :             return BUFFER_IO_IN_PROGRESS;
     550              :         }
     551              : 
     552              :         /*
     553              :          * For temp buffers we should never need to wait in
     554              :          * StartLocalBufferIO() when called with io_wref == NULL while there
     555              :          * are staged IOs, as it's not allowed to call code that is not aware
     556              :          * of AIO while in batch mode.
     557              :          */
     558              :         Assert(!pgaio_have_staged());
     559              : 
     560            0 :         if (!wait)
     561            0 :             return BUFFER_IO_IN_PROGRESS;
     562              : 
     563            0 :         pgaio_wref_wait(&buf_wref);
     564              :     }
     565              : 
     566              :     /* Once we get here, there is definitely no I/O active on this buffer */
     567              : 
     568              :     /* Check if someone else already did the I/O */
     569        34819 :     buf_state = pg_atomic_read_u64(&bufHdr->state);
     570        34819 :     if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
     571              :     {
     572            4 :         return BUFFER_IO_ALREADY_DONE;
     573              :     }
     574              : 
     575              :     /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
     576              : 
     577              :     /* local buffers don't track IO using resowners */
     578              : 
     579        34815 :     return BUFFER_IO_READY_FOR_IO;
     580              : }
     581              : 
     582              : /*
     583              :  * Like TerminateBufferIO, but for local buffers
                      :  *
                      :  * Updates the buffer's state word in a single read-modify-write:
                      :  *  - BM_IO_ERROR is always cleared;
                      :  *  - BM_DIRTY is cleared if clear_dirty is true;
                      :  *  - if release_aio is true, the refcount in the state word is decremented
                      :  *    by one (the pin held by the IO subsystem) and bufHdr->io_wref is
                      :  *    cleared;
                      :  *  - set_flag_bits is OR'ed in last, so it can set again a bit that was
                      :  *    cleared above.
     584              :  */
     585              : void
     586        15519 : TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint64 set_flag_bits,
     587              :                        bool release_aio)
     588              : {
     589              :     /* Only need to adjust flags */
     590        15519 :     uint64      buf_state = pg_atomic_read_u64(&bufHdr->state);
     591              : 
     592              :     /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
     593              : 
     594              :     /* Clear earlier errors, if this IO failed, it'll be marked again */
     595        15519 :     buf_state &= ~BM_IO_ERROR;
     596              : 
     597        15519 :     if (clear_dirty)
     598         4430 :         buf_state &= ~BM_DIRTY;
     599              : 
     600        15519 :     if (release_aio)
     601              :     {
     602              :         /* release pin held by IO subsystem, see also buffer_stage_common() */
     603              :         Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
     604        11055 :         buf_state -= BUF_REFCOUNT_ONE;
     605        11055 :         pgaio_wref_clear(&bufHdr->io_wref);
     606              :     }
     607              : 
     608        15519 :     buf_state |= set_flag_bits;
     609        15519 :     pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     610              : 
     611              :     /* local buffers don't track IO using resowners */
     612              : 
     613              :     /* local buffers don't use the IO CV, as no other process can see buffer */
     614              : 
     615              :     /* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
     616        15519 : }
     617              : 
     618              : /*
     619              :  * InvalidateLocalBuffer -- mark a local buffer invalid.
     620              :  *
     621              :  * If check_unreferenced is true, error out if the buffer is still
     622              :  * pinned. Passing false is appropriate when calling InvalidateLocalBuffer()
     623              :  * as part of changing the identity of a buffer, instead of just dropping the
     624              :  * buffer.
     625              :  *
                         * Steps, in order: wait for any IO still referenced by bufHdr->io_wref,
                         * optionally verify that the buffer is unreferenced, remove its tag from
                         * LocalBufHash, clear the tag, and clear the flag and usage-count bits of
                         * the state word.  The remaining bits of the state word are written back
                         * unchanged.
                         *
     626              :  * See also InvalidateBuffer().
     627              :  */
     628              : void
     629        30319 : InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
     630              : {
     631        30319 :     Buffer      buffer = BufferDescriptorGetBuffer(bufHdr);
     632        30319 :     int         bufid = -buffer - 1;
     633              :     uint64      buf_state;
     634              :     LocalBufferLookupEnt *hresult;
     635              : 
     636              :     /*
     637              :      * It's possible that we started IO on this buffer before e.g. aborting
     638              :      * the transaction that created a table. We need to wait for that IO to
     639              :      * complete before removing / reusing the buffer.
     640              :      */
     641        30319 :     if (pgaio_wref_valid(&bufHdr->io_wref))
     642              :     {
                                /*
                                 * Wait on a copy of the reference; the Assert below expects the
                                 * header's own io_wref to be cleared once the wait returns.
                                 */
     643            0 :         PgAioWaitRef iow = bufHdr->io_wref;
     644              : 
     645            0 :         pgaio_wref_wait(&iow);
     646              :         Assert(!pgaio_wref_valid(&bufHdr->io_wref));
     647              :     }
     648              : 
     649        30319 :     buf_state = pg_atomic_read_u64(&bufHdr->state);
     650              : 
     651              :     /*
     652              :      * We need to test not just LocalRefCount[bufid] but also the BufferDesc
     653              :      * itself, as the latter is used to represent a pin by the AIO subsystem.
     654              :      * This can happen if AIO is initiated and then the query errors out.
     655              :      */
     656        30319 :     if (check_unreferenced &&
     657        22219 :         (LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
     658            0 :         elog(ERROR, "block %u of %s is still referenced (local %d)",
     659              :              bufHdr->tag.blockNum,
     660              :              relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
     661              :                             MyProcNumber,
     662              :                             BufTagGetForkNum(&bufHdr->tag)).str,
     663              :              LocalRefCount[bufid]);
     664              : 
     665              :     /* Remove entry from hashtable */
     666              :     hresult = (LocalBufferLookupEnt *)
     667        30319 :         hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     668        30319 :     if (!hresult)               /* shouldn't happen */
     669            0 :         elog(ERROR, "local buffer hash table corrupted");
     670              :     /* Mark buffer invalid: clear the tag, all flag bits and the usage count */
     671        30319 :     ClearBufferTag(&bufHdr->tag);
     672        30319 :     buf_state &= ~BUF_FLAG_MASK;
     673        30319 :     buf_state &= ~BUF_USAGECOUNT_MASK;
     674        30319 :     pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     675        30319 : }
     676              : 
     677              : /*
     678              :  * DropRelationLocalBuffers
     679              :  *      This function removes from the buffer pool all the pages of the
     680              :  *      specified relation that have block numbers >= firstDelBlock.
     681              :  *      (In particular, with firstDelBlock = 0, all pages are removed.)
     682              :  *      Dirty pages are simply dropped, without bothering to write them
     683              :  *      out first.  Therefore, this is NOT rollback-able, and so should be
     684              :  *      used only with extreme caution!
     685              :  *
                         *      forkNum[] and firstDelBlock[] are parallel arrays with nforks
                         *      entries each: a buffer is dropped if, for some j, it belongs to
                         *      fork forkNum[j] and its block number is >= firstDelBlock[j].
                         *      Buffers are dropped via InvalidateLocalBuffer(bufHdr, true), so
                         *      a buffer that is still referenced raises an error.
                         *
     686              :  *      See DropRelationBuffers in bufmgr.c for more notes.
     687              :  */
     688              : void
     689          498 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber *forkNum,
     690              :                          int nforks, BlockNumber *firstDelBlock)
     691              : {
     692              :     int         i;
     693              :     int         j;
     694              : 
     695       412146 :     for (i = 0; i < NLocBuffer; i++)
     696              :     {
     697       411648 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     698              :         uint64      buf_state;
     699              : 
     700       411648 :         buf_state = pg_atomic_read_u64(&bufHdr->state);
     701              : 
                                /* skip buffers with no valid tag or belonging to another relation */
     702       411648 :         if (!(buf_state & BM_TAG_VALID) ||
     703        38156 :             !BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
     704       410530 :             continue;
     705              : 
     706         1279 :         for (j = 0; j < nforks; j++)
     707              :         {
     708         1229 :             if (BufTagGetForkNum(&bufHdr->tag) == forkNum[j] &&
     709         1110 :                 bufHdr->tag.blockNum >= firstDelBlock[j])
     710              :             {
     711         1068 :                 InvalidateLocalBuffer(bufHdr, true);
                                        /* buffer is gone; no need to test the remaining forks */
     712         1068 :                 break;
     713              :             }
     714              :         }
     715              :     }
     716          498 : }
     717              : 
     718              : /*
     719              :  * DropRelationAllLocalBuffers
     720              :  *      This function removes from the buffer pool all pages of all forks
     721              :  *      of the specified relation.
     722              :  *
                         *      Every local buffer whose tag is valid and matches rlocator is
                         *      dropped via InvalidateLocalBuffer(bufHdr, true), so a buffer
                         *      that is still referenced raises an error.
                         *
     723              :  *      See DropRelationsAllBuffers in bufmgr.c for more notes.
     724              :  */
     725              : void
     726         4403 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
     727              : {
     728              :     int         i;
     729              : 
                            /* scan every local buffer header; there is no per-relation index here */
     730      4138099 :     for (i = 0; i < NLocBuffer; i++)
     731              :     {
     732      4133696 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     733              :         uint64      buf_state;
     734              : 
     735      4133696 :         buf_state = pg_atomic_read_u64(&bufHdr->state);
     736              : 
     737      4448060 :         if ((buf_state & BM_TAG_VALID) &&
     738       314364 :             BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
     739              :         {
     740        20993 :             InvalidateLocalBuffer(bufHdr, true);
     741              :         }
     742              :     }
     743         4403 : }
     744              : 
     745              : /*
     746              :  * InitLocalBuffers -
     747              :  *    init the local buffer cache. Since most queries (esp. multi-user ones)
     748              :  *    don't involve local buffers, we delay allocating actual memory for the
     749              :  *    buffers until we need them; just make the buffer headers here.
                         *
                         *    The number of buffers is taken from num_temp_buffers at the time of
                         *    the call.  Raises ERROR when called in a parallel worker and FATAL
                         *    when the header arrays cannot be allocated.  NLocBuffer is set only
                         *    after everything else has succeeded.
     750              :  */
     751              : static void
     752          354 : InitLocalBuffers(void)
     753              : {
     754          354 :     int         nbufs = num_temp_buffers;
     755              :     HASHCTL     info;
     756              :     int         i;
     757              : 
     758              :     /*
     759              :      * Parallel workers can't access data in temporary tables, because they
     760              :      * have no visibility into the local buffers of their leader.  This is a
     761              :      * convenient, low-cost place to provide a backstop check for that.  Note
     762              :      * that we don't wish to prevent a parallel worker from accessing catalog
     763              :      * metadata about a temp table, so checks at higher levels would be
     764              :      * inappropriate.
     765              :      */
     766          354 :     if (IsParallelWorker())
     767            0 :         ereport(ERROR,
     768              :                 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     769              :                  errmsg("cannot access temporary tables during a parallel operation")));
     770              : 
     771              :     /* Allocate and zero buffer headers and auxiliary arrays */
     772          354 :     LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
     773          354 :     LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
     774          354 :     LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
     775          354 :     if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
     776            0 :         ereport(FATAL,
     777              :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     778              :                  errmsg("out of memory")));
     779              : 
     780          354 :     nextFreeLocalBufId = 0;
     781              : 
     782              :     /* initialize fields that need to start off nonzero */
     783       342522 :     for (i = 0; i < nbufs; i++)
     784              :     {
     785       342168 :         BufferDesc *buf = GetLocalBufferDescriptor(i);
     786              : 
     787              :         /*
     788              :          * negative to indicate local buffer. This is tricky: shared buffers
     789              :          * start with 0. We have to start with -2. (Note that the routine
     790              :          * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
     791              :          * is -1.)
     792              :          */
     793       342168 :         buf->buf_id = -i - 2;
     794              : 
     795       342168 :         pgaio_wref_clear(&buf->io_wref);
     796              : 
     797              :         /*
     798              :          * Intentionally do not initialize the buffer's atomic variable
     799              :          * (besides zeroing the underlying memory above). That way we get
     800              :          * errors on platforms without atomics, if somebody (re-)introduces
     801              :          * atomic operations for local buffers.
     802              :          */
     803              :     }
     804              : 
     805              :     /* Create the lookup hash table, keyed by BufferTag */
     806          354 :     info.keysize = sizeof(BufferTag);
     807          354 :     info.entrysize = sizeof(LocalBufferLookupEnt);
     808              : 
     809          354 :     LocalBufHash = hash_create("Local Buffer Lookup Table",
     810              :                                nbufs,
     811              :                                &info,
     812              :                                HASH_ELEM | HASH_BLOBS);
     813              : 
     814          354 :     if (!LocalBufHash)
     815            0 :         elog(ERROR, "could not initialize local buffer hash table");
     816              : 
     817              :     /* Initialization done, mark buffers allocated */
     818          354 :     NLocBuffer = nbufs;
     819          354 : }
     820              : 
     821              : /*
                         * PinLocalBuffer -- take one pin on a local buffer and record it with
                         * CurrentResourceOwner.
                         *
                         * On the first pin (LocalRefCount[bufid] == 0) NLocalPinnedBuffers is
                         * incremented, one reference count is added to the header's state word
                         * and, if adjust_usagecount is true and the usage count is still below
                         * BM_MAX_USAGE_COUNT, the usage count is bumped by one.  Later pins only
                         * increment LocalRefCount[bufid].
                         *
                         * Returns true if BM_VALID is set in the buffer's state.
                         *
     822              :  * XXX: We could have a slightly more efficient version of PinLocalBuffer()
     823              :  * that does not support adjusting the usagecount - but so far it does not
     824              :  * seem worth the trouble.
     825              :  *
     826              :  * Note that ResourceOwnerEnlarge() must have been done already.
     827              :  */
     828              : bool
     829      1668293 : PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
     830              : {
     831              :     uint64      buf_state;
     832      1668293 :     Buffer      buffer = BufferDescriptorGetBuffer(buf_hdr);
     833      1668293 :     int         bufid = -buffer - 1;
     834              : 
     835      1668293 :     buf_state = pg_atomic_read_u64(&buf_hdr->state);
     836              : 
                            /* only the first pin is reflected in the header's state word */
     837      1668293 :     if (LocalRefCount[bufid] == 0)
     838              :     {
     839      1557963 :         NLocalPinnedBuffers++;
     840      1557963 :         buf_state += BUF_REFCOUNT_ONE;
     841      1557963 :         if (adjust_usagecount &&
     842      1527252 :             BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
     843              :         {
     844        84549 :             buf_state += BUF_USAGECOUNT_ONE;
     845              :         }
     846      1557963 :         pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
     847              : 
     848              :         /*
     849              :          * See comment in PinBuffer().
     850              :          *
     851              :          * If the buffer isn't allocated yet, it'll be marked as defined in
     852              :          * GetLocalBufferStorage().
     853              :          */
     854      1557963 :         if (LocalBufHdrGetBlock(buf_hdr) != NULL)
     855              :             VALGRIND_MAKE_MEM_DEFINED(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
     856              :     }
     857      1668293 :     LocalRefCount[bufid]++;
     858      1668293 :     ResourceOwnerRememberBuffer(CurrentResourceOwner,
     859              :                                 BufferDescriptorGetBuffer(buf_hdr));
     860              : 
     861      1668293 :     return buf_state & BM_VALID;
     862              : }
     863              : 
                        /*
                         * UnpinLocalBuffer -- release one pin on a local buffer and make
                         * CurrentResourceOwner forget it.
                         *
                         * The pin bookkeeping itself is done by UnpinLocalBufferNoOwner().
                         */
     864              : void
     865      2134032 : UnpinLocalBuffer(Buffer buffer)
     866              : {
     867      2134032 :     UnpinLocalBufferNoOwner(buffer);
     868      2134032 :     ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
     869      2134032 : }
     870              : 
                        /*
                         * UnpinLocalBufferNoOwner -- release one pin on a local buffer without
                         * touching any resource owner.
                         *
                         * LocalRefCount[buffid] is decremented.  When it drops to zero,
                         * NLocalPinnedBuffers is decremented and one reference count is
                         * subtracted from the header's state word.
                         */
     871              : void
     872      2138024 : UnpinLocalBufferNoOwner(Buffer buffer)
     873              : {
                            /* local Buffer numbers are negative; map to a zero-based array index */
     874      2138024 :     int         buffid = -buffer - 1;
     875              : 
     876              :     Assert(BufferIsLocal(buffer));
     877              :     Assert(LocalRefCount[buffid] > 0);
     878              :     Assert(NLocalPinnedBuffers > 0);
     879              : 
     880      2138024 :     if (--LocalRefCount[buffid] == 0)
     881              :     {
     882      1557963 :         BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
     883              :         uint64      buf_state;
     884              : 
     885      1557963 :         NLocalPinnedBuffers--;
     886              : 
     887      1557963 :         buf_state = pg_atomic_read_u64(&buf_hdr->state);
     888              :         Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
     889      1557963 :         buf_state -= BUF_REFCOUNT_ONE;
     890      1557963 :         pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
     891              : 
     892              :         /* see comment in UnpinBufferNoOwner */
     893              :         VALGRIND_MAKE_MEM_NOACCESS(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
     894              :     }
     895      2138024 : }
     896              : 
     897              : /*
     898              :  * GUC check_hook for temp_buffers
                         *
                         * Returns false (after setting an error detail) when *newval differs
                         * from NLocBuffer and NLocBuffer is already nonzero, unless source is
                         * PGC_S_TEST.  Returns true otherwise.  extra is not used.
     899              :  */
     900              : bool
     901         1296 : check_temp_buffers(int *newval, void **extra, GucSource source)
     902              : {
     903              :     /*
     904              :      * Once local buffers have been initialized, it's too late to change this.
     905              :      * However, if this is only a test call, allow it.
     906              :      */
     907         1296 :     if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
     908              :     {
     909            0 :         GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
     910            0 :         return false;
     911              :     }
     912         1296 :     return true;
     913              : }
     914              : 
     915              : /*
     916              :  * GetLocalBufferStorage - allocate memory for a local buffer
     917              :  *
     918              :  * The idea of this function is to aggregate our requests for storage
     919              :  * so that the memory manager doesn't see a whole lot of relatively small
     920              :  * requests.  Since we'll never give back a local buffer once it's created
     921              :  * within a particular process, no point in burdening memmgr with separately
     922              :  * managed chunks.
                         *
                         * Returns a pointer to BLCKSZ bytes carved out of the current chunk.
                         * Chunks are allocated from LocalBufferContext with PG_IO_ALIGN_SIZE
                         * alignment; all bookkeeping is kept in function-local static variables.
     923              :  */
     924              : static Block
     925        20951 : GetLocalBufferStorage(void)
     926              : {
                            /* chunk currently being handed out, and the position within it */
     927              :     static char *cur_block = NULL;
     928              :     static int  next_buf_in_block = 0;
     929              :     static int  num_bufs_in_block = 0;
     930              :     static int  total_bufs_allocated = 0;
     931              :     static MemoryContext LocalBufferContext = NULL;
     932              : 
     933              :     char       *this_buf;
     934              : 
     935              :     Assert(total_bufs_allocated < NLocBuffer);
     936              : 
     937        20951 :     if (next_buf_in_block >= num_bufs_in_block)
     938              :     {
     939              :         /* Need to make a new request to memmgr */
     940              :         int         num_bufs;
     941              : 
     942              :         /*
     943              :          * We allocate local buffers in a context of their own, so that the
     944              :          * space eaten for them is easily recognizable in MemoryContextStats
     945              :          * output.  Create the context on first use.
     946              :          */
     947          571 :         if (LocalBufferContext == NULL)
     948          354 :             LocalBufferContext =
     949          354 :                 AllocSetContextCreate(TopMemoryContext,
     950              :                                       "LocalBufferContext",
     951              :                                       ALLOCSET_DEFAULT_SIZES);
     952              : 
     953              :         /* Start with a 16-buffer request; subsequent ones double each time */
     954          571 :         num_bufs = Max(num_bufs_in_block * 2, 16);
     955              :         /* But not more than what we need for all remaining local bufs */
     956          571 :         num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
     957              :         /* And don't overflow MaxAllocSize, either */
     958          571 :         num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
     959              : 
     960              :         /* Buffers should be I/O aligned. */
     961         1142 :         cur_block = MemoryContextAllocAligned(LocalBufferContext,
     962          571 :                                               num_bufs * BLCKSZ,
     963              :                                               PG_IO_ALIGN_SIZE,
     964              :                                               0);
     965              : 
     966          571 :         next_buf_in_block = 0;
     967          571 :         num_bufs_in_block = num_bufs;
     968              :     }
     969              : 
     970              :     /* Allocate next buffer in current memory block */
     971        20951 :     this_buf = cur_block + next_buf_in_block * BLCKSZ;
     972        20951 :     next_buf_in_block++;
     973        20951 :     total_bufs_allocated++;
     974              : 
     975              :     /*
     976              :      * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it
     977              :      * here.  The block is actually undefined, but we want consistency with
     978              :      * the regular case of not needing to allocate memory.  This is
     979              :      * specifically needed when method_io_uring.c fills the block, because
     980              :      * Valgrind doesn't recognize io_uring reads causing undefined memory to
     981              :      * become defined.
     982              :      */
     983              :     VALGRIND_MAKE_MEM_DEFINED(this_buf, BLCKSZ);
     984              : 
     985        20951 :     return (Block) this_buf;
     986              : }
     987              : 
     988              : /*
     989              :  * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
     990              :  *
     991              :  * This is just like CheckForBufferLeaks(), but for local buffers.
                         *
                         * The check is compiled in only when USE_ASSERT_CHECKING is defined;
                         * otherwise this function does nothing.  Each local buffer with a
                         * nonzero LocalRefCount is reported with a WARNING, after which an
                         * assertion fails if any were found.  Nothing is checked while
                         * LocalRefCount is NULL.
     992              :  */
     993              : static void
     994       653328 : CheckForLocalBufferLeaks(void)
     995              : {
     996              : #ifdef USE_ASSERT_CHECKING
     997              :     if (LocalRefCount)
     998              :     {
     999              :         int         RefCountErrors = 0;
    1000              :         int         i;
    1001              : 
    1002              :         for (i = 0; i < NLocBuffer; i++)
    1003              :         {
    1004              :             if (LocalRefCount[i] != 0)
    1005              :             {
                                        /* inverse of the "-buffer - 1" index mapping used elsewhere */
    1006              :                 Buffer      b = -i - 1;
    1007              :                 char       *s;
    1008              : 
    1009              :                 s = DebugPrintBufferRefcount(b);
    1010              :                 elog(WARNING, "local buffer refcount leak: %s", s);
    1011              :                 pfree(s);
    1012              : 
    1013              :                 RefCountErrors++;
    1014              :             }
    1015              :         }
    1016              :         Assert(RefCountErrors == 0);
    1017              :     }
    1018              : #endif
    1019       653328 : }
    1020              : 
    1021              : /*
    1022              :  * AtEOXact_LocalBuffers - clean up at end of transaction.
    1023              :  *
    1024              :  * This is just like AtEOXact_Buffers, but for local buffers.
                         *
                         * The only work done is the pin-leak check in CheckForLocalBufferLeaks();
                         * isCommit is not used.
    1025              :  */
    1026              : void
    1027       629078 : AtEOXact_LocalBuffers(bool isCommit)
    1028              : {
    1029       629078 :     CheckForLocalBufferLeaks();
    1030       629078 : }
    1031              : 
    1032              : /*
    1033              :  * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
    1034              :  *
    1035              :  * This is just like AtProcExit_Buffers, but for local buffers.
                         *
                         * The only work done is the pin-leak check in CheckForLocalBufferLeaks().
    1036              :  */
    1037              : void
    1038        24250 : AtProcExit_LocalBuffers(void)
    1039              : {
    1040              :     /*
    1041              :      * We shouldn't be holding any remaining pins; if we are, and assertions
    1042              :      * aren't enabled, we'll fail later in DropRelationBuffers while trying to
    1043              :      * drop the temp rels.
    1044              :      */
    1045        24250 :     CheckForLocalBufferLeaks();
    1046        24250 : }
        

Generated by: LCOV version 2.0-1