LCOV - code coverage report
Current view: top level - src/backend/storage/buffer - localbuf.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 213 238 89.5 %
Date: 2024-11-21 08:14:44 Functions: 17 17 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * localbuf.c
       4             :  *    local buffer manager. Fast buffer manager for temporary tables,
       5             :  *    which never need to be WAL-logged or checkpointed, etc.
       6             :  *
       7             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994-5, Regents of the University of California
       9             :  *
      10             :  *
      11             :  * IDENTIFICATION
      12             :  *    src/backend/storage/buffer/localbuf.c
      13             :  *
      14             :  *-------------------------------------------------------------------------
      15             :  */
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/parallel.h"
      19             : #include "executor/instrument.h"
      20             : #include "pgstat.h"
      21             : #include "storage/buf_internals.h"
      22             : #include "storage/bufmgr.h"
      23             : #include "storage/fd.h"
      24             : #include "utils/guc_hooks.h"
      25             : #include "utils/memutils.h"
      26             : #include "utils/resowner.h"
      27             : 
      28             : 
      29             : /*#define LBDEBUG*/
      30             : 
      31             : /* entry for buffer lookup hashtable */
      32             : typedef struct
      33             : {
      34             :     BufferTag   key;            /* Tag of a disk page */
      35             :     int         id;             /* Associated local buffer's index */
      36             : } LocalBufferLookupEnt;
      37             : 
      38             : /* Note: this macro only works on local buffers, not shared ones! */
      39             : #define LocalBufHdrGetBlock(bufHdr) \
      40             :     LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
      41             : 
      42             : int         NLocBuffer = 0;     /* until buffers are initialized */
      43             : 
      44             : BufferDesc *LocalBufferDescriptors = NULL;
      45             : Block      *LocalBufferBlockPointers = NULL;
      46             : int32      *LocalRefCount = NULL;
      47             : 
      48             : static int  nextFreeLocalBufId = 0;
      49             : 
      50             : static HTAB *LocalBufHash = NULL;
      51             : 
      52             : /* number of local buffers pinned at least once */
      53             : static int  NLocalPinnedBuffers = 0;
      54             : 
      55             : 
      56             : static void InitLocalBuffers(void);
      57             : static Block GetLocalBufferStorage(void);
      58             : static Buffer GetLocalVictimBuffer(void);
      59             : 
      60             : 
      61             : /*
      62             :  * PrefetchLocalBuffer -
      63             :  *    initiate asynchronous read of a block of a relation
      64             :  *
      65             :  * Do PrefetchBuffer's work for temporary relations.
      66             :  * No-op if prefetching isn't compiled in.
      67             :  */
      68             : PrefetchBufferResult
      69        6224 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
      70             :                     BlockNumber blockNum)
      71             : {
      72        6224 :     PrefetchBufferResult result = {InvalidBuffer, false};
      73             :     BufferTag   newTag;         /* identity of requested block */
      74             :     LocalBufferLookupEnt *hresult;
      75             : 
      76        6224 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
      77             : 
      78             :     /* Initialize local buffers if first request in this session */
      79        6224 :     if (LocalBufHash == NULL)
      80           0 :         InitLocalBuffers();
      81             : 
      82             :     /* See if the desired buffer already exists */
      83             :     hresult = (LocalBufferLookupEnt *)
      84        6224 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
      85             : 
      86        6224 :     if (hresult)
      87             :     {
      88             :         /* Yes, so nothing to do */
      89        6224 :         result.recent_buffer = -hresult->id - 1;
      90             :     }
      91             :     else
      92             :     {
      93             : #ifdef USE_PREFETCH
      94             :         /* Not in buffers, so initiate prefetch */
      95           0 :         if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
      96           0 :             smgrprefetch(smgr, forkNum, blockNum, 1))
      97             :         {
      98           0 :             result.initiated_io = true;
      99             :         }
     100             : #endif                          /* USE_PREFETCH */
     101             :     }
     102             : 
     103        6224 :     return result;
     104             : }
     105             : 
     106             : 
     107             : /*
     108             :  * LocalBufferAlloc -
     109             :  *    Find or create a local buffer for the given page of the given relation.
     110             :  *
     111             :  * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
     112             :  * any locking since this is all local.  We support only default access
     113             :  * strategy (hence, usage_count is always advanced).
     114             :  */
     115             : BufferDesc *
     116     2124208 : LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
     117             :                  bool *foundPtr)
     118             : {
     119             :     BufferTag   newTag;         /* identity of requested block */
     120             :     LocalBufferLookupEnt *hresult;
     121             :     BufferDesc *bufHdr;
     122             :     Buffer      victim_buffer;
     123             :     int         bufid;
     124             :     bool        found;
     125             : 
     126     2124208 :     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
     127             : 
     128             :     /* Initialize local buffers if first request in this session */
     129     2124208 :     if (LocalBufHash == NULL)
     130          26 :         InitLocalBuffers();
     131             : 
     132     2124208 :     ResourceOwnerEnlarge(CurrentResourceOwner);
     133             : 
     134             :     /* See if the desired buffer already exists */
     135             :     hresult = (LocalBufferLookupEnt *)
     136     2124208 :         hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
     137             : 
     138     2124208 :     if (hresult)
     139             :     {
     140     2116552 :         bufid = hresult->id;
     141     2116552 :         bufHdr = GetLocalBufferDescriptor(bufid);
     142             :         Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
     143             : 
     144     2116552 :         *foundPtr = PinLocalBuffer(bufHdr, true);
     145             :     }
     146             :     else
     147             :     {
     148             :         uint32      buf_state;
     149             : 
     150        7656 :         victim_buffer = GetLocalVictimBuffer();
     151        7656 :         bufid = -victim_buffer - 1;
     152        7656 :         bufHdr = GetLocalBufferDescriptor(bufid);
     153             : 
     154             :         hresult = (LocalBufferLookupEnt *)
     155        7656 :             hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
     156        7656 :         if (found)              /* shouldn't happen */
     157           0 :             elog(ERROR, "local buffer hash table corrupted");
     158        7656 :         hresult->id = bufid;
     159             : 
     160             :         /*
     161             :          * it's all ours now.
     162             :          */
     163        7656 :         bufHdr->tag = newTag;
     164             : 
     165        7656 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     166        7656 :         buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
     167        7656 :         buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     168        7656 :         pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     169             : 
     170        7656 :         *foundPtr = false;
     171             :     }
     172             : 
     173     2124208 :     return bufHdr;
     174             : }
     175             : 
     176             : static Buffer
     177       31812 : GetLocalVictimBuffer(void)
     178             : {
     179             :     int         victim_bufid;
     180             :     int         trycounter;
     181             :     uint32      buf_state;
     182             :     BufferDesc *bufHdr;
     183             : 
     184       31812 :     ResourceOwnerEnlarge(CurrentResourceOwner);
     185             : 
     186             :     /*
     187             :      * Need to get a new buffer.  We use a clock sweep algorithm (essentially
     188             :      * the same as what freelist.c does now...)
     189             :      */
     190       31812 :     trycounter = NLocBuffer;
     191             :     for (;;)
     192             :     {
     193       38226 :         victim_bufid = nextFreeLocalBufId;
     194             : 
     195       38226 :         if (++nextFreeLocalBufId >= NLocBuffer)
     196          66 :             nextFreeLocalBufId = 0;
     197             : 
     198       38226 :         bufHdr = GetLocalBufferDescriptor(victim_bufid);
     199             : 
     200       38226 :         if (LocalRefCount[victim_bufid] == 0)
     201             :         {
     202       38112 :             buf_state = pg_atomic_read_u32(&bufHdr->state);
     203             : 
     204       38112 :             if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
     205             :             {
     206        6300 :                 buf_state -= BUF_USAGECOUNT_ONE;
     207        6300 :                 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     208        6300 :                 trycounter = NLocBuffer;
     209             :             }
     210             :             else
     211             :             {
     212             :                 /* Found a usable buffer */
     213       31812 :                 PinLocalBuffer(bufHdr, false);
     214       31812 :                 break;
     215             :             }
     216             :         }
     217         114 :         else if (--trycounter == 0)
     218           0 :             ereport(ERROR,
     219             :                     (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     220             :                      errmsg("no empty local buffer available")));
     221             :     }
     222             : 
     223             :     /*
     224             :      * lazy memory allocation: allocate space on first use of a buffer.
     225             :      */
     226       31812 :     if (LocalBufHdrGetBlock(bufHdr) == NULL)
     227             :     {
     228             :         /* Set pointer for use by BufferGetBlock() macro */
     229       29322 :         LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
     230             :     }
     231             : 
     232             :     /*
     233             :      * this buffer is not referenced but it might still be dirty. if that's
     234             :      * the case, write it out before reusing it!
     235             :      */
     236       31812 :     if (buf_state & BM_DIRTY)
     237             :     {
     238             :         instr_time  io_start;
     239             :         SMgrRelation oreln;
     240         894 :         Page        localpage = (char *) LocalBufHdrGetBlock(bufHdr);
     241             : 
     242             :         /* Find smgr relation for buffer */
     243         894 :         oreln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag), MyProcNumber);
     244             : 
     245         894 :         PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
     246             : 
     247         894 :         io_start = pgstat_prepare_io_time(track_io_timing);
     248             : 
     249             :         /* And write... */
     250         894 :         smgrwrite(oreln,
     251         894 :                   BufTagGetForkNum(&bufHdr->tag),
     252             :                   bufHdr->tag.blockNum,
     253             :                   localpage,
     254             :                   false);
     255             : 
     256             :         /* Temporary table I/O does not use Buffer Access Strategies */
     257         894 :         pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
     258             :                                 IOOP_WRITE, io_start, 1);
     259             : 
     260             :         /* Mark not-dirty now in case we error out below */
     261         894 :         buf_state &= ~BM_DIRTY;
     262         894 :         pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     263             : 
     264         894 :         pgBufferUsage.local_blks_written++;
     265             :     }
     266             : 
     267             :     /*
     268             :      * Remove the victim buffer from the hashtable and mark as invalid.
     269             :      */
     270       31812 :     if (buf_state & BM_TAG_VALID)
     271             :     {
     272             :         LocalBufferLookupEnt *hresult;
     273             : 
     274             :         hresult = (LocalBufferLookupEnt *)
     275         900 :             hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     276         900 :         if (!hresult)           /* shouldn't happen */
     277           0 :             elog(ERROR, "local buffer hash table corrupted");
     278             :         /* mark buffer invalid just in case hash insert fails */
     279         900 :         ClearBufferTag(&bufHdr->tag);
     280         900 :         buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
     281         900 :         pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     282         900 :         pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT);
     283             :     }
     284             : 
     285       31812 :     return BufferDescriptorGetBuffer(bufHdr);
     286             : }
     287             : 
     288             : /* see LimitAdditionalPins() */
     289             : void
     290       28510 : LimitAdditionalLocalPins(uint32 *additional_pins)
     291             : {
     292             :     uint32      max_pins;
     293             : 
     294       28510 :     if (*additional_pins <= 1)
     295       17054 :         return;
     296             : 
     297             :     /*
     298             :      * In contrast to LimitAdditionalPins() other backends don't play a role
     299             :      * here. We can allow up to NLocBuffer pins in total, but it might not be
     300             :      * initialized yet so read num_temp_buffers.
     301             :      */
     302       11456 :     max_pins = (num_temp_buffers - NLocalPinnedBuffers);
     303             : 
     304       11456 :     if (*additional_pins >= max_pins)
     305           0 :         *additional_pins = max_pins;
     306             : }
     307             : 
     308             : /*
     309             :  * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
     310             :  * temporary buffers.
     311             :  */
     312             : BlockNumber
     313       17664 : ExtendBufferedRelLocal(BufferManagerRelation bmr,
     314             :                        ForkNumber fork,
     315             :                        uint32 flags,
     316             :                        uint32 extend_by,
     317             :                        BlockNumber extend_upto,
     318             :                        Buffer *buffers,
     319             :                        uint32 *extended_by)
     320             : {
     321             :     BlockNumber first_block;
     322             :     instr_time  io_start;
     323             : 
     324             :     /* Initialize local buffers if first request in this session */
     325       17664 :     if (LocalBufHash == NULL)
     326         476 :         InitLocalBuffers();
     327             : 
     328       17664 :     LimitAdditionalLocalPins(&extend_by);
     329             : 
     330       41820 :     for (uint32 i = 0; i < extend_by; i++)
     331             :     {
     332             :         BufferDesc *buf_hdr;
     333             :         Block       buf_block;
     334             : 
     335       24156 :         buffers[i] = GetLocalVictimBuffer();
     336       24156 :         buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
     337       24156 :         buf_block = LocalBufHdrGetBlock(buf_hdr);
     338             : 
     339             :         /* new buffers are zero-filled */
     340       24156 :         MemSet((char *) buf_block, 0, BLCKSZ);
     341             :     }
     342             : 
     343       17664 :     first_block = smgrnblocks(bmr.smgr, fork);
     344             : 
     345             :     if (extend_upto != InvalidBlockNumber)
     346             :     {
     347             :         /*
     348             :          * In contrast to shared relations, nothing could change the relation
     349             :          * size concurrently. Thus we shouldn't end up finding that we don't
     350             :          * need to do anything.
     351             :          */
     352             :         Assert(first_block <= extend_upto);
     353             : 
     354             :         Assert((uint64) first_block + extend_by <= extend_upto);
     355             :     }
     356             : 
     357             :     /* Fail if relation is already at maximum possible length */
     358       17664 :     if ((uint64) first_block + extend_by >= MaxBlockNumber)
     359           0 :         ereport(ERROR,
     360             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     361             :                  errmsg("cannot extend relation %s beyond %u blocks",
     362             :                         relpath(bmr.smgr->smgr_rlocator, fork),
     363             :                         MaxBlockNumber)));
     364             : 
     365       41820 :     for (uint32 i = 0; i < extend_by; i++)
     366             :     {
     367             :         int         victim_buf_id;
     368             :         BufferDesc *victim_buf_hdr;
     369             :         BufferTag   tag;
     370             :         LocalBufferLookupEnt *hresult;
     371             :         bool        found;
     372             : 
     373       24156 :         victim_buf_id = -buffers[i] - 1;
     374       24156 :         victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);
     375             : 
     376             :         /* in case we need to pin an existing buffer below */
     377       24156 :         ResourceOwnerEnlarge(CurrentResourceOwner);
     378             : 
     379       24156 :         InitBufferTag(&tag, &bmr.smgr->smgr_rlocator.locator, fork, first_block + i);
     380             : 
     381             :         hresult = (LocalBufferLookupEnt *)
     382       24156 :             hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
     383       24156 :         if (found)
     384             :         {
     385             :             BufferDesc *existing_hdr;
     386             :             uint32      buf_state;
     387             : 
     388           0 :             UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));
     389             : 
     390           0 :             existing_hdr = GetLocalBufferDescriptor(hresult->id);
     391           0 :             PinLocalBuffer(existing_hdr, false);
     392           0 :             buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
     393             : 
     394           0 :             buf_state = pg_atomic_read_u32(&existing_hdr->state);
     395             :             Assert(buf_state & BM_TAG_VALID);
     396             :             Assert(!(buf_state & BM_DIRTY));
     397           0 :             buf_state &= ~BM_VALID;
     398           0 :             pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
     399             :         }
     400             :         else
     401             :         {
     402       24156 :             uint32      buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);
     403             : 
     404             :             Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));
     405             : 
     406       24156 :             victim_buf_hdr->tag = tag;
     407             : 
     408       24156 :             buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     409             : 
     410       24156 :             pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
     411             : 
     412       24156 :             hresult->id = victim_buf_id;
     413             :         }
     414             :     }
     415             : 
     416       17664 :     io_start = pgstat_prepare_io_time(track_io_timing);
     417             : 
     418             :     /* actually extend relation */
     419       17664 :     smgrzeroextend(bmr.smgr, fork, first_block, extend_by, false);
     420             : 
     421       17664 :     pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
     422             :                             io_start, extend_by);
     423             : 
     424       41820 :     for (uint32 i = 0; i < extend_by; i++)
     425             :     {
     426       24156 :         Buffer      buf = buffers[i];
     427             :         BufferDesc *buf_hdr;
     428             :         uint32      buf_state;
     429             : 
     430       24156 :         buf_hdr = GetLocalBufferDescriptor(-buf - 1);
     431             : 
     432       24156 :         buf_state = pg_atomic_read_u32(&buf_hdr->state);
     433       24156 :         buf_state |= BM_VALID;
     434       24156 :         pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     435             :     }
     436             : 
     437       17664 :     *extended_by = extend_by;
     438             : 
     439       17664 :     pgBufferUsage.local_blks_written += extend_by;
     440             : 
     441       17664 :     return first_block;
     442             : }
     443             : 
     444             : /*
     445             :  * MarkLocalBufferDirty -
     446             :  *    mark a local buffer dirty
     447             :  */
     448             : void
     449     3254276 : MarkLocalBufferDirty(Buffer buffer)
     450             : {
     451             :     int         bufid;
     452             :     BufferDesc *bufHdr;
     453             :     uint32      buf_state;
     454             : 
     455             :     Assert(BufferIsLocal(buffer));
     456             : 
     457             : #ifdef LBDEBUG
     458             :     fprintf(stderr, "LB DIRTY %d\n", buffer);
     459             : #endif
     460             : 
     461     3254276 :     bufid = -buffer - 1;
     462             : 
     463             :     Assert(LocalRefCount[bufid] > 0);
     464             : 
     465     3254276 :     bufHdr = GetLocalBufferDescriptor(bufid);
     466             : 
     467     3254276 :     buf_state = pg_atomic_read_u32(&bufHdr->state);
     468             : 
     469     3254276 :     if (!(buf_state & BM_DIRTY))
     470       22376 :         pgBufferUsage.local_blks_dirtied++;
     471             : 
     472     3254276 :     buf_state |= BM_DIRTY;
     473             : 
     474     3254276 :     pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     475     3254276 : }
     476             : 
     477             : /*
     478             :  * DropRelationLocalBuffers
     479             :  *      This function removes from the buffer pool all the pages of the
     480             :  *      specified relation that have block numbers >= firstDelBlock.
     481             :  *      (In particular, with firstDelBlock = 0, all pages are removed.)
     482             :  *      Dirty pages are simply dropped, without bothering to write them
     483             :  *      out first.  Therefore, this is NOT rollback-able, and so should be
     484             :  *      used only with extreme caution!
     485             :  *
     486             :  *      See DropRelationBuffers in bufmgr.c for more notes.
     487             :  */
     488             : void
     489         692 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum,
     490             :                          BlockNumber firstDelBlock)
     491             : {
     492             :     int         i;
     493             : 
     494      645812 :     for (i = 0; i < NLocBuffer; i++)
     495             :     {
     496      645120 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     497             :         LocalBufferLookupEnt *hresult;
     498             :         uint32      buf_state;
     499             : 
     500      645120 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     501             : 
     502      705420 :         if ((buf_state & BM_TAG_VALID) &&
     503       62070 :             BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
     504        1770 :             BufTagGetForkNum(&bufHdr->tag) == forkNum &&
     505        1636 :             bufHdr->tag.blockNum >= firstDelBlock)
     506             :         {
     507        1584 :             if (LocalRefCount[i] != 0)
     508           0 :                 elog(ERROR, "block %u of %s is still referenced (local %u)",
     509             :                      bufHdr->tag.blockNum,
     510             :                      relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
     511             :                                     MyProcNumber,
     512             :                                     BufTagGetForkNum(&bufHdr->tag)),
     513             :                      LocalRefCount[i]);
     514             : 
     515             :             /* Remove entry from hashtable */
     516             :             hresult = (LocalBufferLookupEnt *)
     517        1584 :                 hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     518        1584 :             if (!hresult)       /* shouldn't happen */
     519           0 :                 elog(ERROR, "local buffer hash table corrupted");
     520             :             /* Mark buffer invalid */
     521        1584 :             ClearBufferTag(&bufHdr->tag);
     522        1584 :             buf_state &= ~BUF_FLAG_MASK;
     523        1584 :             buf_state &= ~BUF_USAGECOUNT_MASK;
     524        1584 :             pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     525             :         }
     526             :     }
     527         692 : }
     528             : 
     529             : /*
     530             :  * DropRelationAllLocalBuffers
     531             :  *      This function removes from the buffer pool all pages of all forks
     532             :  *      of the specified relation.
     533             :  *
     534             :  *      See DropRelationsAllBuffers in bufmgr.c for more notes.
     535             :  */
     536             : void
     537        5998 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
     538             : {
     539             :     int         i;
     540             : 
     541     5815678 :     for (i = 0; i < NLocBuffer; i++)
     542             :     {
     543     5809680 :         BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
     544             :         LocalBufferLookupEnt *hresult;
     545             :         uint32      buf_state;
     546             : 
     547     5809680 :         buf_state = pg_atomic_read_u32(&bufHdr->state);
     548             : 
     549     6223644 :         if ((buf_state & BM_TAG_VALID) &&
     550      413964 :             BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
     551             :         {
     552       29328 :             if (LocalRefCount[i] != 0)
     553           0 :                 elog(ERROR, "block %u of %s is still referenced (local %u)",
     554             :                      bufHdr->tag.blockNum,
     555             :                      relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
     556             :                                     MyProcNumber,
     557             :                                     BufTagGetForkNum(&bufHdr->tag)),
     558             :                      LocalRefCount[i]);
     559             :             /* Remove entry from hashtable */
     560             :             hresult = (LocalBufferLookupEnt *)
     561       29328 :                 hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
     562       29328 :             if (!hresult)       /* shouldn't happen */
     563           0 :                 elog(ERROR, "local buffer hash table corrupted");
     564             :             /* Mark buffer invalid */
     565       29328 :             ClearBufferTag(&bufHdr->tag);
     566       29328 :             buf_state &= ~BUF_FLAG_MASK;
     567       29328 :             buf_state &= ~BUF_USAGECOUNT_MASK;
     568       29328 :             pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
     569             :         }
     570             :     }
     571        5998 : }
     572             : 
     573             : /*
     574             :  * InitLocalBuffers -
     575             :  *    init the local buffer cache. Since most queries (esp. multi-user ones)
     576             :  *    don't involve local buffers, we delay allocating actual memory for the
     577             :  *    buffers until we need them; just make the buffer headers here.
     578             :  */
     579             : static void
     580         502 : InitLocalBuffers(void)
     581             : {
     582         502 :     int         nbufs = num_temp_buffers;
     583             :     HASHCTL     info;
     584             :     int         i;
     585             : 
     586             :     /*
     587             :      * Parallel workers can't access data in temporary tables, because they
     588             :      * have no visibility into the local buffers of their leader.  This is a
     589             :      * convenient, low-cost place to provide a backstop check for that.  Note
     590             :      * that we don't wish to prevent a parallel worker from accessing catalog
     591             :      * metadata about a temp table, so checks at higher levels would be
     592             :      * inappropriate.
     593             :      */
     594         502 :     if (IsParallelWorker())
     595           0 :         ereport(ERROR,
     596             :                 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     597             :                  errmsg("cannot access temporary tables during a parallel operation")));
     598             : 
     599             :     /* Allocate and zero buffer headers and auxiliary arrays */
     600         502 :     LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
     601         502 :     LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
     602         502 :     LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
     603         502 :     if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
     604           0 :         ereport(FATAL,
     605             :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     606             :                  errmsg("out of memory")));
     607             : 
     608         502 :     nextFreeLocalBufId = 0;
     609             : 
     610             :     /* initialize fields that need to start off nonzero */
     611      509006 :     for (i = 0; i < nbufs; i++)
     612             :     {
     613      508504 :         BufferDesc *buf = GetLocalBufferDescriptor(i);
     614             : 
     615             :         /*
     616             :          * negative to indicate local buffer. This is tricky: shared buffers
     617             :          * start with 0. We have to start with -2. (Note that the routine
     618             :          * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
     619             :          * is -1.)
     620             :          */
     621      508504 :         buf->buf_id = -i - 2;
     622             : 
     623             :         /*
     624             :          * Intentionally do not initialize the buffer's atomic variable
     625             :          * (besides zeroing the underlying memory above). That way we get
     626             :          * errors on platforms without atomics, if somebody (re-)introduces
     627             :          * atomic operations for local buffers.
     628             :          */
     629             :     }
     630             : 
     631             :     /* Create the lookup hash table */
     632         502 :     info.keysize = sizeof(BufferTag);
     633         502 :     info.entrysize = sizeof(LocalBufferLookupEnt);
     634             : 
     635         502 :     LocalBufHash = hash_create("Local Buffer Lookup Table",
     636             :                                nbufs,
     637             :                                &info,
     638             :                                HASH_ELEM | HASH_BLOBS);
     639             : 
     640         502 :     if (!LocalBufHash)
     641           0 :         elog(ERROR, "could not initialize local buffer hash table");
     642             : 
     643             :     /* Initialization done, mark buffers allocated */
     644         502 :     NLocBuffer = nbufs;
     645         502 : }
     646             : 
     647             : /*
     648             :  * XXX: We could have a slightly more efficient version of PinLocalBuffer()
     649             :  * that does not support adjusting the usagecount - but so far it does not
     650             :  * seem worth the trouble.
     651             :  *
     652             :  * Note that ResourceOwnerEnlarge() must have been done already.
     653             :  */
     654             : bool
     655     2148364 : PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
     656             : {
     657             :     uint32      buf_state;
     658     2148364 :     Buffer      buffer = BufferDescriptorGetBuffer(buf_hdr);
     659     2148364 :     int         bufid = -buffer - 1;
     660             : 
     661     2148364 :     buf_state = pg_atomic_read_u32(&buf_hdr->state);
     662             : 
     663     2148364 :     if (LocalRefCount[bufid] == 0)
     664             :     {
     665     1978454 :         NLocalPinnedBuffers++;
     666     1978454 :         if (adjust_usagecount &&
     667     1946642 :             BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
     668             :         {
     669      106218 :             buf_state += BUF_USAGECOUNT_ONE;
     670      106218 :             pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     671             :         }
     672             :     }
     673     2148364 :     LocalRefCount[bufid]++;
     674     2148364 :     ResourceOwnerRememberBuffer(CurrentResourceOwner,
     675             :                                 BufferDescriptorGetBuffer(buf_hdr));
     676             : 
     677     2148364 :     return buf_state & BM_VALID;
     678             : }
     679             : 
     680             : void
     681     2845300 : UnpinLocalBuffer(Buffer buffer)
     682             : {
     683     2845300 :     UnpinLocalBufferNoOwner(buffer);
     684     2845300 :     ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
     685     2845300 : }
     686             : 
     687             : void
     688     2846054 : UnpinLocalBufferNoOwner(Buffer buffer)
     689             : {
     690     2846054 :     int         buffid = -buffer - 1;
     691             : 
     692             :     Assert(BufferIsLocal(buffer));
     693             :     Assert(LocalRefCount[buffid] > 0);
     694             :     Assert(NLocalPinnedBuffers > 0);
     695             : 
     696     2846054 :     if (--LocalRefCount[buffid] == 0)
     697     1978454 :         NLocalPinnedBuffers--;
     698     2846054 : }
     699             : 
     700             : /*
     701             :  * GUC check_hook for temp_buffers
     702             :  */
     703             : bool
     704        1974 : check_temp_buffers(int *newval, void **extra, GucSource source)
     705             : {
     706             :     /*
     707             :      * Once local buffers have been initialized, it's too late to change this.
     708             :      * However, if this is only a test call, allow it.
     709             :      */
     710        1974 :     if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
     711             :     {
     712           0 :         GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
     713           0 :         return false;
     714             :     }
     715        1974 :     return true;
     716             : }
     717             : 
     718             : /*
     719             :  * GetLocalBufferStorage - allocate memory for a local buffer
     720             :  *
     721             :  * The idea of this function is to aggregate our requests for storage
     722             :  * so that the memory manager doesn't see a whole lot of relatively small
     723             :  * requests.  Since we'll never give back a local buffer once it's created
     724             :  * within a particular process, no point in burdening memmgr with separately
     725             :  * managed chunks.
     726             :  */
     727             : static Block
     728       29322 : GetLocalBufferStorage(void)
     729             : {
     730             :     static char *cur_block = NULL;
     731             :     static int  next_buf_in_block = 0;
     732             :     static int  num_bufs_in_block = 0;
     733             :     static int  total_bufs_allocated = 0;
     734             :     static MemoryContext LocalBufferContext = NULL;
     735             : 
     736             :     char       *this_buf;
     737             : 
     738             :     Assert(total_bufs_allocated < NLocBuffer);
     739             : 
     740       29322 :     if (next_buf_in_block >= num_bufs_in_block)
     741             :     {
     742             :         /* Need to make a new request to memmgr */
     743             :         int         num_bufs;
     744             : 
     745             :         /*
     746             :          * We allocate local buffers in a context of their own, so that the
     747             :          * space eaten for them is easily recognizable in MemoryContextStats
     748             :          * output.  Create the context on first use.
     749             :          */
     750         778 :         if (LocalBufferContext == NULL)
     751         502 :             LocalBufferContext =
     752         502 :                 AllocSetContextCreate(TopMemoryContext,
     753             :                                       "LocalBufferContext",
     754             :                                       ALLOCSET_DEFAULT_SIZES);
     755             : 
     756             :         /* Start with a 16-buffer request; subsequent ones double each time */
     757         778 :         num_bufs = Max(num_bufs_in_block * 2, 16);
     758             :         /* But not more than what we need for all remaining local bufs */
     759         778 :         num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
     760             :         /* And don't overflow MaxAllocSize, either */
     761         778 :         num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
     762             : 
     763             :         /* Buffers should be I/O aligned. */
     764         778 :         cur_block = (char *)
     765         778 :             TYPEALIGN(PG_IO_ALIGN_SIZE,
     766             :                       MemoryContextAlloc(LocalBufferContext,
     767             :                                          num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE));
     768         778 :         next_buf_in_block = 0;
     769         778 :         num_bufs_in_block = num_bufs;
     770             :     }
     771             : 
     772             :     /* Allocate next buffer in current memory block */
     773       29322 :     this_buf = cur_block + next_buf_in_block * BLCKSZ;
     774       29322 :     next_buf_in_block++;
     775       29322 :     total_bufs_allocated++;
     776             : 
     777       29322 :     return (Block) this_buf;
     778             : }
     779             : 
     780             : /*
     781             :  * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
     782             :  *
     783             :  * This is just like CheckForBufferLeaks(), but for local buffers.
     784             :  */
     785             : static void
     786      784040 : CheckForLocalBufferLeaks(void)
     787             : {
     788             : #ifdef USE_ASSERT_CHECKING
     789             :     if (LocalRefCount)
     790             :     {
     791             :         int         RefCountErrors = 0;
     792             :         int         i;
     793             : 
     794             :         for (i = 0; i < NLocBuffer; i++)
     795             :         {
     796             :             if (LocalRefCount[i] != 0)
     797             :             {
     798             :                 Buffer      b = -i - 1;
     799             :                 char       *s;
     800             : 
     801             :                 s = DebugPrintBufferRefcount(b);
     802             :                 elog(WARNING, "local buffer refcount leak: %s", s);
     803             :                 pfree(s);
     804             : 
     805             :                 RefCountErrors++;
     806             :             }
     807             :         }
     808             :         Assert(RefCountErrors == 0);
     809             :     }
     810             : #endif
     811      784040 : }
     812             : 
     813             : /*
     814             :  * AtEOXact_LocalBuffers - clean up at end of transaction.
     815             :  *
     816             :  * This is just like AtEOXact_Buffers, but for local buffers.
     817             :  */
     818             : void
     819      748482 : AtEOXact_LocalBuffers(bool isCommit)
     820             : {
     821      748482 :     CheckForLocalBufferLeaks();
     822      748482 : }
     823             : 
     824             : /*
     825             :  * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
     826             :  *
     827             :  * This is just like AtProcExit_Buffers, but for local buffers.
     828             :  */
     829             : void
     830       35558 : AtProcExit_LocalBuffers(void)
     831             : {
     832             :     /*
     833             :      * We shouldn't be holding any remaining pins; if we are, and assertions
     834             :      * aren't enabled, we'll fail later in DropRelationBuffers while trying to
     835             :      * drop the temp rels.
     836             :      */
     837       35558 :     CheckForLocalBufferLeaks();
     838       35558 : }

Generated by: LCOV version 1.14