LCOV - code coverage report
Current view: top level - src/backend/access/heap - hio.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 134 160 83.8 %
Date: 2019-11-21 14:06:36 Functions: 5 5 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * hio.c
       4             :  *    POSTGRES heap access method input/output code.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/access/heap/hio.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/heapam.h"
      19             : #include "access/hio.h"
      20             : #include "access/htup_details.h"
      21             : #include "access/visibilitymap.h"
      22             : #include "storage/bufmgr.h"
      23             : #include "storage/freespace.h"
      24             : #include "storage/lmgr.h"
      25             : #include "storage/smgr.h"
      26             : 
      27             : 
      28             : /*
      29             :  * RelationPutHeapTuple - place tuple on the page held in the given buffer
      30             :  *
      31             :  * !!! EREPORT(ERROR) IS DISALLOWED HERE !!!  Must PANIC on failure!!!
      32             :  *
      33             :  * Note - caller must hold BUFFER_LOCK_EXCLUSIVE on the buffer.
      34             :  */
      35             : void
      36    20016346 : RelationPutHeapTuple(Relation relation,
      37             :                      Buffer buffer,
      38             :                      HeapTuple tuple,
      39             :                      bool token)
      40             : {
      41             :     Page        pageHeader;
      42             :     OffsetNumber offnum;
      43             : 
      44             :     /*
      45             :      * A tuple that's being inserted speculatively (token is true) should
      46             :      * already have its token set; the Assert below checks tuple->t_data.
      47             :      */
      48             :     Assert(!token || HeapTupleHeaderIsSpeculative(tuple->t_data));
      49             : 
      50             :     /* Add the tuple to the page; a failure here is reported with PANIC */
      51    20016346 :     pageHeader = BufferGetPage(buffer);
      52             : 
      53    20016346 :     offnum = PageAddItem(pageHeader, (Item) tuple->t_data,
      54             :                          tuple->t_len, InvalidOffsetNumber, false, true);
      55             : 
      56    20016346 :     if (offnum == InvalidOffsetNumber)
      57           0 :         elog(PANIC, "failed to add tuple to page");
      58             : 
      59             :     /* Update tuple->t_self to the block and offset where the tuple was stored */
      60    20016346 :     ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum);
      61             : 
      62             :     /*
      63             :      * Insert the correct position into CTID of the stored tuple, too (unless
      64             :      * this is a speculative insertion, in which case the token is held in
      65             :      * the CTID field instead and is left as it is)
      66             :      */
      67    20016346 :     if (!token)
      68             :     {
      69    20012454 :         ItemId      itemId = PageGetItemId(pageHeader, offnum);
      70    20012454 :         HeapTupleHeader item = (HeapTupleHeader) PageGetItem(pageHeader, itemId);
      71             : 
      72    20012454 :         item->t_ctid = tuple->t_self;
      73             :     }
      74    20016346 : }
      75             : 
      76             : /*
      77             :  * Read targetBlock in the given mode, via the bulk-insert state if bistate isn't NULL.
      78             :  */
      79             : static Buffer
      80    17822234 : ReadBufferBI(Relation relation, BlockNumber targetBlock,
      81             :              ReadBufferMode mode, BulkInsertState bistate)
      82             : {
      83             :     Buffer      buffer;
      84             : 
      85             :     /* If not bulk-insert, just ReadBufferExtended on the main fork, no strategy */
      86    17822234 :     if (!bistate)
      87    16499506 :         return ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
      88             :                                   mode, NULL);
      89             : 
      90             :     /* If bistate already holds a pin on the desired block, re-pin and return it */
      91     1322728 :     if (bistate->current_buf != InvalidBuffer)
      92             :     {
      93     1320272 :         if (BufferGetBlockNumber(bistate->current_buf) == targetBlock)
      94             :         {
      95             :             /*
      96             :              * Currently the LOCK variants are only used for extending the
      97             :              * relation, which should never reach this branch.
      98             :              */
      99             :             Assert(mode != RBM_ZERO_AND_LOCK &&
     100             :                    mode != RBM_ZERO_AND_CLEANUP_LOCK);
     101             : 
     102     1279856 :             IncrBufferRefCount(bistate->current_buf);
     103     1279856 :             return bistate->current_buf;
     104             :         }
     105             :         /* ... else drop the old buffer and forget it in bistate */
     106       40416 :         ReleaseBuffer(bistate->current_buf);
     107       40416 :         bistate->current_buf = InvalidBuffer;
     108             :     }
     109             : 
     110             :     /* Perform a read using the bulk-insert buffer strategy */
     111       42872 :     buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
     112             :                                 mode, bistate->strategy);
     113             : 
     114             :     /* Save the buffer as target for future inserts; bistate keeps the extra pin */
     115       42872 :     IncrBufferRefCount(buffer);
     116       42872 :     bistate->current_buf = buffer;
     117             : 
     118       42872 :     return buffer;
     119             : }
     120             : 
     121             : /*
     122             :  * For each heap page which is all-visible, acquire a pin on the appropriate
     123             :  * visibility map page, if we haven't already got one.
     124             :  *
     125             :  * buffer2 may be InvalidBuffer, if only one buffer is involved.  buffer1
     126             :  * must not be InvalidBuffer.  If both buffers are specified, block1 must
     127             :  * be less than or equal to block2 (see the Assert below).
     128             :  */
     129             : static void
     130    17658998 : GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
     131             :                      BlockNumber block1, BlockNumber block2,
     132             :                      Buffer *vmbuffer1, Buffer *vmbuffer2)
     133             : {
     134             :     bool        need_to_pin_buffer1;
     135             :     bool        need_to_pin_buffer2;
     136             : 
     137             :     Assert(BufferIsValid(buffer1));
     138             :     Assert(buffer2 == InvalidBuffer || block1 <= block2);
     139             : 
     140             :     while (1)
     141             :     {
     142             :         /* Figure out which pins we need but don't have (redone on every pass). */
     143    35317996 :         need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1))
     144    17658998 :             && !visibilitymap_pin_ok(block1, *vmbuffer1);
     145    17658998 :         need_to_pin_buffer2 = buffer2 != InvalidBuffer
     146      100420 :             && PageIsAllVisible(BufferGetPage(buffer2))
     147    17659676 :             && !visibilitymap_pin_ok(block2, *vmbuffer2);
     148    17658998 :         if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
     149    17658998 :             return;
     150             : 
     151             :         /* We must unlock both buffers before doing any I/O. */
     152           0 :         LockBuffer(buffer1, BUFFER_LOCK_UNLOCK);
     153           0 :         if (buffer2 != InvalidBuffer && buffer2 != buffer1)
     154           0 :             LockBuffer(buffer2, BUFFER_LOCK_UNLOCK);
     155             : 
     156             :         /* Get the visibility map pins found missing above. */
     157           0 :         if (need_to_pin_buffer1)
     158           0 :             visibilitymap_pin(relation, block1, vmbuffer1);
     159           0 :         if (need_to_pin_buffer2)
     160           0 :             visibilitymap_pin(relation, block2, vmbuffer2);
     161             : 
     162             :         /* Relock buffers, buffer1 (the lower or equal block) first. */
     163           0 :         LockBuffer(buffer1, BUFFER_LOCK_EXCLUSIVE);
     164           0 :         if (buffer2 != InvalidBuffer && buffer2 != buffer1)
     165           0 :             LockBuffer(buffer2, BUFFER_LOCK_EXCLUSIVE);
     166             : 
     167             :         /*
     168             :          * If there are two buffers involved and we pinned just one of them,
     169             :          * it's possible that the second one became all-visible while we were
     170             :          * busy pinning the first one.  Only in that case do we loop again;
     171             :          * with one buffer, or with both pins just taken, we are done.
     172             :          */
     173           0 :         if (buffer2 == InvalidBuffer || buffer1 == buffer2
     174           0 :             || (need_to_pin_buffer1 && need_to_pin_buffer2))
     175             :             break;
     176             :     }
     177             : }
     178             : 
     179             : /*
     180             :  * Extend a relation by multiple blocks to avoid future contention on the
     181             :  * relation extension lock.  Our goal is to pre-extend the relation by an
     182             :  * amount which ramps up as the degree of contention ramps up, but limiting
     183             :  * the result to some sane overall value.
     184             :  */
     185             : static void
     186          64 : RelationAddExtraBlocks(Relation relation, BulkInsertState bistate)
     187             : {
     188             :     BlockNumber blockNum,
     189          64 :                 firstBlock = InvalidBlockNumber;
     190             :     int         extraBlocks;
     191             :     int         lockWaiters;
     192             : 
     193             :     /* Use the lock wait queue length to judge how much to extend; no waiters, no extension. */
     194          64 :     lockWaiters = RelationExtensionLockWaiterCount(relation);
     195          64 :     if (lockWaiters <= 0)
     196           0 :         return;
     197             : 
     198             :     /*
     199             :      * It might seem like multiplying the number of lock waiters by 20 is too
     200             :      * aggressive, but benchmarking revealed that smaller numbers were
     201             :      * insufficient.  512 is just an arbitrary cap on the number of blocks
     202             :      * added, to prevent pathological results.
     203             :      */
     204          64 :     extraBlocks = Min(512, lockWaiters * 20);
     205             : 
     206             :     do
     207             :     {
     208             :         Buffer      buffer;
     209             :         Page        page;
     210             :         Size        freespace;
     211             : 
     212             :         /*
     213             :          * Extend by one page.  This should generally match the main-line
     214             :          * extension code in RelationGetBufferForTuple, except that we hold
     215             :          * the relation extension lock throughout, and we don't immediately
     216             :          * initialize the page (see below).
     217             :          */
     218        1720 :         buffer = ReadBufferBI(relation, P_NEW, RBM_ZERO_AND_LOCK, bistate);
     219        1720 :         page = BufferGetPage(buffer);
     220             : 
     221        1720 :         if (!PageIsNew(page))
     222           0 :             elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
     223             :                  BufferGetBlockNumber(buffer),
     224             :                  RelationGetRelationName(relation));
     225             : 
     226             :         /*
     227             :          * Add the page to the FSM without initializing. If we were to
     228             :          * initialize here, the page would potentially get flushed out to disk
     229             :          * before we add any useful content. There is no guarantee that such a
     230             :          * flush would happen before a potential crash, so uninitialized pages
     231             :          * have to be dealt with anyway; skipping the initialization here just
     232             :          * avoids potentially unnecessary writes.
     233             :          */
     234             : 
     235             :         /* grab block number and space past the page header before releasing the buffer */
     236        1720 :         blockNum = BufferGetBlockNumber(buffer);
     237        1720 :         freespace = BufferGetPageSize(buffer) - SizeOfPageHeaderData;
     238             : 
     239        1720 :         UnlockReleaseBuffer(buffer);
     240             : 
     241             :         /* Remember first block number thus added. */
     242        1720 :         if (firstBlock == InvalidBlockNumber)
     243          64 :             firstBlock = blockNum;
     244             : 
     245             :         /*
     246             :          * Immediately update the bottom level of the FSM.  This has a good
     247             :          * chance of making this page visible to other concurrently inserting
     248             :          * backends, and we want that to happen without delay.
     249             :          */
     250        1720 :         RecordPageWithFreeSpace(relation, blockNum, freespace);
     251             :     }
     252        1720 :     while (--extraBlocks > 0);
     253             : 
     254             :     /*
     255             :      * Updating the upper levels of the free space map is too expensive to do
     256             :      * for every block, but it's worth doing once at the end to make sure that
     257             :      * subsequent insertion activity sees all of those nifty free pages we
     258             :      * just inserted (from firstBlock up to the last block added).
     259             :      */
     260          64 :     FreeSpaceMapVacuumRange(relation, firstBlock, blockNum + 1);
     261             : }
     262             : 
     263             : /*
     264             :  * RelationGetBufferForTuple
     265             :  *
     266             :  *  Returns pinned and exclusive-locked buffer of a page in given relation
     267             :  *  with free space >= given len.
     268             :  *
     269             :  *  If otherBuffer is not InvalidBuffer, then it references a previously
     270             :  *  pinned buffer of another page in the same relation; on return, this
     271             :  *  buffer will also be exclusive-locked.  (This case is used by heap_update;
     272             :  *  the otherBuffer contains the tuple being updated.)
     273             :  *
     274             :  *  The reason for passing otherBuffer is that if two backends are doing
     275             :  *  concurrent heap_update operations, a deadlock could occur if they try
     276             :  *  to lock the same two buffers in opposite orders.  To ensure that this
     277             :  *  can't happen, we impose the rule that buffers of a relation must be
     278             :  *  locked in increasing page number order.  This is most conveniently done
     279             :  *  by having RelationGetBufferForTuple lock them both, with suitable care
     280             :  *  for ordering.
     281             :  *
     282             :  *  NOTE: it is unlikely, but not quite impossible, for otherBuffer to be the
     283             :  *  same buffer we select for insertion of the new tuple (this could only
     284             :  *  happen if space is freed in that page after heap_update finds there's not
     285             :  *  enough there).  In that case, the page will be pinned and locked only once.
     286             :  *
     287             :  *  For the vmbuffer and vmbuffer_other arguments, we avoid deadlock by
     288             :  *  locking them only after locking the corresponding heap page, and taking
     289             :  *  no further lwlocks while they are locked.
     290             :  *
     291             :  *  We normally use FSM to help us find free space.  However,
     292             :  *  if HEAP_INSERT_SKIP_FSM is specified, we just append a new empty page to
     293             :  *  the end of the relation if the tuple won't fit on the current target page.
     294             :  *  This can save some cycles when we know the relation is new and doesn't
     295             :  *  contain useful amounts of free space.
     296             :  *
     297             :  *  HEAP_INSERT_SKIP_FSM is also useful for non-WAL-logged additions to a
     298             :  *  relation, if the caller holds exclusive lock and is careful to invalidate
     299             :  *  relation's smgr_targblock before the first insertion --- that ensures that
     300             :  *  all insertions will occur into newly added pages and not be intermixed
     301             :  *  with tuples from other transactions.  That way, a crash can't risk losing
     302             :  *  any committed data of other transactions.  (See heap_insert's comments
     303             :  *  for additional constraints needed for safe usage of this behavior.)
     304             :  *
     305             :  *  The caller can also provide a BulkInsertState object to optimize many
     306             :  *  insertions into the same relation.  This keeps a pin on the current
     307             :  *  insertion target page (to save pin/unpin cycles) and also passes a
     308             :  *  BULKWRITE buffer selection strategy object to the buffer manager.
     309             :  *  Passing NULL for bistate selects the default behavior.
     310             :  *
     311             :  *  We always try to avoid filling existing pages further than the fillfactor.
     312             :  *  This is OK since this routine is not consulted when updating a tuple and
     313             :  *  keeping it on the same page, which is the scenario fillfactor is meant
     314             :  *  to reserve space for.
     315             :  *
     316             :  *  ereport(ERROR) is allowed here, so this routine *must* be called
     317             :  *  before any (unlogged) changes are made in buffer pool.
     318             :  */
     319             : Buffer
     320    17671294 : RelationGetBufferForTuple(Relation relation, Size len,
     321             :                           Buffer otherBuffer, int options,
     322             :                           BulkInsertState bistate,
     323             :                           Buffer *vmbuffer, Buffer *vmbuffer_other)
     324             : {
     325    17671294 :     bool        use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
     326    17671294 :     Buffer      buffer = InvalidBuffer;
     327             :     Page        page;
     328    17671294 :     Size        pageFreeSpace = 0,
     329    17671294 :                 saveFreeSpace = 0;
     330             :     BlockNumber targetBlock,
     331             :                 otherBlock;
     332             :     bool        needLock;
     333             : 
     334    17671294 :     len = MAXALIGN(len);        /* be conservative */
     335             : 
     336             :     /* Bulk insert is not supported for updates, only inserts. */
     337             :     Assert(otherBuffer == InvalidBuffer || !bistate);
     338             : 
     339             :     /*
     340             :      * If we're gonna fail for oversize tuple, do it right away
     341             :      */
     342    17671294 :     if (len > MaxHeapTupleSize)
     343           0 :         ereport(ERROR,
     344             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     345             :                  errmsg("row is too big: size %zu, maximum size %zu",
     346             :                         len, MaxHeapTupleSize)));
     347             : 
     348             :     /* Compute desired extra freespace due to fillfactor option */
     349    17671294 :     saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
     350             :                                                    HEAP_DEFAULT_FILLFACTOR);
     351             : 
     352    17671294 :     if (otherBuffer != InvalidBuffer)
     353       99126 :         otherBlock = BufferGetBlockNumber(otherBuffer);
     354             :     else
     355    17572168 :         otherBlock = InvalidBlockNumber;    /* just to keep compiler quiet */
     356             : 
     357             :     /*
     358             :      * We first try to put the tuple on the same page we last inserted a tuple
     359             :      * on, as cached in the BulkInsertState or relcache entry.  If that
     360             :      * doesn't work, we ask the Free Space Map to locate a suitable page.
     361             :      * Since the FSM's info might be out of date, we have to be prepared to
     362             :      * loop around and retry multiple times. (To ensure this isn't an infinite
     363             :      * loop, we must update the FSM with the correct amount of free space on
     364             :      * each page that proves not to be suitable.)  If the FSM has no record of
     365             :      * a page with enough free space, we give up and extend the relation.
     366             :      *
     367             :      * When use_fsm is false, we either put the tuple onto the existing target
     368             :      * page or extend the relation.
     369             :      */
     370    17671294 :     if (len + saveFreeSpace > MaxHeapTupleSize)
     371             :     {
     372             :         /* can't fit, don't bother asking FSM */
     373          20 :         targetBlock = InvalidBlockNumber;
     374          20 :         use_fsm = false;
     375             :     }
     376    17671274 :     else if (bistate && bistate->current_buf != InvalidBuffer)
     377     1279856 :         targetBlock = BufferGetBlockNumber(bistate->current_buf);
     378             :     else
     379    16391418 :         targetBlock = RelationGetTargetBlock(relation);
     380             : 
     381    17671294 :     if (targetBlock == InvalidBlockNumber && use_fsm)
     382             :     {
     383             :         /*
     384             :          * We have no cached target page, so ask the FSM for an initial
     385             :          * target.
     386             :          */
     387      119094 :         targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);
     388             : 
     389             :         /*
     390             :          * If the FSM knows nothing of the rel, try the last page before we
     391             :          * give up and extend.  This avoids one-tuple-per-page syndrome during
     392             :          * bootstrapping or in a recently-started system.
     393             :          */
     394      119094 :         if (targetBlock == InvalidBlockNumber)
     395             :         {
     396      108562 :             BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
     397             : 
     398      108562 :             if (nblocks > 0)
     399       81902 :                 targetBlock = nblocks - 1;
     400             :         }
     401             :     }
     402             : 
     403             : loop:
     404    35579612 :     while (targetBlock != InvalidBlockNumber)
     405             :     {
     406             :         /*
     407             :          * Read and exclusive-lock the target block, as well as the other
     408             :          * block if one was given, taking suitable care with lock ordering and
     409             :          * the possibility they are the same block.
     410             :          *
     411             :          * If the page-level all-visible flag is set, caller will need to
     412             :          * clear both that and the corresponding visibility map bit.  However,
     413             :          * by the time we return, we'll have x-locked the buffer, and we don't
     414             :          * want to do any I/O while in that state.  So we check the bit here
     415             :          * before taking the lock, and pin the page if it appears necessary.
     416             :          * Checking without the lock creates a risk of getting the wrong
     417             :          * answer, so we'll have to recheck after acquiring the lock.
     418             :          */
     419    17658998 :         if (otherBuffer == InvalidBuffer)
     420             :         {
     421             :             /* easy case */
     422    17558578 :             buffer = ReadBufferBI(relation, targetBlock, RBM_NORMAL, bistate);
     423    17558578 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     424        8092 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     425    17558578 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     426             :         }
     427      100420 :         else if (otherBlock == targetBlock)
     428             :         {
     429             :             /* also easy case */
     430        1872 :             buffer = otherBuffer;
     431        1872 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     432           0 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     433        1872 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     434             :         }
     435       98548 :         else if (otherBlock < targetBlock)
     436             :         {
     437             :             /* lock other buffer first */
     438       92024 :             buffer = ReadBuffer(relation, targetBlock);
     439       92024 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     440         352 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     441       92024 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     442       92024 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     443             :         }
     444             :         else
     445             :         {
     446             :             /* lock target buffer first */
     447        6524 :             buffer = ReadBuffer(relation, targetBlock);
     448        6524 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     449         264 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     450        6524 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     451        6524 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     452             :         }
     453             : 
     454             :         /*
     455             :          * We now have the target page (and the other buffer, if any) pinned
     456             :          * and locked.  However, since our initial PageIsAllVisible checks
     457             :          * were performed before acquiring the lock, the results might now be
     458             :          * out of date, either for the selected victim buffer, or for the
     459             :          * other buffer passed by the caller.  In that case, we'll need to
     460             :          * give up our locks, go get the pin(s) we failed to get earlier, and
     461             :          * re-lock.  That's pretty painful, but hopefully shouldn't happen
     462             :          * often.
     463             :          *
     464             :          * Note that there's a small possibility that we didn't pin the page
     465             :          * above but still have the correct page pinned anyway, either because
     466             :          * we've already made a previous pass through this loop, or because
     467             :          * caller passed us the right page anyway.
     468             :          *
     469             :          * Note also that it's possible that by the time we get the pin and
     470             :          * retake the buffer locks, the visibility map bit will have been
     471             :          * cleared by some other backend anyway.  In that case, we'll have
     472             :          * done a bit of extra work for no gain, but there's no real harm
     473             :          * done.
     474             :          */
     475    17658998 :         if (otherBuffer == InvalidBuffer || targetBlock <= otherBlock)
     476    17566974 :             GetVisibilityMapPins(relation, buffer, otherBuffer,
     477             :                                  targetBlock, otherBlock, vmbuffer,
     478             :                                  vmbuffer_other);
     479             :         else
     480       92024 :             GetVisibilityMapPins(relation, otherBuffer, buffer,
     481             :                                  otherBlock, targetBlock, vmbuffer_other,
     482             :                                  vmbuffer);
     483             : 
     484             :         /*
     485             :          * Now we can check to see if there's enough free space here. If so,
     486             :          * we're done.
     487             :          */
     488    17658998 :         page = BufferGetPage(buffer);
     489             : 
     490             :         /*
     491             :          * If necessary initialize page, it'll be used soon.  We could avoid
     492             :          * dirtying the buffer here, and rely on the caller to do so whenever
     493             :          * it puts a tuple onto the page, but there seems not much benefit in
     494             :          * doing so.
     495             :          */
     496    17658998 :         if (PageIsNew(page))
     497             :         {
     498        1610 :             PageInit(page, BufferGetPageSize(buffer), 0);
     499        1610 :             MarkBufferDirty(buffer);
     500             :         }
     501             : 
     502    17658998 :         pageFreeSpace = PageGetHeapFreeSpace(page);
     503    17658998 :         if (len + saveFreeSpace <= pageFreeSpace)
     504             :         {
     505             :             /* use this page as future insert target, too */
     506    17409358 :             RelationSetTargetBlock(relation, targetBlock);
     507    17409358 :             return buffer;
     508             :         }
     509             : 
     510             :         /*
     511             :          * Not enough space, so we must give up our page locks and pin (if
     512             :          * any) and prepare to look elsewhere.  We don't care which order we
     513             :          * unlock the two buffers in, so this can be slightly simpler than the
     514             :          * code above.
     515             :          */
     516      249640 :         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     517      249640 :         if (otherBuffer == InvalidBuffer)
     518      244418 :             ReleaseBuffer(buffer);
     519        5222 :         else if (otherBlock != targetBlock)
     520             :         {
     521        3350 :             LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
     522        3350 :             ReleaseBuffer(buffer);
     523             :         }
     524             : 
     525             :         /* Without FSM, always fall out of the loop and extend */
     526      249640 :         if (!use_fsm)
     527       12672 :             break;
     528             : 
     529             :         /*
     530             :          * Update FSM as to condition of this page, and ask for another page
     531             :          * to try.
     532             :          */
     533      236968 :         targetBlock = RecordAndGetPageWithFreeSpace(relation,
     534             :                                                     targetBlock,
     535             :                                                     pageFreeSpace,
     536             :                                                     len + saveFreeSpace);
     537             :     }
     538             : 
     539             :     /*
     540             :      * Have to extend the relation.
     541             :      *
     542             :      * We have to use a lock to ensure no one else is extending the rel at the
     543             :      * same time, else we will both try to initialize the same new page.  We
     544             :      * can skip locking for new or temp relations, however, since no one else
     545             :      * could be accessing them.
     546             :      */
     547      261964 :     needLock = !RELATION_IS_LOCAL(relation);
     548             : 
     549             :     /*
     550             :      * If we need the lock but are not able to acquire it immediately, we'll
     551             :      * consider extending the relation by multiple blocks at a time to manage
     552             :      * contention on the relation extension lock.  However, this only makes
     553             :      * sense if we're using the FSM; otherwise, there's no point.
     554             :      */
     555      261964 :     if (needLock)
     556             :     {
     557      139522 :         if (!use_fsm)
     558        3324 :             LockRelationForExtension(relation, ExclusiveLock);
     559      136198 :         else if (!ConditionalLockRelationForExtension(relation, ExclusiveLock))
     560             :         {
     561             :             /* Couldn't get the lock immediately; wait for it. */
     562          92 :             LockRelationForExtension(relation, ExclusiveLock);
     563             : 
     564             :             /*
     565             :              * Check if some other backend has extended a block for us while
     566             :              * we were waiting on the lock.
     567             :              */
     568          92 :             targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);
     569             : 
     570             :             /*
     571             :              * If some other waiter has already extended the relation, we
     572             :              * don't need to do so; just use the existing freespace.
     573             :              */
     574          92 :             if (targetBlock != InvalidBlockNumber)
     575             :             {
     576          28 :                 UnlockRelationForExtension(relation, ExclusiveLock);
     577          28 :                 goto loop;
     578             :             }
     579             : 
     580             :             /* Time to bulk-extend. */
     581          64 :             RelationAddExtraBlocks(relation, bistate);
     582             :         }
     583             :     }
     584             : 
     585             :     /*
     586             :      * In addition to whatever extension we performed above, we always add at
     587             :      * least one block to satisfy our own request.
     588             :      *
     589             :      * XXX This does an lseek - rather expensive - but at the moment it is the
     590             :      * only way to accurately determine how many blocks are in a relation.  Is
     591             :      * it worth keeping an accurate file length in shared memory someplace,
     592             :      * rather than relying on the kernel to do it for us?
     593             :      */
     594      261936 :     buffer = ReadBufferBI(relation, P_NEW, RBM_ZERO_AND_LOCK, bistate);
     595             : 
     596             :     /*
     597             :      * We need to initialize the empty new page.  Double-check that it really
     598             :      * is empty (this should never happen, but if it does we don't want to
     599             :      * risk wiping out valid data).
     600             :      */
     601      261936 :     page = BufferGetPage(buffer);
     602             : 
     603      261936 :     if (!PageIsNew(page))
     604           0 :         elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
     605             :              BufferGetBlockNumber(buffer),
     606             :              RelationGetRelationName(relation));
     607             : 
     608      261936 :     PageInit(page, BufferGetPageSize(buffer), 0);
     609      261936 :     MarkBufferDirty(buffer);
     610             : 
     611             :     /*
     612             :      * Release the file-extension lock; it's now OK for someone else to extend
     613             :      * the relation some more.
     614             :      */
     615      261936 :     if (needLock)
     616      139494 :         UnlockRelationForExtension(relation, ExclusiveLock);
     617             : 
     618             :     /*
     619             :      * Lock the other buffer. It's guaranteed to be of a lower page number
     620             :      * than the new page. To conform with the deadlock prevention rules, we ought
     621             :      * to lock otherBuffer first, but that would give other backends a chance
     622             :      * to put tuples on our page. To reduce the likelihood of that, attempt to
     623             :      * lock the other buffer conditionally, that's very likely to work.
     624             :      * Otherwise we need to lock buffers in the correct order, and retry if
     625             :      * the space has been used in the meantime.
     626             :      *
     627             :      * Alternatively, we could acquire the lock on otherBuffer before
     628             :      * extending the relation, but that'd require holding the lock while
     629             :      * performing IO, which seems worse than an unlikely retry.
     630             :      */
     631      261936 :     if (otherBuffer != InvalidBuffer)
     632             :     {
     633             :         Assert(otherBuffer != buffer);
     634             : 
     635        3928 :         if (unlikely(!ConditionalLockBuffer(otherBuffer)))
     636             :         {
     637           0 :             LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     638           0 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     639           0 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     640             : 
     641             :             /*
     642             :              * Because the buffer was unlocked for a while, it's possible,
     643             :              * although unlikely, that the page was filled. If so, just retry
     644             :              * from start.
     645             :              */
     646           0 :             if (len > PageGetHeapFreeSpace(page))
     647             :             {
     648           0 :                 LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
     649           0 :                 UnlockReleaseBuffer(buffer);
     650             : 
     651           0 :                 goto loop;
     652             :             }
     653             :         }
     654             :     }
     655             : 
     656      261936 :     if (len > PageGetHeapFreeSpace(page))
     657             :     {
     658             :         /* We should not get here given the test at the top */
     659           0 :         elog(PANIC, "tuple is too big: size %zu", len);
     660             :     }
     661             : 
     662             :     /*
     663             :      * Remember the new page as our target for future insertions.
     664             :      *
     665             :      * XXX should we enter the new page into the free space map immediately,
     666             :      * or just keep it for this backend's exclusive use in the short run
     667             :      * (until VACUUM sees it)?  Seems to depend on whether you expect the
     668             :      * current backend to make more insertions or not, which is probably a
     669             :      * good bet most of the time.  So for now, don't add it to FSM yet.
     670             :      */
     671      261936 :     RelationSetTargetBlock(relation, BufferGetBlockNumber(buffer));
     672             : 
     673      261936 :     return buffer;
     674             : }

Generated by: LCOV version 1.13