LCOV - code coverage report
Current view: top level - src/backend/storage/buffer - localbuf.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 89.7 % 282 253
Test Date: 2026-04-07 14:16:30 Functions: 100.0 % 23 23
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * localbuf.c
       4              :  *    local buffer manager. Fast buffer manager for temporary tables,
       5              :  *    which never need to be WAL-logged or checkpointed, etc.
       6              :  *
       7              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       8              :  * Portions Copyright (c) 1994-5, Regents of the University of California
       9              :  *
      10              :  *
      11              :  * IDENTIFICATION
      12              :  *    src/backend/storage/buffer/localbuf.c
      13              :  *
      14              :  *-------------------------------------------------------------------------
      15              :  */
      16              : #include "postgres.h"
      17              : 
      18              : #include "access/parallel.h"
      19              : #include "executor/instrument.h"
      20              : #include "pgstat.h"
      21              : #include "storage/aio.h"
      22              : #include "storage/buf_internals.h"
      23              : #include "storage/bufmgr.h"
      24              : #include "storage/fd.h"
      25              : #include "utils/guc_hooks.h"
      26              : #include "utils/memdebug.h"
      27              : #include "utils/memutils.h"
      28              : #include "utils/rel.h"
      29              : #include "utils/resowner.h"
      30              : 
      31              : 
      32              : /*#define LBDEBUG*/
      33              : 
      34              : /* Entry for the local buffer lookup hashtable (LocalBufHash): maps a page's tag to a local buffer index */
      35              : typedef struct
      36              : {
      37              :     BufferTag   key;            /* Tag of a disk page; this is the hash key */
      38              :     int         id;             /* Associated local buffer's index */
      39              : } LocalBufferLookupEnt;
      40              : 
      41              : /* Note: this macro only works on local buffers, not shared ones!  It indexes LocalBufferBlockPointers by -(buf_id + 2); the result is an lvalue (it is assigned to in GetLocalVictimBuffer). */
      42              : #define LocalBufHdrGetBlock(bufHdr) \
      43              :     LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
      44              : 
      45              : int         NLocBuffer = 0;     /* until buffers are initialized */
      46              : 
      47              : BufferDesc *LocalBufferDescriptors = NULL;
      48              : Block      *LocalBufferBlockPointers = NULL;    /* an entry stays NULL until the buffer is first handed out (see GetLocalVictimBuffer) */
      49              : int32      *LocalRefCount = NULL;   /* indexed by local buffer index; zero means the buffer may be considered as a victim */
      50              : 
      51              : static int  nextFreeLocalBufId = 0; /* clock-sweep position: next buffer index GetLocalVictimBuffer() examines */
      52              : 
      53              : static HTAB *LocalBufHash = NULL;   /* tag -> LocalBufferLookupEnt; NULL is used as the "not yet initialized" test */
      54              : 
      55              : /* number of local buffers pinned at least once */
      56              : static int  NLocalPinnedBuffers = 0;
      57              : 
      58              : 
      59              : static void InitLocalBuffers(void);
      60              : static Block GetLocalBufferStorage(void);
      61              : static Buffer GetLocalVictimBuffer(void);
      63              : 
      64              : /*
      65              :  * PrefetchLocalBuffer -
      66              :  *    initiate asynchronous read of a block of a relation
      67              :  *
      68              :  * Do PrefetchBuffer's work for temporary relations.  If the block is already in the local buffer hash table, result.recent_buffer is set to its Buffer number and no I/O is started.
      69              :  * Otherwise, when USE_PREFETCH is defined and IO_DIRECT_DATA is not set in io_direct_flags, smgrprefetch() is called and result.initiated_io is set if it returns true.  No-op if prefetching isn't compiled in.
      70              :  */
      71              : PrefetchBufferResult
      72         1357 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
      73              :                     BlockNumber blockNum)
      74              : {
      75         1357 :     PrefetchBufferResult result = {InvalidBuffer, false};
      76              :     BufferTag   newTag;         /* identity of requested block */
      77              :     LocalBufferLookupEnt *hresult;
      78              : 
      79         1357 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
      80              : 
      81              :     /* Initialize local buffers if first request in this session */
      82         1357 :     if (LocalBufHash == NULL)
      83            0 :         InitLocalBuffers();
      84              : 
      85              :     /* See if the desired buffer already exists (lookup only, nothing is inserted) */
      86              :     hresult = (LocalBufferLookupEnt *)
      87         1357 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
      88              : 
      89         1357 :     if (hresult)
      90              :     {
      91              :         /* Yes, so nothing to do except report it: buffer index id corresponds to Buffer number -id - 1 */
      92         1141 :         result.recent_buffer = -hresult->id - 1;
      93              :     }
      94              :     else
      95              :     {
      96              : #ifdef USE_PREFETCH
      97              :         /* Not in buffers, so initiate prefetch (skipped when direct I/O is enabled for data) */
      98          432 :         if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
      99          216 :             smgrprefetch(smgr, forkNum, blockNum, 1))
     100              :         {
     101          216 :             result.initiated_io = true;
     102              :         }
     103              : #endif                          /* USE_PREFETCH */
     104              :     }
     105              : 
     106         1357 :     return result;
     107              : }
     108              : 
     109              : 
     110              : /*
     111              :  * LocalBufferAlloc -
     112              :  *    Find or create a local buffer for the given page of the given relation.
     113              :  *
     114              :  * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
     115              :  * any locking since this is all local.  We support only default access
     116              :  * strategy (hence, usage_count is always advanced).  On a hash hit the existing buffer is pinned and *foundPtr receives PinLocalBuffer()'s result; on a miss a victim buffer is retagged for the page and *foundPtr is set to false.  Returns the buffer's descriptor.
     117              :  */
     118              : BufferDesc *
     119      1648452 : LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
     120              :                  bool *foundPtr)
     121              : {
     122              :     BufferTag   newTag;         /* identity of requested block */
     123              :     LocalBufferLookupEnt *hresult;
     124              :     BufferDesc *bufHdr;
     125              :     Buffer      victim_buffer;
     126              :     int         bufid;
     127              :     bool        found;
     128              : 
     129      1648452 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
     130              : 
     131              :     /* Initialize local buffers if first request in this session */
     132      1648452 :     if (LocalBufHash == NULL)
     133           17 :         InitLocalBuffers();
     134              : 
     135      1648452 :     ResourceOwnerEnlarge(CurrentResourceOwner);
     136              : 
     137              :     /* See if the desired buffer already exists */
     138              :     hresult = (LocalBufferLookupEnt *)
     139      1648452 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
     140              : 
     141      1648452 :     if (hresult)
     142              :     {
     143      1637422 :         bufid = hresult->id;
     144      1637422 :         bufHdr = GetLocalBufferDescriptor(bufid);
     145              :         Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
     146              : 
     147      1637422 :         *foundPtr = PinLocalBuffer(bufHdr, true);
     148              :     }
     149              :     else
     150              :     {
     151              :         uint64      buf_state;
     152              : 
     153        11030 :         victim_buffer = GetLocalVictimBuffer();
     154        11022 :         bufid = -victim_buffer - 1; /* convert the Buffer number back to a buffer index */
     155        11022 :         bufHdr = GetLocalBufferDescriptor(bufid);
     156              : 
     157              :         hresult = (LocalBufferLookupEnt *)
     158        11022 :             hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
     159        11022 :         if (found)              /* shouldn't happen: the HASH_FIND above just missed this tag */
     160            0 :             elog(ERROR, "local buffer hash table corrupted");
     161        11022 :         hresult->id = bufid;
     162              : 
     163              :         /*
     164              :          * it's all ours now: assign the new tag, clear all flag and usage-count bits, then set BM_TAG_VALID and a usage count of one.
     165              :          */
     166        11022 :         bufHdr->tag = newTag;
     167              : 
     168        11022 :         buf_state = pg_atomic_read_u64(&bufHdr->state);
     169        11022 :         buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
     170        11022 :         buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     171        11022 :         pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     172              : 
     173        11022 :         *foundPtr = false;
     174              :     }
     175              : 
     176      1648444 :     return bufHdr;
     177              : }
     178              : 
     179              : /*
     180              :  * Like FlushBuffer(), just for local buffers.  Sets the page checksum, writes the page with smgrwrite(), clears BM_DIRTY via TerminateLocalBufferIO() and counts the write.  If reln is NULL the relation is opened from the buffer's tag.  The buffer must have a nonzero LocalRefCount (asserted).
     181              :  */
     182              : void
     183         4430 : FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
     184              : {
     185              :     instr_time  io_start;
     186         4430 :     Page        localpage = (char *) LocalBufHdrGetBlock(bufHdr);
     187              : 
     188              :     Assert(LocalRefCount[-BufferDescriptorGetBuffer(bufHdr) - 1] > 0);
     189              : 
     190              :     /*
     191              :      * Try to start an I/O operation.  There currently are no reasons for
     192              :      * StartLocalBufferIO to return anything other than
     193              :      * BUFFER_IO_READY_FOR_IO, so we raise an error if it does.
     194              :      */
     195         4430 :     if (StartLocalBufferIO(bufHdr, false, true, NULL) != BUFFER_IO_READY_FOR_IO)
     196            0 :         elog(ERROR, "failed to start write IO on local buffer");
     197              : 
     198              :     /* Find smgr relation for buffer, unless the caller supplied one */
     199         4430 :     if (reln == NULL)
     200         4038 :         reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
     201              :                         MyProcNumber);
     202              : 
     203         4430 :     PageSetChecksum(localpage, bufHdr->tag.blockNum);
     204              : 
     205         4430 :     io_start = pgstat_prepare_io_time(track_io_timing);
     206              : 
     207              :     /* And write... */
     208         4430 :     smgrwrite(reln,
     209         4430 :               BufTagGetForkNum(&bufHdr->tag),
     210              :               bufHdr->tag.blockNum,
     211              :               localpage,
     212              :               false);
     213              : 
     214              :     /* Temporary table I/O does not use Buffer Access Strategies */
     215         4430 :     pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
     216              :                             IOOP_WRITE, io_start, 1, BLCKSZ);
     217              : 
     218              :     /* Mark not-dirty (clear_dirty = true, no extra flag bits, no AIO pin to release) */
     219         4430 :     TerminateLocalBufferIO(bufHdr, true, 0, false);
     220              : 
     221         4430 :     pgBufferUsage.local_blks_written++;
     222         4430 : }
     223              : /* GetLocalVictimBuffer - choose a local buffer to reuse via clock sweep.  Returns its Buffer number with the buffer pinned, its storage allocated, any dirty contents flushed and any old tag invalidated.  Raises ERROR if no usable buffer is found. */
     224              : static Buffer
     225        30307 : GetLocalVictimBuffer(void)
     226              : {
     227              :     int         victim_bufid;
     228              :     int         trycounter;
     229              :     BufferDesc *bufHdr;
     230              : 
     231        30307 :     ResourceOwnerEnlarge(CurrentResourceOwner);
     232              : 
     233              :     /*
     234              :      * Need to get a new buffer.  We use a clock-sweep algorithm (essentially
     235              :      * the same as what freelist.c does now...).  trycounter counts down on each buffer with a nonzero LocalRefCount and is reset whenever a usage count is decremented; reaching zero means giving up.
     236              :      */
     237        30307 :     trycounter = NLocBuffer;
     238              :     for (;;)
     239              :     {
     240       167043 :         victim_bufid = nextFreeLocalBufId;
     241              : 
     242       167043 :         if (++nextFreeLocalBufId >= NLocBuffer)
     243         1448 :             nextFreeLocalBufId = 0; /* wrap around to the first buffer */
     244              : 
     245       167043 :         bufHdr = GetLocalBufferDescriptor(victim_bufid);
     246              : 
     247       167043 :         if (LocalRefCount[victim_bufid] == 0)
     248              :         {
     249        54071 :             uint64      buf_state = pg_atomic_read_u64(&bufHdr->state);
     250              : 
     251        54071 :             if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
     252              :             {
     253        23772 :                 buf_state -= BUF_USAGECOUNT_ONE;    /* recently used: age it and keep sweeping */
     254        23772 :                 pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     255        23772 :                 trycounter = NLocBuffer;
     256              :             }
     257        30299 :             else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
     258              :             {
     259              :                 /*
     260              :                  * This can be reached if the backend initiated AIO for this
     261              :                  * buffer and then errored out.  The buffer is skipped; note that trycounter is left unchanged here.
     262              :                  */
     263              :             }
     264              :             else
     265              :             {
     266              :                 /* Found a usable buffer */
     267        30299 :                 PinLocalBuffer(bufHdr, false);
     268        30299 :                 break;
     269              :             }
     270              :         }
     271       112972 :         else if (--trycounter == 0)
     272            8 :             ereport(ERROR,
     273              :                     (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     274              :                      errmsg("no empty local buffer available")));
     275              :     }
     276              : 
     277              :     /*
     278              :      * lazy memory allocation: allocate space on first use of a buffer.
     279              :      */
     280        30299 :     if (LocalBufHdrGetBlock(bufHdr) == NULL)
     281              :     {
     282              :         /* Set pointer for use by BufferGetBlock() macro */
     283        20931 :         LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
     284              :     }
     285              : 
     286              :     /*
     287              :      * This buffer is not referenced but it might still be dirty.  If that's
     288              :      * the case, write it out before reusing it!
     289              :      */
     290        30299 :     if (pg_atomic_read_u64(&bufHdr->state) & BM_DIRTY)
     291         3964 :         FlushLocalBuffer(bufHdr, NULL);
     292              : 
     293              :     /*
     294              :      * Remove the victim buffer from the hashtable and mark as invalid.  Only buffers that still carry a valid tag need this; it is counted as an eviction.
     295              :      */
     296        30299 :     if (pg_atomic_read_u64(&bufHdr->state) & BM_TAG_VALID)
     297              :     {
     298         8100 :         InvalidateLocalBuffer(bufHdr, false);
     299              : 
     300         8100 :         pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT, 1, 0);
     301              :     }
     302              : 
     303        30299 :     return BufferDescriptorGetBuffer(bufHdr);
     304              : }
     305              : 
     306              : /* see GetPinLimit(); for local buffers the limit is simply num_temp_buffers */
     307              : uint32
     308         9250 : GetLocalPinLimit(void)
     309              : {
     310              :     /* Every backend has its own temporary buffers, and can pin them all. */
     311         9250 :     return num_temp_buffers;
     312              : }
     313              : 
     314              : /* see GetAdditionalPinLimit(); returns num_temp_buffers minus the number of local buffers currently counted in NLocalPinnedBuffers */
     315              : uint32
     316        32754 : GetAdditionalLocalPinLimit(void)
     317              : {
     318              :     Assert(NLocalPinnedBuffers <= num_temp_buffers);
     319        32754 :     return num_temp_buffers - NLocalPinnedBuffers;
     320              : }
     321              : 
     322              : /* see LimitAdditionalPins(); clamps *additional_pins in place to the number of local pins still available */
     323              : void
     324        14912 : LimitAdditionalLocalPins(uint32 *additional_pins)
     325              : {
     326              :     uint32      max_pins;
     327              : 
     328        14912 :     if (*additional_pins <= 1)  /* requests for zero or one pin are never reduced */
     329        14479 :         return;
     330              : 
     331              :     /*
     332              :      * In contrast to LimitAdditionalPins() other backends don't play a role
     333              :      * here. We can allow up to NLocBuffer pins in total, but it might not be
     334              :      * initialized yet so read num_temp_buffers.
     335              :      */
     336          433 :     max_pins = (num_temp_buffers - NLocalPinnedBuffers);
     337              : 
     338          433 :     if (*additional_pins >= max_pins)
     339            0 :         *additional_pins = max_pins;
     340              : }
     341              : 
     342              : /*
     343              :  * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
     344              :  * temporary buffers.  Acquires up to extend_by zero-filled victim buffers, enters them in the hash table for the blocks following the relation's current end, extends the relation with smgrzeroextend() and marks the buffers BM_VALID.  buffers[] receives the Buffer numbers, *extended_by the number of blocks actually added (extend_by may be reduced by LimitAdditionalLocalPins()); returns the first new block number.
     345              :  */
     346              : BlockNumber
     347        14912 : ExtendBufferedRelLocal(BufferManagerRelation bmr,
     348              :                        ForkNumber fork,
     349              :                        uint32 flags,
     350              :                        uint32 extend_by,
     351              :                        BlockNumber extend_upto,
     352              :                        Buffer *buffers,
     353              :                        uint32 *extended_by)
     354              : {
     355              :     BlockNumber first_block;
     356              :     instr_time  io_start;
     357              : 
     358              :     /* Initialize local buffers if first request in this session */
     359        14912 :     if (LocalBufHash == NULL)
     360          333 :         InitLocalBuffers();
     361              : 
     362        14912 :     LimitAdditionalLocalPins(&extend_by);   /* may lower extend_by to the pins still available */
     363              : 
     364        34189 :     for (uint32 i = 0; i < extend_by; i++)
     365              :     {
     366              :         BufferDesc *buf_hdr;
     367              :         Block       buf_block;
     368              : 
     369        19277 :         buffers[i] = GetLocalVictimBuffer();
     370        19277 :         buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
     371        19277 :         buf_block = LocalBufHdrGetBlock(buf_hdr);
     372              : 
     373              :         /* new buffers are zero-filled */
     374        19277 :         MemSet(buf_block, 0, BLCKSZ);
     375              :     }
     376              : 
     377        14912 :     first_block = smgrnblocks(BMR_GET_SMGR(bmr), fork); /* current length, i.e. block number of the first new block */
     378              : 
     379              :     if (extend_upto != InvalidBlockNumber)
     380              :     {
     381              :         /*
     382              :          * In contrast to shared relations, nothing could change the relation
     383              :          * size concurrently. Thus we shouldn't end up finding that we don't
     384              :          * need to do anything.
     385              :          */
     386              :         Assert(first_block <= extend_upto);
     387              : 
     388              :         Assert((uint64) first_block + extend_by <= extend_upto);
     389              :     }
     390              : 
     391              :     /* Fail if relation is already at maximum possible length (sum computed in 64 bits so it cannot wrap) */
     392        14912 :     if ((uint64) first_block + extend_by >= MaxBlockNumber)
     393            0 :         ereport(ERROR,
     394              :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     395              :                  errmsg("cannot extend relation %s beyond %u blocks",
     396              :                         relpath(BMR_GET_SMGR(bmr)->smgr_rlocator, fork).str,
     397              :                         MaxBlockNumber)));
     398              : 
     399        34189 :     for (uint32 i = 0; i < extend_by; i++)
     400              :     {
     401              :         int         victim_buf_id;
     402              :         BufferDesc *victim_buf_hdr;
     403              :         BufferTag   tag;
     404              :         LocalBufferLookupEnt *hresult;
     405              :         bool        found;
     406              : 
     407        19277 :         victim_buf_id = -buffers[i] - 1;
     408        19277 :         victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
     409              : 
     410              :         /* in case we need to pin an existing buffer below */
     411        19277 :         ResourceOwnerEnlarge(CurrentResourceOwner);
     412              : 
     413        19277 :         InitBufferTag(&tag, &BMR_GET_SMGR(bmr)->smgr_rlocator.locator, fork,
     414              :                       first_block + i);
     415              : 
     416              :         hresult = (LocalBufferLookupEnt *)
     417        19277 :             hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
     418        19277 :         if (found)  /* a buffer for this block already exists: release the victim and use the existing buffer instead */
     419              :         {
     420              :             BufferDesc *existing_hdr;
     421              :             uint64      buf_state;
     422              : 
     423            0 :             UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
     424              : 
     425            0 :             existing_hdr = GetLocalBufferDescriptor(hresult->id);
     426            0 :             PinLocalBuffer(existing_hdr, false);
     427            0 :             buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
     428              : 
     429              :             /*
     430              :              * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
     431              :              */
     432            0 :             buf_state = pg_atomic_read_u64(&existing_hdr->state);
     433              :             Assert(buf_state & BM_TAG_VALID);
     434              :             Assert(!(buf_state & BM_DIRTY));
     435            0 :             buf_state &= ~BM_VALID;
     436            0 :             pg_atomic_unlocked_write_u64(&existing_hdr->state, buf_state);
     437              : 
     438              :             /* no need to loop for local buffers */
     439            0 :             StartLocalBufferIO(existing_hdr, true, true, NULL);
     440              :         }
     441              :         else
     442              :         {
     443        19277 :             uint64      buf_state = pg_atomic_read_u64(&victim_buf_hdr->state);
     444              : 
     445              :             Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY)));
     446              : 
     447        19277 :             victim_buf_hdr->tag = tag;
     448              : 
     449        19277 :             buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     450              : 
     451        19277 :             pg_atomic_unlocked_write_u64(&victim_buf_hdr->state, buf_state);
     452              : 
     453        19277 :             hresult->id = victim_buf_id;    /* point the new hash entry at the victim buffer */
     454              : 
     455        19277 :             StartLocalBufferIO(victim_buf_hdr, true, true, NULL);
     456              :         }
     457              :     }
     458              : 
     459        14912 :     io_start = pgstat_prepare_io_time(track_io_timing);
     460              : 
     461              :     /* actually extend relation: a single smgrzeroextend() call covers all extend_by blocks */
     462        14912 :     smgrzeroextend(BMR_GET_SMGR(bmr), fork, first_block, extend_by, false);
     463              : 
     464        14912 :     pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
     465        14912 :                             io_start, 1, extend_by * BLCKSZ);
     466              : 
     467        34189 :     for (uint32 i = 0; i < extend_by; i++)
     468              :     {
     469        19277 :         Buffer      buf = buffers[i];
     470              :         BufferDesc *buf_hdr;
     471              :         uint64      buf_state;
     472              : 
     473        19277 :         buf_hdr = GetLocalBufferDescriptor(-buf - 1);
     474              : 
     475        19277 :         buf_state = pg_atomic_read_u64(&buf_hdr->state);
     476        19277 :         buf_state |= BM_VALID;  /* mark the buffer valid now that the relation has been extended */
     477        19277 :         pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
     478              :     }
     479              : 
     480        14912 :     *extended_by = extend_by;
     481              : 
     482        14912 :     pgBufferUsage.local_blks_written += extend_by;
     483              : 
     484        14912 :     return first_block;
     485              : }
     486              : 
     487              : /*
     488              :  * MarkLocalBufferDirty -
     489              :  *    mark a local buffer dirty: sets BM_DIRTY in the buffer's state and, if the flag was not already set, increments pgBufferUsage.local_blks_dirtied.  The buffer must be local and have a nonzero LocalRefCount (both asserted).
     490              :  */
     491              : void
     492      2404835 : MarkLocalBufferDirty(Buffer buffer)
     493              : {
     494              :     int         bufid;
     495              :     BufferDesc *bufHdr;
     496              :     uint64      buf_state;
     497              : 
     498              :     Assert(BufferIsLocal(buffer));
     499              : 
     500              : #ifdef LBDEBUG
     501              :     fprintf(stderr, "LB DIRTY %d\n", buffer);
     502              : #endif
     503              : 
     504      2404835 :     bufid = -buffer - 1;    /* Buffer number -> local buffer index */
     505              : 
     506              :     Assert(LocalRefCount[bufid] > 0);
     507              : 
     508      2404835 :     bufHdr = GetLocalBufferDescriptor(bufid);
     509              : 
     510      2404835 :     buf_state = pg_atomic_read_u64(&bufHdr->state);
     511              : 
     512      2404835 :     if (!(buf_state & BM_DIRTY))    /* count only clean -> dirty transitions */
     513        19155 :         pgBufferUsage.local_blks_dirtied++;
     514              : 
     515      2404835 :     buf_state |= BM_DIRTY;
     516              : 
     517      2404835 :     pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     518      2404835 : }
     519              : 
     520              : /*
     521              :  * Like StartSharedBufferIO, but for local buffers.  Returns BUFFER_IO_IN_PROGRESS if an AIO on the buffer is still pending and was not waited for, BUFFER_IO_ALREADY_DONE if the requested I/O is no longer needed, and BUFFER_IO_READY_FOR_IO otherwise.  The buffer's state is not modified here.
     522              :  */
     523              : StartBufferIOResult
     524        34799 : StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool wait, PgAioWaitRef *io_wref)
     525              : {
     526              :     uint64      buf_state;
     527              : 
     528              :     /*
     529              :      * With AIO the buffer could have IO in progress, e.g. when there are two
     530              :      * scans of the same relation.  Either wait for the other IO (if wait =
     531              :      * true and io_wref == NULL) or return BUFFER_IO_IN_PROGRESS.
     532              :      */
     533        34799 :     if (pgaio_wref_valid(&bufHdr->io_wref))
     534              :     {
     535            0 :         PgAioWaitRef buf_wref = bufHdr->io_wref;
     536              : 
     537            0 :         if (io_wref != NULL)
     538              :         {
     539              :             /* We've already asynchronously started this IO, so join it: hand the wait reference back to the caller */
     540            0 :             *io_wref = buf_wref;
     541            0 :             return BUFFER_IO_IN_PROGRESS;
     542              :         }
     543              : 
     544              :         /*
     545              :          * For temp buffers we should never need to wait in
     546              :          * StartLocalBufferIO() when called with io_wref == NULL while there
     547              :          * are staged IOs, as it's not allowed to call code that is not aware
     548              :          * of AIO while in batch mode.
     549              :          */
     550              :         Assert(!pgaio_have_staged());
     551              : 
     552            0 :         if (!wait)
     553            0 :             return BUFFER_IO_IN_PROGRESS;
     554              : 
     555            0 :         pgaio_wref_wait(&buf_wref);
     556              :     }
     557              : 
     558              :     /* Once we get here, there is definitely no I/O active on this buffer */
     559              : 
     560              :     /* Check if someone else already did the I/O: a read is unnecessary if the buffer is BM_VALID, a write if it is not BM_DIRTY */
     561        34799 :     buf_state = pg_atomic_read_u64(&bufHdr->state);
     562        34799 :     if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
     563              :     {
     564            4 :         return BUFFER_IO_ALREADY_DONE;
     565              :     }
     566              : 
     567              :     /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
     568              : 
     569              :     /* local buffers don't track IO using resowners */
     570              : 
     571        34795 :     return BUFFER_IO_READY_FOR_IO;
     572              : }
     573              : 
     574              : /*
     575              :  * Like TerminateBufferIO, but for local buffers.  Always clears BM_IO_ERROR; clears BM_DIRTY if clear_dirty; if release_aio, drops one buffer-header refcount and clears the buffer's io_wref; finally ORs set_flag_bits into the state.
     576              :  */
     577              : void
     578        15516 : TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint64 set_flag_bits,
     579              :                        bool release_aio)
     580              : {
     581              :     /* Only need to adjust flags */
     582        15516 :     uint64      buf_state = pg_atomic_read_u64(&bufHdr->state);
     583              : 
     584              :     /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
     585              : 
     586              :     /* Clear earlier errors; if this IO failed, it'll be marked again */
     587        15516 :     buf_state &= ~BM_IO_ERROR;
     588              : 
     589        15516 :     if (clear_dirty)
     590         4430 :         buf_state &= ~BM_DIRTY;
     591              : 
     592        15516 :     if (release_aio)
     593              :     {
     594              :         /* release pin held by IO subsystem, see also buffer_stage_common() */
     595              :         Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
     596        11052 :         buf_state -= BUF_REFCOUNT_ONE;
     597        11052 :         pgaio_wref_clear(&bufHdr->io_wref);
     598              :     }
     599              : 
     600        15516 :     buf_state |= set_flag_bits; /* applied last, so bits passed here survive the clearing above */
     601        15516 :     pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     602              : 
     603              :     /* local buffers don't track IO using resowners */
     604              : 
     605              :     /* local buffers don't use the IO CV, as no other process can see buffer */
     606              : 
     607              :     /* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
     608        15516 : }
     609              : 
     610              : /*
     611              :  * InvalidateLocalBuffer -- mark a local buffer invalid.
     612              :  *
     613              :  * If check_unreferenced is true, error out if the buffer is still
     614              :  * pinned. Passing false is appropriate when calling InvalidateLocalBuffer()
     615              :  * as part of changing the identity of a buffer, instead of just dropping the
     616              :  * buffer.
     617              :  *
     618              :  * See also InvalidateBuffer().
     619              :  */
     620              : void
     621        30299 : InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
     622              : {
     623        30299 :     Buffer      buffer = BufferDescriptorGetBuffer(bufHdr);
     624        30299 :     int         bufid = -buffer - 1;
     625              :     uint64      buf_state;
     626              :     LocalBufferLookupEnt *hresult;
     627              : 
     628              :     /*
     629              :      * It's possible that we started IO on this buffer before e.g. aborting
     630              :      * the transaction that created a table. We need to wait for that IO to
     631              :      * complete before removing / reusing the buffer.
     632              :      */
     633        30299 :     if (pgaio_wref_valid(&bufHdr->io_wref))
     634              :     {
     635            0 :         PgAioWaitRef iow = bufHdr->io_wref;
     636              : 
     637            0 :         pgaio_wref_wait(&iow);
     638              :         Assert(!pgaio_wref_valid(&bufHdr->io_wref));
     639              :     }
     640              : 
     641        30299 :     buf_state = pg_atomic_read_u64(&bufHdr->state);
     642              : 
     643              :     /*
     644              :      * We need to test not just LocalRefCount[bufid] but also the BufferDesc
     645              :      * itself, as the latter is used to represent a pin by the AIO subsystem.
     646              :      * This can happen if AIO is initiated and then the query errors out.
     647              :      */
     648        30299 :     if (check_unreferenced &&
     649        22199 :         (LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
     650            0 :         elog(ERROR, "block %u of %s is still referenced (local %d)",
     651              :              bufHdr->tag.blockNum,
     652              :              relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
     653              :                             MyProcNumber,
     654              :                             BufTagGetForkNum(&bufHdr->tag)).str,
     655              :              LocalRefCount[bufid]);
     656              : 
     657              :     /* Remove entry from hashtable */
     658              :     hresult = (LocalBufferLookupEnt *)
     659        30299 :         hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     660        30299 :     if (!hresult)               /* shouldn't happen */
     661            0 :         elog(ERROR, "local buffer hash table corrupted");
     662              :     /* Mark buffer invalid */
     663        30299 :     ClearBufferTag(&bufHdr->tag);
     664        30299 :     buf_state &= ~BUF_FLAG_MASK;
     665        30299 :     buf_state &= ~BUF_USAGECOUNT_MASK;
     666        30299 :     pg_atomic_unlocked_write_u64(&bufHdr->state, buf_state);
     667        30299 : }
     668              : 
     669              : /*
     670              :  * DropRelationLocalBuffers
     671              :  *      This function removes from the buffer pool all the pages of the
     672              :  *      specified relation that have block numbers >= firstDelBlock.
     673              :  *      (In particular, with firstDelBlock = 0, all pages are removed.)
     674              :  *      Dirty pages are simply dropped, without bothering to write them
     675              :  *      out first.  Therefore, this is NOT rollback-able, and so should be
     676              :  *      used only with extreme caution!
     677              :  *
     678              :  *      See DropRelationBuffers in bufmgr.c for more notes.
     679              :  */
     680              : void
     681          498 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber *forkNum,
     682              :                          int nforks, BlockNumber *firstDelBlock)
     683              : {
     684              :     int         i;
     685              :     int         j;
     686              : 
     687       412146 :     for (i = 0; i < NLocBuffer; i++)
     688              :     {
     689       411648 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     690              :         uint64      buf_state;
     691              : 
     692       411648 :         buf_state = pg_atomic_read_u64(&bufHdr->state);
     693              : 
     694       411648 :         if (!(buf_state & BM_TAG_VALID) ||
     695        38156 :             !BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
     696       410530 :             continue;
     697              : 
     698         1279 :         for (j = 0; j < nforks; j++)
     699              :         {
     700         1229 :             if (BufTagGetForkNum(&bufHdr->tag) == forkNum[j] &&
     701         1110 :                 bufHdr->tag.blockNum >= firstDelBlock[j])
     702              :             {
     703         1068 :                 InvalidateLocalBuffer(bufHdr, true);
     704         1068 :                 break;
     705              :             }
     706              :         }
     707              :     }
     708          498 : }
     709              : 
     710              : /*
     711              :  * DropRelationAllLocalBuffers
     712              :  *      This function removes from the buffer pool all pages of all forks
     713              :  *      of the specified relation.
     714              :  *
     715              :  *      See DropRelationsAllBuffers in bufmgr.c for more notes.
     716              :  */
     717              : void
     718         4374 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
     719              : {
     720              :     int         i;
     721              : 
     722      4108374 :     for (i = 0; i < NLocBuffer; i++)
     723              :     {
     724      4104000 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     725              :         uint64      buf_state;
     726              : 
     727      4104000 :         buf_state = pg_atomic_read_u64(&bufHdr->state);
     728              : 
     729      4418176 :         if ((buf_state & BM_TAG_VALID) &&
     730       314176 :             BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
     731              :         {
     732        20973 :             InvalidateLocalBuffer(bufHdr, true);
     733              :         }
     734              :     }
     735         4374 : }
     736              : 
     737              : /*
     738              :  * InitLocalBuffers -
     739              :  *    init the local buffer cache. Since most queries (esp. multi-user ones)
     740              :  *    don't involve local buffers, we delay allocating actual memory for the
     741              :  *    buffers until we need them; just make the buffer headers here.
     742              :  */
     743              : static void
     744          350 : InitLocalBuffers(void)
     745              : {
     746          350 :     int         nbufs = num_temp_buffers;
     747              :     HASHCTL     info;
     748              :     int         i;
     749              : 
     750              :     /*
     751              :      * Parallel workers can't access data in temporary tables, because they
     752              :      * have no visibility into the local buffers of their leader.  This is a
     753              :      * convenient, low-cost place to provide a backstop check for that.  Note
     754              :      * that we don't wish to prevent a parallel worker from accessing catalog
     755              :      * metadata about a temp table, so checks at higher levels would be
     756              :      * inappropriate.
     757              :      */
     758          350 :     if (IsParallelWorker())
     759            0 :         ereport(ERROR,
     760              :                 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     761              :                  errmsg("cannot access temporary tables during a parallel operation")));
     762              : 
     763              :     /* Allocate and zero buffer headers and auxiliary arrays */
     764          350 :     LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
     765          350 :     LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
     766          350 :     LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
     767          350 :     if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
     768            0 :         ereport(FATAL,
     769              :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     770              :                  errmsg("out of memory")));
     771              : 
     772          350 :     nextFreeLocalBufId = 0;
     773              : 
     774              :     /* initialize fields that need to start off nonzero */
     775       338422 :     for (i = 0; i < nbufs; i++)
     776              :     {
     777       338072 :         BufferDesc *buf = GetLocalBufferDescriptor(i);
     778              : 
     779              :         /*
     780              :          * negative to indicate local buffer. This is tricky: shared buffers
     781              :          * start with 0. We have to start with -2. (Note that the routine
     782              :          * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
     783              :          * is -1.)
     784              :          */
     785       338072 :         buf->buf_id = -i - 2;
     786              : 
     787       338072 :         pgaio_wref_clear(&buf->io_wref);
     788              : 
     789              :         /*
     790              :          * Intentionally do not initialize the buffer's atomic variable
     791              :          * (besides zeroing the underlying memory above). That way we get
     792              :          * errors on platforms without atomics, if somebody (re-)introduces
     793              :          * atomic operations for local buffers.
     794              :          */
     795              :     }
     796              : 
     797              :     /* Create the lookup hash table */
     798          350 :     info.keysize = sizeof(BufferTag);
     799          350 :     info.entrysize = sizeof(LocalBufferLookupEnt);
     800              : 
     801          350 :     LocalBufHash = hash_create("Local Buffer Lookup Table",
     802              :                                nbufs,
     803              :                                &info,
     804              :                                HASH_ELEM | HASH_BLOBS);
     805              : 
     806          350 :     if (!LocalBufHash)
     807            0 :         elog(ERROR, "could not initialize local buffer hash table");
     808              : 
     809              :     /* Initialization done, mark buffers allocated */
     810          350 :     NLocBuffer = nbufs;
     811          350 : }
     812              : 
     813              : /*
     814              :  * XXX: We could have a slightly more efficient version of PinLocalBuffer()
     815              :  * that does not support adjusting the usagecount - but so far it does not
     816              :  * seem worth the trouble.
     817              :  *
     818              :  * Note that ResourceOwnerEnlarge() must have been done already.
     819              :  */
     820              : bool
     821      1668253 : PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
     822              : {
     823              :     uint64      buf_state;
     824      1668253 :     Buffer      buffer = BufferDescriptorGetBuffer(buf_hdr);
     825      1668253 :     int         bufid = -buffer - 1;
     826              : 
     827      1668253 :     buf_state = pg_atomic_read_u64(&buf_hdr->state);
     828              : 
     829      1668253 :     if (LocalRefCount[bufid] == 0)
     830              :     {
     831      1557943 :         NLocalPinnedBuffers++;
     832      1557943 :         buf_state += BUF_REFCOUNT_ONE;
     833      1557943 :         if (adjust_usagecount &&
     834      1527252 :             BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
     835              :         {
     836        84474 :             buf_state += BUF_USAGECOUNT_ONE;
     837              :         }
     838      1557943 :         pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
     839              : 
     840              :         /*
     841              :          * See comment in PinBuffer().
     842              :          *
     843              :          * If the buffer isn't allocated yet, it'll be marked as defined in
     844              :          * GetLocalBufferStorage().
     845              :          */
     846      1557943 :         if (LocalBufHdrGetBlock(buf_hdr) != NULL)
     847              :             VALGRIND_MAKE_MEM_DEFINED(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
     848              :     }
     849      1668253 :     LocalRefCount[bufid]++;
     850      1668253 :     ResourceOwnerRememberBuffer(CurrentResourceOwner,
     851              :                                 BufferDescriptorGetBuffer(buf_hdr));
     852              : 
     853      1668253 :     return buf_state & BM_VALID;
     854              : }
     855              : 
     856              : void
     857      2133947 : UnpinLocalBuffer(Buffer buffer)
     858              : {
     859      2133947 :     UnpinLocalBufferNoOwner(buffer);
     860      2133947 :     ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
     861      2133947 : }
     862              : 
     863              : void
     864      2137931 : UnpinLocalBufferNoOwner(Buffer buffer)
     865              : {
     866      2137931 :     int         buffid = -buffer - 1;
     867              : 
     868              :     Assert(BufferIsLocal(buffer));
     869              :     Assert(LocalRefCount[buffid] > 0);
     870              :     Assert(NLocalPinnedBuffers > 0);
     871              : 
     872      2137931 :     if (--LocalRefCount[buffid] == 0)
     873              :     {
     874      1557943 :         BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
     875              :         uint64      buf_state;
     876              : 
     877      1557943 :         NLocalPinnedBuffers--;
     878              : 
     879      1557943 :         buf_state = pg_atomic_read_u64(&buf_hdr->state);
     880              :         Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
     881      1557943 :         buf_state -= BUF_REFCOUNT_ONE;
     882      1557943 :         pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
     883              : 
     884              :         /* see comment in UnpinBufferNoOwner */
     885              :         VALGRIND_MAKE_MEM_NOACCESS(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
     886              :     }
     887      2137931 : }
     888              : 
     889              : /*
     890              :  * GUC check_hook for temp_buffers
     891              :  */
     892              : bool
     893         1291 : check_temp_buffers(int *newval, void **extra, GucSource source)
     894              : {
     895              :     /*
     896              :      * Once local buffers have been initialized, it's too late to change this.
     897              :      * However, if this is only a test call, allow it.
     898              :      */
     899         1291 :     if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
     900              :     {
     901            0 :         GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
     902            0 :         return false;
     903              :     }
     904         1291 :     return true;
     905              : }
     906              : 
     907              : /*
     908              :  * GetLocalBufferStorage - allocate memory for a local buffer
     909              :  *
     910              :  * The idea of this function is to aggregate our requests for storage
     911              :  * so that the memory manager doesn't see a whole lot of relatively small
     912              :  * requests.  Since we'll never give back a local buffer once it's created
     913              :  * within a particular process, no point in burdening memmgr with separately
     914              :  * managed chunks.
     915              :  */
     916              : static Block
     917        20931 : GetLocalBufferStorage(void)
     918              : {
     919              :     static char *cur_block = NULL;
     920              :     static int  next_buf_in_block = 0;
     921              :     static int  num_bufs_in_block = 0;
     922              :     static int  total_bufs_allocated = 0;
     923              :     static MemoryContext LocalBufferContext = NULL;
     924              : 
     925              :     char       *this_buf;
     926              : 
     927              :     Assert(total_bufs_allocated < NLocBuffer);
     928              : 
     929        20931 :     if (next_buf_in_block >= num_bufs_in_block)
     930              :     {
     931              :         /* Need to make a new request to memmgr */
     932              :         int         num_bufs;
     933              : 
     934              :         /*
     935              :          * We allocate local buffers in a context of their own, so that the
     936              :          * space eaten for them is easily recognizable in MemoryContextStats
     937              :          * output.  Create the context on first use.
     938              :          */
     939          567 :         if (LocalBufferContext == NULL)
     940          350 :             LocalBufferContext =
     941          350 :                 AllocSetContextCreate(TopMemoryContext,
     942              :                                       "LocalBufferContext",
     943              :                                       ALLOCSET_DEFAULT_SIZES);
     944              : 
     945              :         /* Start with a 16-buffer request; subsequent ones double each time */
     946          567 :         num_bufs = Max(num_bufs_in_block * 2, 16);
     947              :         /* But not more than what we need for all remaining local bufs */
     948          567 :         num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
     949              :         /* And don't overflow MaxAllocSize, either */
     950          567 :         num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
     951              : 
     952              :         /* Buffers should be I/O aligned. */
     953         1134 :         cur_block = MemoryContextAllocAligned(LocalBufferContext,
     954          567 :                                               num_bufs * BLCKSZ,
     955              :                                               PG_IO_ALIGN_SIZE,
     956              :                                               0);
     957              : 
     958          567 :         next_buf_in_block = 0;
     959          567 :         num_bufs_in_block = num_bufs;
     960              :     }
     961              : 
     962              :     /* Allocate next buffer in current memory block */
     963        20931 :     this_buf = cur_block + next_buf_in_block * BLCKSZ;
     964        20931 :     next_buf_in_block++;
     965        20931 :     total_bufs_allocated++;
     966              : 
     967              :     /*
     968              :      * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it
     969              :      * here.  The block is actually undefined, but we want consistency with
     970              :      * the regular case of not needing to allocate memory.  This is
     971              :      * specifically needed when method_io_uring.c fills the block, because
     972              :      * Valgrind doesn't recognize io_uring reads causing undefined memory to
     973              :      * become defined.
     974              :      */
     975              :     VALGRIND_MAKE_MEM_DEFINED(this_buf, BLCKSZ);
     976              : 
     977        20931 :     return (Block) this_buf;
     978              : }
     979              : 
     980              : /*
     981              :  * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
     982              :  *
     983              :  * This is just like CheckForBufferLeaks(), but for local buffers.
     984              :  */
     985              : static void
     986       654594 : CheckForLocalBufferLeaks(void)
     987              : {
     988              : #ifdef USE_ASSERT_CHECKING
     989              :     if (LocalRefCount)
     990              :     {
     991              :         int         RefCountErrors = 0;
     992              :         int         i;
     993              : 
     994              :         for (i = 0; i < NLocBuffer; i++)
     995              :         {
     996              :             if (LocalRefCount[i] != 0)
     997              :             {
     998              :                 Buffer      b = -i - 1;
     999              :                 char       *s;
    1000              : 
    1001              :                 s = DebugPrintBufferRefcount(b);
    1002              :                 elog(WARNING, "local buffer refcount leak: %s", s);
    1003              :                 pfree(s);
    1004              : 
    1005              :                 RefCountErrors++;
    1006              :             }
    1007              :         }
    1008              :         Assert(RefCountErrors == 0);
    1009              :     }
    1010              : #endif
    1011       654594 : }
    1012              : 
    1013              : /*
    1014              :  * AtEOXact_LocalBuffers - clean up at end of transaction.
    1015              :  *
    1016              :  * This is just like AtEOXact_Buffers, but for local buffers.
    1017              :  */
    1018              : void
    1019       629830 : AtEOXact_LocalBuffers(bool isCommit)
    1020              : {
    1021       629830 :     CheckForLocalBufferLeaks();
    1022       629830 : }
    1023              : 
    1024              : /*
    1025              :  * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
    1026              :  *
    1027              :  * This is just like AtProcExit_Buffers, but for local buffers.
    1028              :  */
    1029              : void
    1030        24764 : AtProcExit_LocalBuffers(void)
    1031              : {
    1032              :     /*
    1033              :      * We shouldn't be holding any remaining pins; if we are, and assertions
    1034              :      * aren't enabled, we'll fail later in DropRelationBuffers while trying to
    1035              :      * drop the temp rels.
    1036              :      */
    1037        24764 :     CheckForLocalBufferLeaks();
    1038        24764 : }
        

Generated by: LCOV version 2.0-1