LCOV - code coverage report
Current view: top level - src/backend/storage/buffer - localbuf.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 214 239 89.5 %
Date: 2025-01-18 03:14:54 Functions: 17 17 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * localbuf.c
       4             :  *    local buffer manager. Fast buffer manager for temporary tables,
       5             :  *    which never need to be WAL-logged or checkpointed, etc.
       6             :  *
       7             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994-5, Regents of the University of California
       9             :  *
      10             :  *
      11             :  * IDENTIFICATION
      12             :  *    src/backend/storage/buffer/localbuf.c
      13             :  *
      14             :  *-------------------------------------------------------------------------
      15             :  */
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/parallel.h"
      19             : #include "executor/instrument.h"
      20             : #include "pgstat.h"
      21             : #include "storage/buf_internals.h"
      22             : #include "storage/bufmgr.h"
      23             : #include "storage/fd.h"
      24             : #include "utils/guc_hooks.h"
      25             : #include "utils/memutils.h"
      26             : #include "utils/resowner.h"
      27             : 
      28             : 
      29             : /*#define LBDEBUG*/
      30             : 
      31             : /* entry for buffer lookup hashtable */
      32             : typedef struct
      33             : {
      34             :     BufferTag   key;            /* Tag of a disk page */
      35             :     int         id;             /* Associated local buffer's index */
      36             : } LocalBufferLookupEnt;
      37             : 
      38             : /* Note: this macro only works on local buffers, not shared ones! */
      39             : #define LocalBufHdrGetBlock(bufHdr) \
      40             :     LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
      41             : 
      42             : int         NLocBuffer = 0;     /* until buffers are initialized */
      43             : 
      44             : BufferDesc *LocalBufferDescriptors = NULL;
      45             : Block      *LocalBufferBlockPointers = NULL;
      46             : int32      *LocalRefCount = NULL;
      47             : 
      48             : static int  nextFreeLocalBufId = 0;
      49             : 
      50             : static HTAB *LocalBufHash = NULL;
      51             : 
      52             : /* number of local buffers pinned at least once */
      53             : static int  NLocalPinnedBuffers = 0;
      54             : 
      55             : 
      56             : static void InitLocalBuffers(void);
      57             : static Block GetLocalBufferStorage(void);
      58             : static Buffer GetLocalVictimBuffer(void);
      59             : 
      60             : 
      61             : /*
      62             :  * PrefetchLocalBuffer -
      63             :  *    initiate asynchronous read of a block of a relation
      64             :  *
      65             :  * Do PrefetchBuffer's work for temporary relations.
      66             :  * No-op if prefetching isn't compiled in.
      67             :  */
      68             : PrefetchBufferResult
      69        6224 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
      70             :                     BlockNumber blockNum)
      71             : {
      72        6224 :     PrefetchBufferResult result = {InvalidBuffer, false};
      73             :     BufferTag   newTag;         /* identity of requested block */
      74             :     LocalBufferLookupEnt *hresult;
      75             : 
      76        6224 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
      77             : 
      78             :     /* Initialize local buffers if first request in this session */
      79        6224 :     if (LocalBufHash == NULL)
      80           0 :         InitLocalBuffers();
      81             : 
      82             :     /* See if the desired buffer already exists */
      83             :     hresult = (LocalBufferLookupEnt *)
      84        6224 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
      85             : 
      86        6224 :     if (hresult)
      87             :     {
      88             :         /* Yes, so nothing to do */
      89        6224 :         result.recent_buffer = -hresult->id - 1;
      90             :     }
      91             :     else
      92             :     {
      93             : #ifdef USE_PREFETCH
      94             :         /* Not in buffers, so initiate prefetch */
      95           0 :         if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
      96           0 :             smgrprefetch(smgr, forkNum, blockNum, 1))
      97             :         {
      98           0 :             result.initiated_io = true;
      99             :         }
     100             : #endif                          /* USE_PREFETCH */
     101             :     }
     102             : 
     103        6224 :     return result;
     104             : }
     105             : 
     106             : 
     107             : /*
     108             :  * LocalBufferAlloc -
     109             :  *    Find or create a local buffer for the given page of the given relation.
     110             :  *
     111             :  * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
     112             :  * any locking since this is all local.  We support only default access
     113             :  * strategy (hence, usage_count is always advanced).
     114             :  */
     115             : BufferDesc *
     116     2124928 : LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
     117             :                  bool *foundPtr)
     118             : {
     119             :     BufferTag   newTag;         /* identity of requested block */
     120             :     LocalBufferLookupEnt *hresult;
     121             :     BufferDesc *bufHdr;
     122             :     Buffer      victim_buffer;
     123             :     int         bufid;
     124             :     bool        found;
     125             : 
     126     2124928 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
     127             : 
     128             :     /* Initialize local buffers if first request in this session */
     129     2124928 :     if (LocalBufHash == NULL)
     130          26 :         InitLocalBuffers();
     131             : 
     132     2124928 :     ResourceOwnerEnlarge(CurrentResourceOwner);
     133             : 
     134             :     /* See if the desired buffer already exists */
     135             :     hresult = (LocalBufferLookupEnt *)
     136     2124928 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
     137             : 
     138     2124928 :     if (hresult)
     139             :     {
     140     2117230 :         bufid = hresult->id;
     141     2117230 :         bufHdr = GetLocalBufferDescriptor(bufid);
     142             :         Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
     143             : 
     144     2117230 :         *foundPtr = PinLocalBuffer(bufHdr, true);
     145             :     }
     146             :     else
     147             :     {
     148             :         uint32      buf_state;
     149             : 
     150        7698 :         victim_buffer = GetLocalVictimBuffer();
     151        7698 :         bufid = -victim_buffer - 1;
     152        7698 :         bufHdr = GetLocalBufferDescriptor(bufid);
     153             : 
     154             :         hresult = (LocalBufferLookupEnt *)
     155        7698 :             hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
     156        7698 :         if (found)              /* shouldn't happen */
     157           0 :             elog(ERROR, "local buffer hash table corrupted");
     158        7698 :         hresult->id = bufid;
     159             : 
     160             :         /*
     161             :          * it's all ours now.
     162             :          */
     163        7698 :         bufHdr->tag = newTag;
     164             : 
     165        7698 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     166        7698 :         buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
     167        7698 :         buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     168        7698 :         pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     169             : 
     170        7698 :         *foundPtr = false;
     171             :     }
     172             : 
     173     2124928 :     return bufHdr;
     174             : }
     175             : 
     176             : static Buffer
     177       31872 : GetLocalVictimBuffer(void)
     178             : {
     179             :     int         victim_bufid;
     180             :     int         trycounter;
     181             :     uint32      buf_state;
     182             :     BufferDesc *bufHdr;
     183             : 
     184       31872 :     ResourceOwnerEnlarge(CurrentResourceOwner);
     185             : 
     186             :     /*
     187             :      * Need to get a new buffer.  We use a clock sweep algorithm (essentially
     188             :      * the same as what freelist.c does now...)
     189             :      */
     190       31872 :     trycounter = NLocBuffer;
     191             :     for (;;)
     192             :     {
     193       38286 :         victim_bufid = nextFreeLocalBufId;
     194             : 
     195       38286 :         if (++nextFreeLocalBufId >= NLocBuffer)
     196          66 :             nextFreeLocalBufId = 0;
     197             : 
     198       38286 :         bufHdr = GetLocalBufferDescriptor(victim_bufid);
     199             : 
     200       38286 :         if (LocalRefCount[victim_bufid] == 0)
     201             :         {
     202       38172 :             buf_state = pg_atomic_read_u32(&bufHdr->state);
     203             : 
     204       38172 :             if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
     205             :             {
     206        6300 :                 buf_state -= BUF_USAGECOUNT_ONE;
     207        6300 :                 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     208        6300 :                 trycounter = NLocBuffer;
     209             :             }
     210             :             else
     211             :             {
     212             :                 /* Found a usable buffer */
     213       31872 :                 PinLocalBuffer(bufHdr, false);
     214       31872 :                 break;
     215             :             }
     216             :         }
     217         114 :         else if (--trycounter == 0)
     218           0 :             ereport(ERROR,
     219             :                     (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     220             :                      errmsg("no empty local buffer available")));
     221             :     }
     222             : 
     223             :     /*
     224             :      * lazy memory allocation: allocate space on first use of a buffer.
     225             :      */
     226       31872 :     if (LocalBufHdrGetBlock(bufHdr) == NULL)
     227             :     {
     228             :         /* Set pointer for use by BufferGetBlock() macro */
     229       29382 :         LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
     230             :     }
     231             : 
     232             :     /*
     233             :      * this buffer is not referenced but it might still be dirty. if that's
     234             :      * the case, write it out before reusing it!
     235             :      */
     236       31872 :     if (buf_state & BM_DIRTY)
     237             :     {
     238             :         instr_time  io_start;
     239             :         SMgrRelation oreln;
     240         894 :         Page        localpage = (char *) LocalBufHdrGetBlock(bufHdr);
     241             : 
     242             :         /* Find smgr relation for buffer */
     243         894 :         oreln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag), MyProcNumber);
     244             : 
     245         894 :         PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
     246             : 
     247         894 :         io_start = pgstat_prepare_io_time(track_io_timing);
     248             : 
     249             :         /* And write... */
     250         894 :         smgrwrite(oreln,
     251         894 :                   BufTagGetForkNum(&bufHdr->tag),
     252             :                   bufHdr->tag.blockNum,
     253             :                   localpage,
     254             :                   false);
     255             : 
     256             :         /* Temporary table I/O does not use Buffer Access Strategies */
     257         894 :         pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
     258             :                                 IOOP_WRITE, io_start, 1, BLCKSZ);
     259             : 
     260             :         /* Mark not-dirty now in case we error out below */
     261         894 :         buf_state &= ~BM_DIRTY;
     262         894 :         pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     263             : 
     264         894 :         pgBufferUsage.local_blks_written++;
     265             :     }
     266             : 
     267             :     /*
     268             :      * Remove the victim buffer from the hashtable and mark as invalid.
     269             :      */
     270       31872 :     if (buf_state & BM_TAG_VALID)
     271             :     {
     272             :         LocalBufferLookupEnt *hresult;
     273             : 
     274             :         hresult = (LocalBufferLookupEnt *)
     275         900 :             hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     276         900 :         if (!hresult)           /* shouldn't happen */
     277           0 :             elog(ERROR, "local buffer hash table corrupted");
     278             :         /* mark buffer invalid just in case hash insert fails */
     279         900 :         ClearBufferTag(&bufHdr->tag);
     280         900 :         buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
     281         900 :         pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     282             : 
     283         900 :         pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT, 1, 0);
     284             :     }
     285             : 
     286       31872 :     return BufferDescriptorGetBuffer(bufHdr);
     287             : }
     288             : 
     289             : /* see LimitAdditionalPins() */
     290             : void
     291       28720 : LimitAdditionalLocalPins(uint32 *additional_pins)
     292             : {
     293             :     uint32      max_pins;
     294             : 
     295       28720 :     if (*additional_pins <= 1)
     296       17072 :         return;
     297             : 
     298             :     /*
     299             :      * In contrast to LimitAdditionalPins() other backends don't play a role
     300             :      * here. We can allow up to NLocBuffer pins in total, but it might not be
     301             :      * initialized yet so read num_temp_buffers.
     302             :      */
     303       11648 :     max_pins = (num_temp_buffers - NLocalPinnedBuffers);
     304             : 
     305       11648 :     if (*additional_pins >= max_pins)
     306           0 :         *additional_pins = max_pins;
     307             : }
     308             : 
     309             : /*
     310             :  * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
     311             :  * temporary buffers.
     312             :  */
     313             : BlockNumber
     314       17682 : ExtendBufferedRelLocal(BufferManagerRelation bmr,
     315             :                        ForkNumber fork,
     316             :                        uint32 flags,
     317             :                        uint32 extend_by,
     318             :                        BlockNumber extend_upto,
     319             :                        Buffer *buffers,
     320             :                        uint32 *extended_by)
     321             : {
     322             :     BlockNumber first_block;
     323             :     instr_time  io_start;
     324             : 
     325             :     /* Initialize local buffers if first request in this session */
     326       17682 :     if (LocalBufHash == NULL)
     327         476 :         InitLocalBuffers();
     328             : 
     329       17682 :     LimitAdditionalLocalPins(&extend_by);
     330             : 
     331       41856 :     for (uint32 i = 0; i < extend_by; i++)
     332             :     {
     333             :         BufferDesc *buf_hdr;
     334             :         Block       buf_block;
     335             : 
     336       24174 :         buffers[i] = GetLocalVictimBuffer();
     337       24174 :         buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
     338       24174 :         buf_block = LocalBufHdrGetBlock(buf_hdr);
     339             : 
     340             :         /* new buffers are zero-filled */
     341       24174 :         MemSet((char *) buf_block, 0, BLCKSZ);
     342             :     }
     343             : 
     344       17682 :     first_block = smgrnblocks(bmr.smgr, fork);
     345             : 
     346             :     if (extend_upto != InvalidBlockNumber)
     347             :     {
     348             :         /*
     349             :          * In contrast to shared relations, nothing could change the relation
     350             :          * size concurrently. Thus we shouldn't end up finding that we don't
     351             :          * need to do anything.
     352             :          */
     353             :         Assert(first_block <= extend_upto);
     354             : 
     355             :         Assert((uint64) first_block + extend_by <= extend_upto);
     356             :     }
     357             : 
     358             :     /* Fail if relation is already at maximum possible length */
     359       17682 :     if ((uint64) first_block + extend_by >= MaxBlockNumber)
     360           0 :         ereport(ERROR,
     361             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     362             :                  errmsg("cannot extend relation %s beyond %u blocks",
     363             :                         relpath(bmr.smgr->smgr_rlocator, fork),
     364             :                         MaxBlockNumber)));
     365             : 
     366       41856 :     for (uint32 i = 0; i < extend_by; i++)
     367             :     {
     368             :         int         victim_buf_id;
     369             :         BufferDesc *victim_buf_hdr;
     370             :         BufferTag   tag;
     371             :         LocalBufferLookupEnt *hresult;
     372             :         bool        found;
     373             : 
     374       24174 :         victim_buf_id = -buffers[i] - 1;
     375       24174 :         victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
     376             : 
     377             :         /* in case we need to pin an existing buffer below */
     378       24174 :         ResourceOwnerEnlarge(CurrentResourceOwner);
     379             : 
     380       24174 :         InitBufferTag(&tag, &bmr.smgr->smgr_rlocator.locator, fork, first_block + i);
     381             : 
     382             :         hresult = (LocalBufferLookupEnt *)
     383       24174 :             hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
     384       24174 :         if (found)
     385             :         {
     386             :             BufferDesc *existing_hdr;
     387             :             uint32      buf_state;
     388             : 
     389           0 :             UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
     390             : 
     391           0 :             existing_hdr = GetLocalBufferDescriptor(hresult->id);
     392           0 :             PinLocalBuffer(existing_hdr, false);
     393           0 :             buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
     394             : 
     395           0 :             buf_state = pg_atomic_read_u32(&existing_hdr->state);
     396             :             Assert(buf_state & BM_TAG_VALID);
     397             :             Assert(!(buf_state & BM_DIRTY));
     398           0 :             buf_state &= ~BM_VALID;
     399           0 :             pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
     400             :         }
     401             :         else
     402             :         {
     403       24174 :             uint32      buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
     404             : 
     405             :             Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
     406             : 
     407       24174 :             victim_buf_hdr->tag = tag;
     408             : 
     409       24174 :             buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     410             : 
     411       24174 :             pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
     412             : 
     413       24174 :             hresult->id = victim_buf_id;
     414             :         }
     415             :     }
     416             : 
     417       17682 :     io_start = pgstat_prepare_io_time(track_io_timing);
     418             : 
     419             :     /* actually extend relation */
     420       17682 :     smgrzeroextend(bmr.smgr, fork, first_block, extend_by, false);
     421             : 
     422       17682 :     pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
     423       17682 :                             io_start, 1, extend_by * BLCKSZ);
     424             : 
     425       41856 :     for (uint32 i = 0; i < extend_by; i++)
     426             :     {
     427       24174 :         Buffer      buf = buffers[i];
     428             :         BufferDesc *buf_hdr;
     429             :         uint32      buf_state;
     430             : 
     431       24174 :         buf_hdr = GetLocalBufferDescriptor(-buf - 1);
     432             : 
     433       24174 :         buf_state = pg_atomic_read_u32(&buf_hdr->state);
     434       24174 :         buf_state |= BM_VALID;
     435       24174 :         pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     436             :     }
     437             : 
     438       17682 :     *extended_by = extend_by;
     439             : 
     440       17682 :     pgBufferUsage.local_blks_written += extend_by;
     441             : 
     442       17682 :     return first_block;
     443             : }
     444             : 
     445             : /*
     446             :  * MarkLocalBufferDirty -
     447             :  *    mark a local buffer dirty
     448             :  */
     449             : void
     450     3254734 : MarkLocalBufferDirty(Buffer buffer)
     451             : {
     452             :     int         bufid;
     453             :     BufferDesc *bufHdr;
     454             :     uint32      buf_state;
     455             : 
     456             :     Assert(BufferIsLocal(buffer));
     457             : 
     458             : #ifdef LBDEBUG
     459             :     fprintf(stderr, "LB DIRTY %d\n", buffer);
     460             : #endif
     461             : 
     462     3254734 :     bufid = -buffer - 1;
     463             : 
     464             :     Assert(LocalRefCount[bufid] > 0);
     465             : 
     466     3254734 :     bufHdr = GetLocalBufferDescriptor(bufid);
     467             : 
     468     3254734 :     buf_state = pg_atomic_read_u32(&bufHdr->state);
     469             : 
     470     3254734 :     if (!(buf_state & BM_DIRTY))
     471       22400 :         pgBufferUsage.local_blks_dirtied++;
     472             : 
     473     3254734 :     buf_state |= BM_DIRTY;
     474             : 
     475     3254734 :     pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     476     3254734 : }
     477             : 
     478             : /*
     479             :  * DropRelationLocalBuffers
     480             :  *      This function removes from the buffer pool all the pages of the
     481             :  *      specified relation that have block numbers >= firstDelBlock.
     482             :  *      (In particular, with firstDelBlock = 0, all pages are removed.)
     483             :  *      Dirty pages are simply dropped, without bothering to write them
     484             :  *      out first.  Therefore, this is NOT rollback-able, and so should be
     485             :  *      used only with extreme caution!
     486             :  *
     487             :  *      See DropRelationBuffers in bufmgr.c for more notes.
     488             :  */
     489             : void
     490         692 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum,
     491             :                          BlockNumber firstDelBlock)
     492             : {
     493             :     int         i;
     494             : 
     495      645812 :     for (i = 0; i < NLocBuffer; i++)
     496             :     {
     497      645120 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     498             :         LocalBufferLookupEnt *hresult;
     499             :         uint32      buf_state;
     500             : 
     501      645120 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     502             : 
     503      705420 :         if ((buf_state & BM_TAG_VALID) &&
     504       62070 :             BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
     505        1770 :             BufTagGetForkNum(&bufHdr->tag) == forkNum &&
     506        1636 :             bufHdr->tag.blockNum >= firstDelBlock)
     507             :         {
     508        1584 :             if (LocalRefCount[i] != 0)
     509           0 :                 elog(ERROR, "block %u of %s is still referenced (local %u)",
     510             :                      bufHdr->tag.blockNum,
     511             :                      relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
     512             :                                     MyProcNumber,
     513             :                                     BufTagGetForkNum(&bufHdr->tag)),
     514             :                      LocalRefCount[i]);
     515             : 
     516             :             /* Remove entry from hashtable */
     517             :             hresult = (LocalBufferLookupEnt *)
     518        1584 :                 hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     519        1584 :             if (!hresult)       /* shouldn't happen */
     520           0 :                 elog(ERROR, "local buffer hash table corrupted");
     521             :             /* Mark buffer invalid */
     522        1584 :             ClearBufferTag(&bufHdr->tag);
     523        1584 :             buf_state &= ~BUF_FLAG_MASK;
     524        1584 :             buf_state &= ~BUF_USAGECOUNT_MASK;
     525        1584 :             pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     526             :         }
     527             :     }
     528         692 : }
     529             : 
     530             : /*
     531             :  * DropRelationAllLocalBuffers
     532             :  *      This function removes from the buffer pool all pages of all forks
     533             :  *      of the specified relation.
     534             :  *
     535             :  *      See DropRelationsAllBuffers in bufmgr.c for more notes.
     536             :  */
     537             : void
     538        6064 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
     539             : {
     540             :     int         i;
     541             : 
     542     5883328 :     for (i = 0; i < NLocBuffer; i++)
     543             :     {
     544     5877264 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     545             :         LocalBufferLookupEnt *hresult;
     546             :         uint32      buf_state;
     547             : 
     548     5877264 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     549             : 
     550     6297048 :         if ((buf_state & BM_TAG_VALID) &&
     551      419784 :             BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
     552             :         {
     553       29388 :             if (LocalRefCount[i] != 0)
     554           0 :                 elog(ERROR, "block %u of %s is still referenced (local %u)",
     555             :                      bufHdr->tag.blockNum,
     556             :                      relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
     557             :                                     MyProcNumber,
     558             :                                     BufTagGetForkNum(&bufHdr->tag)),
     559             :                      LocalRefCount[i]);
     560             :             /* Remove entry from hashtable */
     561             :             hresult = (LocalBufferLookupEnt *)
     562       29388 :                 hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     563       29388 :             if (!hresult)       /* shouldn't happen */
     564           0 :                 elog(ERROR, "local buffer hash table corrupted");
     565             :             /* Mark buffer invalid */
     566       29388 :             ClearBufferTag(&bufHdr->tag);
     567       29388 :             buf_state &= ~BUF_FLAG_MASK;
     568       29388 :             buf_state &= ~BUF_USAGECOUNT_MASK;
     569       29388 :             pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     570             :         }
     571             :     }
     572        6064 : }
     573             : 
     574             : /*
     575             :  * InitLocalBuffers -
     576             :  *    init the local buffer cache. Since most queries (esp. multi-user ones)
     577             :  *    don't involve local buffers, we delay allocating actual memory for the
     578             :  *    buffers until we need them; just make the buffer headers here.
                     :  *
                     :  * The number of buffers is taken from num_temp_buffers at the time of the
                     :  * call.  Three zero-filled arrays of that length are allocated with
                     :  * calloc(): the buffer descriptors, the block pointers and the pin counts.
                     :  * The lookup hash table is created afterwards, and NLocBuffer is assigned
                     :  * only once every earlier step has succeeded.
                     :  *
                     :  * Errors: reports ERROR when called in a parallel worker or when the hash
                     :  * table could not be created, and FATAL when any of the calloc() calls
                     :  * fails.
     579             :  */
     580             : static void
     581         502 : InitLocalBuffers(void)
     582             : {
     583         502 :     int         nbufs = num_temp_buffers;
     584             :     HASHCTL     info;
     585             :     int         i;
     586             : 
     587             :     /*
     588             :      * Parallel workers can't access data in temporary tables, because they
     589             :      * have no visibility into the local buffers of their leader.  This is a
     590             :      * convenient, low-cost place to provide a backstop check for that.  Note
     591             :      * that we don't wish to prevent a parallel worker from accessing catalog
     592             :      * metadata about a temp table, so checks at higher levels would be
     593             :      * inappropriate.
     594             :      */
     595         502 :     if (IsParallelWorker())
     596           0 :         ereport(ERROR,
     597             :                 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     598             :                  errmsg("cannot access temporary tables during a parallel operation")));
     599             : 
     600             :     /* Allocate and zero buffer headers and auxiliary arrays */
     601         502 :     LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
     602         502 :     LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
     603         502 :     LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
     604         502 :     if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
     605           0 :         ereport(FATAL,
     606             :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     607             :                  errmsg("out of memory")));
     608             : 
     609         502 :     nextFreeLocalBufId = 0;
     610             : 
     611             :     /* initialize fields that need to start off nonzero */
     612      509006 :     for (i = 0; i < nbufs; i++)
     613             :     {
     614      508504 :         BufferDesc *buf = GetLocalBufferDescriptor(i);
     615             : 
     616             :         /*
     617             :          * negative to indicate local buffer. This is tricky: shared buffers
     618             :          * start with 0. We have to start with -2. (Note that the routine
     619             :          * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
     620             :          * is -1.)
                     :          *
                     :          * Hence descriptor i gets buf_id -i - 2, i.e. Buffer number -i - 1.
     621             :          */
     622      508504 :         buf->buf_id = -i - 2;
     623             : 
     624             :         /*
     625             :          * Intentionally do not initialize the buffer's atomic variable
     626             :          * (besides zeroing the underlying memory above). That way we get
     627             :          * errors on platforms without atomics, if somebody (re-)introduces
     628             :          * atomic operations for local buffers.
     629             :          */
     630             :     }
     631             : 
     632             :     /* Create the lookup hash table, keyed by BufferTag and sized for nbufs entries */
     633         502 :     info.keysize = sizeof(BufferTag);
     634         502 :     info.entrysize = sizeof(LocalBufferLookupEnt);
     635             : 
     636         502 :     LocalBufHash = hash_create("Local Buffer Lookup Table",
     637             :                                nbufs,
     638             :                                &info,
     639             :                                HASH_ELEM | HASH_BLOBS);
     640             : 
     641         502 :     if (!LocalBufHash)
     642           0 :         elog(ERROR, "could not initialize local buffer hash table");
     643             : 
     644             :     /* Initialization done, mark buffers allocated (NLocBuffer becomes nonzero) */
     645         502 :     NLocBuffer = nbufs;
     646         502 : }
     647             : 
     648             : /*
                     :  * PinLocalBuffer - take one more pin on a local buffer.
                     :  *
                     :  * Increments the buffer's entry in LocalRefCount and records the pin with
                     :  * CurrentResourceOwner.  When this is the first pin (the count was zero),
                     :  * NLocalPinnedBuffers is incremented as well and, if adjust_usagecount is
                     :  * true and the usage count is still below BM_MAX_USAGE_COUNT, the usage
                     :  * count stored in the header state is raised by one.
                     :  *
                     :  * Returns true if BM_VALID is set in the buffer's state, false otherwise.
                     :  *
     649             :  * XXX: We could have a slightly more efficient version of PinLocalBuffer()
     650             :  * that does not support adjusting the usagecount - but so far it does not
     651             :  * seem worth the trouble.
     652             :  *
     653             :  * Note that ResourceOwnerEnlarge() must have been done already.
     654             :  */
     655             : bool
     656     2149102 : PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
     657             : {
     658             :     uint32      buf_state;
     659     2149102 :     Buffer      buffer = BufferDescriptorGetBuffer(buf_hdr);
     660     2149102 :     int         bufid = -buffer - 1;    /* index into LocalRefCount */
     661             : 
     662     2149102 :     buf_state = pg_atomic_read_u32(&buf_hdr->state);
     663             : 
     664     2149102 :     if (LocalRefCount[bufid] == 0)
     665             :     {
                       /* first pin: count the buffer as pinned, maybe bump usage count */
     666     1978958 :         NLocalPinnedBuffers++;
     667     1978958 :         if (adjust_usagecount &&
     668     1947086 :             BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
     669             :         {
     670      106356 :             buf_state += BUF_USAGECOUNT_ONE;
     671      106356 :             pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     672             :         }
     673             :     }
     674     2149102 :     LocalRefCount[bufid]++;
     675     2149102 :     ResourceOwnerRememberBuffer(CurrentResourceOwner,
     676             :                                 BufferDescriptorGetBuffer(buf_hdr));
     677             : 
     678     2149102 :     return buf_state & BM_VALID;
     679             : }
     680             : 
                       /*
                     :  * UnpinLocalBuffer - release one pin on a local buffer and make
                     :  * CurrentResourceOwner forget it.
                     :  *
                     :  * The pin-count bookkeeping itself is done by UnpinLocalBufferNoOwner().
                     :  */
     681             : void
     682     2846290 : UnpinLocalBuffer(Buffer buffer)
     683             : {
     684     2846290 :     UnpinLocalBufferNoOwner(buffer);
     685     2846290 :     ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
     686     2846290 : }
     687             : 
                       /*
                     :  * UnpinLocalBufferNoOwner - release one pin on a local buffer without
                     :  * touching any resource owner.
                     :  *
                     :  * Decrements the buffer's LocalRefCount entry; when it drops to zero,
                     :  * NLocalPinnedBuffers is decremented too.  The assertions require that
                     :  * the buffer is local and currently pinned.
                     :  */
     688             : void
     689     2847056 : UnpinLocalBufferNoOwner(Buffer buffer)
     690             : {
     691     2847056 :     int         buffid = -buffer - 1;   /* index into LocalRefCount */
     692             : 
     693             :     Assert(BufferIsLocal(buffer));
     694             :     Assert(LocalRefCount[buffid] > 0);
     695             :     Assert(NLocalPinnedBuffers > 0);
     696             : 
     697     2847056 :     if (--LocalRefCount[buffid] == 0)
     698     1978958 :         NLocalPinnedBuffers--;
     699     2847056 : }
     700             : 
     701             : /*
     702             :  * GUC check_hook for temp_buffers
                     :  *
                     :  * Returns false, after setting an errdetail message, when the proposed
                     :  * value differs from a nonzero NLocBuffer and the call is not a
                     :  * PGC_S_TEST call.  Returns true in every other case, including a
                     :  * request for the value already in effect.  The "extra" argument is not
                     :  * used.
     703             :  */
     704             : bool
     705        1990 : check_temp_buffers(int *newval, void **extra, GucSource source)
     706             : {
     707             :     /*
     708             :      * Once local buffers have been initialized, it's too late to change this.
     709             :      * However, if this is only a test call, allow it.
     710             :      */
     711        1990 :     if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
     712             :     {
     713           0 :         GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
     714           0 :         return false;
     715             :     }
     716        1990 :     return true;
     717             : }
     718             : 
     719             : /*
     720             :  * GetLocalBufferStorage - allocate memory for a local buffer
     721             :  *
     722             :  * The idea of this function is to aggregate our requests for storage
     723             :  * so that the memory manager doesn't see a whole lot of relatively small
     724             :  * requests.  Since we'll never give back a local buffer once it's created
     725             :  * within a particular process, no point in burdening memmgr with separately
     726             :  * managed chunks.
                     :  *
                     :  * Returns a pointer to the next BLCKSZ-sized slice of the current chunk.
                     :  * A new chunk is requested from LocalBufferContext whenever the current
                     :  * one is used up.  The bookkeeping lives in function-local static
                     :  * variables, so it persists from one call to the next.
                     :  *
                     :  * NOTE(review): the MaxAllocSize cap below bounds num_bufs * BLCKSZ only,
                     :  * while the actual request adds PG_IO_ALIGN_SIZE on top of that - confirm
                     :  * that the sum cannot exceed the allocator's limit for small BLCKSZ.
     727             :  */
     728             : static Block
     729       29382 : GetLocalBufferStorage(void)
     730             : {
     731             :     static char *cur_block = NULL;
     732             :     static int  next_buf_in_block = 0;
     733             :     static int  num_bufs_in_block = 0;
     734             :     static int  total_bufs_allocated = 0;
     735             :     static MemoryContext LocalBufferContext = NULL;
     736             : 
     737             :     char       *this_buf;
     738             : 
     739             :     Assert(total_bufs_allocated < NLocBuffer);
     740             : 
     741       29382 :     if (next_buf_in_block >= num_bufs_in_block)
     742             :     {
     743             :         /* Need to make a new request to memmgr */
     744             :         int         num_bufs;
     745             : 
     746             :         /*
     747             :          * We allocate local buffers in a context of their own, so that the
     748             :          * space eaten for them is easily recognizable in MemoryContextStats
     749             :          * output.  Create the context on first use.
     750             :          */
     751         778 :         if (LocalBufferContext == NULL)
     752         502 :             LocalBufferContext =
     753         502 :                 AllocSetContextCreate(TopMemoryContext,
     754             :                                       "LocalBufferContext",
     755             :                                       ALLOCSET_DEFAULT_SIZES);
     756             : 
     757             :         /* Start with a 16-buffer request; subsequent ones double each time */
     758         778 :         num_bufs = Max(num_bufs_in_block * 2, 16);
     759             :         /* But not more than what we need for all remaining local bufs */
     760         778 :         num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
     761             :         /* And don't overflow MaxAllocSize, either */
     762         778 :         num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
     763             : 
     764             :         /* Buffers should be I/O aligned; PG_IO_ALIGN_SIZE extra bytes leave room for TYPEALIGN. */
     765         778 :         cur_block = (char *)
     766         778 :             TYPEALIGN(PG_IO_ALIGN_SIZE,
     767             :                       MemoryContextAlloc(LocalBufferContext,
     768             :                                          num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE));
     769         778 :         next_buf_in_block = 0;
     770         778 :         num_bufs_in_block = num_bufs;
     771             :     }
     772             : 
     773             :     /* Allocate next buffer in current memory block */
     774       29382 :     this_buf = cur_block + next_buf_in_block * BLCKSZ;
     775       29382 :     next_buf_in_block++;
     776       29382 :     total_bufs_allocated++;
     777             : 
     778       29382 :     return (Block) this_buf;
     779             : }
     780             : 
     781             : /*
     782             :  * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
     783             :  *
     784             :  * This is just like CheckForBufferLeaks(), but for local buffers.
                     :  *
                     :  * The body is compiled only when USE_ASSERT_CHECKING is defined; without
                     :  * it the function does nothing.  Nothing is checked either while
                     :  * LocalRefCount is still NULL.  Every buffer with a nonzero pin count
                     :  * produces a WARNING, and an assertion fails afterwards if any was found.
     785             :  */
     786             : static void
     787      828396 : CheckForLocalBufferLeaks(void)
     788             : {
     789             : #ifdef USE_ASSERT_CHECKING
     790             :     if (LocalRefCount)
     791             :     {
     792             :         int         RefCountErrors = 0;
     793             :         int         i;
     794             : 
     795             :         for (i = 0; i < NLocBuffer; i++)
     796             :         {
     797             :             if (LocalRefCount[i] != 0)
     798             :             {
     799             :                 Buffer      b = -i - 1; /* Buffer number for array slot i */
     800             :                 char       *s;
     801             : 
     802             :                 s = DebugPrintBufferRefcount(b);
     803             :                 elog(WARNING, "local buffer refcount leak: %s", s);
     804             :                 pfree(s);
     805             : 
     806             :                 RefCountErrors++;
     807             :             }
     808             :         }
     809             :         Assert(RefCountErrors == 0);
     810             :     }
     811             : #endif
     812      828396 : }
     813             : 
     814             : /*
     815             :  * AtEOXact_LocalBuffers - clean up at end of transaction.
     816             :  *
     817             :  * This is just like AtEOXact_Buffers, but for local buffers.
                     :  *
                     :  * The only action is the pin leak check; isCommit is not used.
     818             :  */
     819             : void
     820      791110 : AtEOXact_LocalBuffers(bool isCommit)
     821             : {
     822      791110 :     CheckForLocalBufferLeaks();
     823      791110 : }
     824             : 
     825             : /*
     826             :  * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
     827             :  *
     828             :  * This is just like AtProcExit_Buffers, but for local buffers.
                     :  *
                     :  * The only action is a call to CheckForLocalBufferLeaks().
     829             :  */
     830             : void
     831       37286 : AtProcExit_LocalBuffers(void)
     832             : {
     833             :     /*
     834             :      * We shouldn't be holding any remaining pins; if we are, and assertions
     835             :      * aren't enabled, we'll fail later in DropRelationBuffers while trying to
     836             :      * drop the temp rels.
     837             :      */
     838       37286 :     CheckForLocalBufferLeaks();
     839       37286 : }

Generated by: LCOV version 1.14