LCOV - code coverage report
Current view: top level - src/backend/access/heap - hio.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 194 223 87.0 %
Date: 2024-04-26 10:11:36 Functions: 5 5 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * hio.c
       4             :  *    POSTGRES heap access method input/output code.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/access/heap/hio.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/heapam.h"
      19             : #include "access/hio.h"
      20             : #include "access/htup_details.h"
      21             : #include "access/visibilitymap.h"
      22             : #include "storage/bufmgr.h"
      23             : #include "storage/freespace.h"
      24             : #include "storage/lmgr.h"
      25             : 
      26             : 
      27             : /*
      28             :  * RelationPutHeapTuple - place tuple at specified page
      29             :  *
      30             :  * !!! EREPORT(ERROR) IS DISALLOWED HERE !!!  Must PANIC on failure!!!
      31             :  *
      32             :  * Note - caller must hold BUFFER_LOCK_EXCLUSIVE on the buffer.
      33             :  */
      34             : void
      35    18403076 : RelationPutHeapTuple(Relation relation,
      36             :                      Buffer buffer,
      37             :                      HeapTuple tuple,
      38             :                      bool token)
      39             : {
      40             :     Page        pageHeader;
      41             :     OffsetNumber offnum;
      42             : 
      43             :     /*
      44             :      * A tuple that's being inserted speculatively should already have its
      45             :      * token set.
      46             :      */
      47             :     Assert(!token || HeapTupleHeaderIsSpeculative(tuple->t_data));
      48             : 
      49             :     /*
      50             :      * Do not allow tuples with invalid combinations of hint bits to be placed
      51             :      * on a page.  This combination is detected as corruption by the
      52             :      * contrib/amcheck logic, so if you disable this assertion, make
      53             :      * corresponding changes there.
      54             :      */
      55             :     Assert(!((tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED) &&
      56             :              (tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI)));
      57             : 
      58             :     /* Add the tuple to the page */
      59    18403076 :     pageHeader = BufferGetPage(buffer);
      60             : 
      61    18403076 :     offnum = PageAddItem(pageHeader, (Item) tuple->t_data,
      62             :                          tuple->t_len, InvalidOffsetNumber, false, true);
      63             : 
      64    18403076 :     if (offnum == InvalidOffsetNumber)
      65           0 :         elog(PANIC, "failed to add tuple to page");
      66             : 
      67             :     /* Update tuple->t_self to the actual position where it was stored */
      68    18403076 :     ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum);
      69             : 
      70             :     /*
      71             :      * Insert the correct position into CTID of the stored tuple, too (unless
      72             :      * this is a speculative insertion, in which case the token is held in
      73             :      * CTID field instead)
      74             :      */
      75    18403076 :     if (!token)
      76             :     {
      77    18399050 :         ItemId      itemId = PageGetItemId(pageHeader, offnum);
      78    18399050 :         HeapTupleHeader item = (HeapTupleHeader) PageGetItem(pageHeader, itemId);
      79             : 
      80    18399050 :         item->t_ctid = tuple->t_self;
      81             :     }
      82    18403076 : }
      83             : 
      84             : /*
      85             :  * Read in a buffer in mode, using bulk-insert strategy if bistate isn't NULL.
      86             :  */
      87             : static Buffer
      88    15771638 : ReadBufferBI(Relation relation, BlockNumber targetBlock,
      89             :              ReadBufferMode mode, BulkInsertState bistate)
      90             : {
      91             :     Buffer      buffer;
      92             : 
      93             :     /* If not bulk-insert, exactly like ReadBuffer */
      94    15771638 :     if (!bistate)
      95    13450854 :         return ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
      96             :                                   mode, NULL);
      97             : 
      98             :     /* If we have the desired block already pinned, re-pin and return it */
      99     2320784 :     if (bistate->current_buf != InvalidBuffer)
     100             :     {
     101     2260436 :         if (BufferGetBlockNumber(bistate->current_buf) == targetBlock)
     102             :         {
     103             :             /*
     104             :              * Currently the LOCK variants are only used for extending
     105             :              * relation, which should never reach this branch.
     106             :              */
     107             :             Assert(mode != RBM_ZERO_AND_LOCK &&
     108             :                    mode != RBM_ZERO_AND_CLEANUP_LOCK);
     109             : 
     110     2236186 :             IncrBufferRefCount(bistate->current_buf);
     111     2236186 :             return bistate->current_buf;
     112             :         }
     113             :         /* ... else drop the old buffer */
     114       24250 :         ReleaseBuffer(bistate->current_buf);
     115       24250 :         bistate->current_buf = InvalidBuffer;
     116             :     }
     117             : 
     118             :     /* Perform a read using the buffer strategy */
     119       84598 :     buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
     120             :                                 mode, bistate->strategy);
     121             : 
     122             :     /* Save the selected block as target for future inserts */
     123       84598 :     IncrBufferRefCount(buffer);
     124       84598 :     bistate->current_buf = buffer;
     125             : 
     126       84598 :     return buffer;
     127             : }
     128             : 
     129             : /*
     130             :  * For each heap page which is all-visible, acquire a pin on the appropriate
     131             :  * visibility map page, if we haven't already got one.
     132             :  *
     133             :  * To avoid complexity in the callers, either buffer1 or buffer2 may be
     134             :  * InvalidBuffer if only one buffer is involved. For the same reason, block2
     135             :  * may be smaller than block1.
     136             :  *
     137             :  * Returns whether buffer locks were temporarily released.
     138             :  */
     139             : static bool
     140    16062594 : GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
     141             :                      BlockNumber block1, BlockNumber block2,
     142             :                      Buffer *vmbuffer1, Buffer *vmbuffer2)
     143             : {
     144             :     bool        need_to_pin_buffer1;
     145             :     bool        need_to_pin_buffer2;
     146    16062594 :     bool        released_locks = false;
     147             : 
     148             :     /*
     149             :      * Swap buffers around to handle case of a single block/buffer, and to
     150             :      * handle if lock ordering rules require to lock block2 first.
     151             :      */
     152    32124188 :     if (!BufferIsValid(buffer1) ||
     153    16351550 :         (BufferIsValid(buffer2) && block1 > block2))
     154             :     {
     155      276306 :         Buffer      tmpbuf = buffer1;
     156      276306 :         Buffer     *tmpvmbuf = vmbuffer1;
     157      276306 :         BlockNumber tmpblock = block1;
     158             : 
     159      276306 :         buffer1 = buffer2;
     160      276306 :         vmbuffer1 = vmbuffer2;
     161      276306 :         block1 = block2;
     162             : 
     163      276306 :         buffer2 = tmpbuf;
     164      276306 :         vmbuffer2 = tmpvmbuf;
     165      276306 :         block2 = tmpblock;
     166             :     }
     167             : 
     168             :     Assert(BufferIsValid(buffer1));
     169             :     Assert(buffer2 == InvalidBuffer || block1 <= block2);
     170             : 
     171             :     while (1)
     172             :     {
     173             :         /* Figure out which pins we need but don't have. */
     174    16062594 :         need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1))
     175    16062594 :             && !visibilitymap_pin_ok(block1, *vmbuffer1);
     176    16062594 :         need_to_pin_buffer2 = buffer2 != InvalidBuffer
     177      289956 :             && PageIsAllVisible(BufferGetPage(buffer2))
     178    16352550 :             && !visibilitymap_pin_ok(block2, *vmbuffer2);
     179    16062594 :         if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
     180    16062594 :             break;
     181             : 
     182             :         /* We must unlock both buffers before doing any I/O. */
     183           0 :         released_locks = true;
     184           0 :         LockBuffer(buffer1, BUFFER_LOCK_UNLOCK);
     185           0 :         if (buffer2 != InvalidBuffer && buffer2 != buffer1)
     186           0 :             LockBuffer(buffer2, BUFFER_LOCK_UNLOCK);
     187             : 
     188             :         /* Get pins. */
     189           0 :         if (need_to_pin_buffer1)
     190           0 :             visibilitymap_pin(relation, block1, vmbuffer1);
     191           0 :         if (need_to_pin_buffer2)
     192           0 :             visibilitymap_pin(relation, block2, vmbuffer2);
     193             : 
     194             :         /* Relock buffers. */
     195           0 :         LockBuffer(buffer1, BUFFER_LOCK_EXCLUSIVE);
     196           0 :         if (buffer2 != InvalidBuffer && buffer2 != buffer1)
     197           0 :             LockBuffer(buffer2, BUFFER_LOCK_EXCLUSIVE);
     198             : 
     199             :         /*
     200             :          * If there are two buffers involved and we pinned just one of them,
     201             :          * it's possible that the second one became all-visible while we were
     202             :          * busy pinning the first one.  If it looks like that's a possible
     203             :          * scenario, we'll need to make a second pass through this loop.
     204             :          */
     205           0 :         if (buffer2 == InvalidBuffer || buffer1 == buffer2
     206           0 :             || (need_to_pin_buffer1 && need_to_pin_buffer2))
     207             :             break;
     208             :     }
     209             : 
     210    16062594 :     return released_locks;
     211             : }
     212             : 
     213             : /*
     214             :  * Extend the relation. By multiple pages, if beneficial.
     215             :  *
     216             :  * If the caller needs multiple pages (num_pages > 1), we always try to extend
     217             :  * by at least that much.
     218             :  *
     219             :  * If there is contention on the extension lock, we don't just extend "for
     220             :  * ourselves", but we try to help others. We can do so by adding empty pages
     221             :  * into the FSM. Typically there is no contention when we can't use the FSM.
     222             :  *
     223             :  * We do have to limit the number of pages to extend by to some value, as the
     224             :  * buffers for all the extended pages need to, temporarily, be pinned. For now
     225             :  * we define MAX_BUFFERS_TO_EXTEND_BY to be 64 buffers, it's hard to see
     226             :  * benefits with higher numbers. This partially is because copyfrom.c's
     227             :  * MAX_BUFFERED_TUPLES / MAX_BUFFERED_BYTES prevents larger multi_inserts.
     228             :  *
     229             :  * Returns a buffer for a newly extended block. If possible, the buffer is
     230             :  * returned exclusively locked. *did_unlock is set to true if the lock had to
     231             :  * be released, false otherwise.
     232             :  *
     233             :  *
     234             :  * XXX: It would likely be beneficial for some workloads to extend more
     235             :  * aggressively, e.g. using a heuristic based on the relation size.
     236             :  */
     237             : static Buffer
     238      190896 : RelationAddBlocks(Relation relation, BulkInsertState bistate,
     239             :                   int num_pages, bool use_fsm, bool *did_unlock)
     240             : {
     241             : #define MAX_BUFFERS_TO_EXTEND_BY 64
     242             :     Buffer      victim_buffers[MAX_BUFFERS_TO_EXTEND_BY];
     243      190896 :     BlockNumber first_block = InvalidBlockNumber;
     244      190896 :     BlockNumber last_block = InvalidBlockNumber;
     245             :     uint32      extend_by_pages;
     246             :     uint32      not_in_fsm_pages;
     247             :     Buffer      buffer;
     248             :     Page        page;
     249             : 
     250             :     /*
     251             :      * Determine by how many pages to try to extend by.
     252             :      */
     253      190896 :     if (bistate == NULL && !use_fsm)
     254             :     {
     255             :         /*
     256             :          * If we have neither bistate, nor can use the FSM, we can't bulk
     257             :          * extend - there'd be no way to find the additional pages.
     258             :          */
     259         326 :         extend_by_pages = 1;
     260             :     }
     261             :     else
     262             :     {
     263             :         uint32      waitcount;
     264             : 
     265             :         /*
     266             :          * Try to extend at least by the number of pages the caller needs. We
     267             :          * can remember the additional pages (either via FSM or bistate).
     268             :          */
     269      190570 :         extend_by_pages = num_pages;
     270             : 
     271      190570 :         if (!RELATION_IS_LOCAL(relation))
     272      112994 :             waitcount = RelationExtensionLockWaiterCount(relation);
     273             :         else
     274       77576 :             waitcount = 0;
     275             : 
     276             :         /*
     277             :          * Multiply the number of pages to extend by the number of waiters. Do
     278             :          * this even if we're not using the FSM, as it still relieves
     279             :          * contention, by deferring the next time this backend needs to
     280             :          * extend. In that case the extended pages will be found via
     281             :          * bistate->next_free.
     282             :          */
     283      190570 :         extend_by_pages += extend_by_pages * waitcount;
     284             : 
     285             :         /* ---
     286             :          * If we previously extended using the same bistate, it's very likely
     287             :          * we'll extend some more. Try to extend by as many pages as
     288             :          * before. This can be important for performance for several reasons,
     289             :          * including:
     290             :          *
     291             :          * - It prevents mdzeroextend() switching between extending the
     292             :          *   relation in different ways, which is inefficient for some
     293             :          *   filesystems.
     294             :          *
     295             :          * - Contention is often intermittent. Even if we currently don't see
     296             :          *   other waiters (see above), extending by larger amounts can
     297             :          *   prevent future contention.
     298             :          * ---
     299             :          */
     300      190570 :         if (bistate)
     301       11426 :             extend_by_pages = Max(extend_by_pages, bistate->already_extended_by);
     302             : 
     303             :         /*
     304             :          * Can't extend by more than MAX_BUFFERS_TO_EXTEND_BY, we need to pin
     305             :          * them all concurrently.
     306             :          */
     307      190570 :         extend_by_pages = Min(extend_by_pages, MAX_BUFFERS_TO_EXTEND_BY);
     308             :     }
     309             : 
     310             :     /*
     311             :      * How many of the extended pages should be entered into the FSM?
     312             :      *
     313             :      * If we have a bistate, only enter pages that we don't need ourselves
     314             :      * into the FSM.  Otherwise every other backend will immediately try to
     315             :      * use the pages this backend needs for itself, causing unnecessary
     316             :      * contention.  If we don't have a bistate, we can't avoid the FSM.
     317             :      *
     318             :      * Never enter the page returned into the FSM, we'll immediately use it.
     319             :      */
     320      190896 :     if (num_pages > 1 && bistate == NULL)
     321         506 :         not_in_fsm_pages = 1;
     322             :     else
     323      190390 :         not_in_fsm_pages = num_pages;
     324             : 
     325             :     /* prepare to put another buffer into the bistate */
     326      190896 :     if (bistate && bistate->current_buf != InvalidBuffer)
     327             :     {
     328        8226 :         ReleaseBuffer(bistate->current_buf);
     329        8226 :         bistate->current_buf = InvalidBuffer;
     330             :     }
     331             : 
     332             :     /*
     333             :      * Extend the relation. We ask for the first returned page to be locked,
     334             :      * so that we are sure that nobody has inserted into the page
     335             :      * concurrently.
     336             :      *
     337             :      * With the current MAX_BUFFERS_TO_EXTEND_BY there's no danger of
     338             :      * [auto]vacuum trying to truncate later pages as REL_TRUNCATE_MINIMUM is
     339             :      * way larger.
     340             :      */
     341      190896 :     first_block = ExtendBufferedRelBy(BMR_REL(relation), MAIN_FORKNUM,
     342             :                                       bistate ? bistate->strategy : NULL,
     343             :                                       EB_LOCK_FIRST,
     344             :                                       extend_by_pages,
     345             :                                       victim_buffers,
     346             :                                       &extend_by_pages);
     347      190896 :     buffer = victim_buffers[0]; /* the buffer the function will return */
     348      190896 :     last_block = first_block + (extend_by_pages - 1);
     349             :     Assert(first_block == BufferGetBlockNumber(buffer));
     350             : 
     351             :     /*
     352             :      * Relation is now extended. Initialize the page. We do this here, before
     353             :      * potentially releasing the lock on the page, because it allows us to
     354             :      * double check that the page contents are empty (this should never
     355             :      * happen, but if it does we don't want to risk wiping out valid data).
     356             :      */
     357      190896 :     page = BufferGetPage(buffer);
     358      190896 :     if (!PageIsNew(page))
     359           0 :         elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
     360             :              first_block,
     361             :              RelationGetRelationName(relation));
     362             : 
     363      190896 :     PageInit(page, BufferGetPageSize(buffer), 0);
     364      190896 :     MarkBufferDirty(buffer);
     365             : 
     366             :     /*
     367             :      * If we decided to put pages into the FSM, release the buffer lock (but
     368             :      * not pin), we don't want to do IO while holding a buffer lock. This will
     369             :      * necessitate a bit more extensive checking in our caller.
     370             :      */
     371      190896 :     if (use_fsm && not_in_fsm_pages < extend_by_pages)
     372             :     {
     373         756 :         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     374         756 :         *did_unlock = true;
     375             :     }
     376             :     else
     377      190140 :         *did_unlock = false;
     378             : 
     379             :     /*
     380             :      * Relation is now extended. Release pins on all buffers, except for the
     381             :      * first (which we'll return).  If we decided to put pages into the FSM,
     382             :      * we can do that as part of the same loop.
     383             :      */
     384      224454 :     for (uint32 i = 1; i < extend_by_pages; i++)
     385             :     {
     386       33558 :         BlockNumber curBlock = first_block + i;
     387             : 
     388             :         Assert(curBlock == BufferGetBlockNumber(victim_buffers[i]));
     389             :         Assert(BlockNumberIsValid(curBlock));
     390             : 
     391       33558 :         ReleaseBuffer(victim_buffers[i]);
     392             : 
     393       33558 :         if (use_fsm && i >= not_in_fsm_pages)
     394             :         {
     395       10518 :             Size        freespace = BufferGetPageSize(victim_buffers[i]) -
     396             :                 SizeOfPageHeaderData;
     397             : 
     398       10518 :             RecordPageWithFreeSpace(relation, curBlock, freespace);
     399             :         }
     400             :     }
     401             : 
     402      190896 :     if (use_fsm && not_in_fsm_pages < extend_by_pages)
     403             :     {
     404         756 :         BlockNumber first_fsm_block = first_block + not_in_fsm_pages;
     405             : 
     406         756 :         FreeSpaceMapVacuumRange(relation, first_fsm_block, last_block);
     407             :     }
     408             : 
     409      190896 :     if (bistate)
     410             :     {
     411             :         /*
     412             :          * Remember the additional pages we extended by, so we later can use
     413             :          * them without looking into the FSM.
     414             :          */
     415       11426 :         if (extend_by_pages > 1)
     416             :         {
     417        1740 :             bistate->next_free = first_block + 1;
     418        1740 :             bistate->last_free = last_block;
     419             :         }
     420             :         else
     421             :         {
     422        9686 :             bistate->next_free = InvalidBlockNumber;
     423        9686 :             bistate->last_free = InvalidBlockNumber;
     424             :         }
     425             : 
     426             :         /* maintain bistate->current_buf */
     427       11426 :         IncrBufferRefCount(buffer);
     428       11426 :         bistate->current_buf = buffer;
     429       11426 :         bistate->already_extended_by += extend_by_pages;
     430             :     }
     431             : 
     432      190896 :     return buffer;
     433             : #undef MAX_BUFFERS_TO_EXTEND_BY
     434             : }
     435             : 
     436             : /*
     437             :  * RelationGetBufferForTuple
     438             :  *
     439             :  *  Returns pinned and exclusive-locked buffer of a page in given relation
     440             :  *  with free space >= given len.
     441             :  *
     442             :  *  If num_pages is > 1, we will try to extend the relation by at least that
     443             :  *  many pages when we decide to extend the relation. This is more efficient
     444             :  *  for callers that know they will need multiple pages
     445             :  *  (e.g. heap_multi_insert()).
     446             :  *
     447             :  *  If otherBuffer is not InvalidBuffer, then it references a previously
     448             :  *  pinned buffer of another page in the same relation; on return, this
     449             :  *  buffer will also be exclusive-locked.  (This case is used by heap_update;
     450             :  *  the otherBuffer contains the tuple being updated.)
     451             :  *
     452             :  *  The reason for passing otherBuffer is that if two backends are doing
     453             :  *  concurrent heap_update operations, a deadlock could occur if they try
     454             :  *  to lock the same two buffers in opposite orders.  To ensure that this
     455             :  *  can't happen, we impose the rule that buffers of a relation must be
     456             :  *  locked in increasing page number order.  This is most conveniently done
     457             :  *  by having RelationGetBufferForTuple lock them both, with suitable care
     458             :  *  for ordering.
     459             :  *
     460             :  *  NOTE: it is unlikely, but not quite impossible, for otherBuffer to be the
     461             :  *  same buffer we select for insertion of the new tuple (this could only
     462             :  *  happen if space is freed in that page after heap_update finds there's not
     463             :  *  enough there).  In that case, the page will be pinned and locked only once.
     464             :  *
     465             :  *  We also handle the possibility that the all-visible flag will need to be
     466             :  *  cleared on one or both pages.  If so, pin on the associated visibility map
     467             :  *  page must be acquired before acquiring buffer lock(s), to avoid possibly
     468             :  *  doing I/O while holding buffer locks.  The pins are passed back to the
     469             :  *  caller using the input-output arguments vmbuffer and vmbuffer_other.
     470             :  *  Note that in some cases the caller might have already acquired such pins,
     471             :  *  which is indicated by these arguments not being InvalidBuffer on entry.
     472             :  *
     473             :  *  We normally use FSM to help us find free space.  However,
     474             :  *  if HEAP_INSERT_SKIP_FSM is specified, we just append a new empty page to
     475             :  *  the end of the relation if the tuple won't fit on the current target page.
     476             :  *  This can save some cycles when we know the relation is new and doesn't
     477             :  *  contain useful amounts of free space.
     478             :  *
     479             :  *  HEAP_INSERT_SKIP_FSM is also useful for non-WAL-logged additions to a
     480             :  *  relation, if the caller holds exclusive lock and is careful to invalidate
     481             :  *  relation's smgr_targblock before the first insertion --- that ensures that
     482             :  *  all insertions will occur into newly added pages and not be intermixed
     483             :  *  with tuples from other transactions.  That way, a crash can't risk losing
     484             :  *  any committed data of other transactions.  (See heap_insert's comments
     485             :  *  for additional constraints needed for safe usage of this behavior.)
     486             :  *
     487             :  *  The caller can also provide a BulkInsertState object to optimize many
     488             :  *  insertions into the same relation.  This keeps a pin on the current
     489             :  *  insertion target page (to save pin/unpin cycles) and also passes a
     490             :  *  BULKWRITE buffer selection strategy object to the buffer manager.
     491             :  *  Passing NULL for bistate selects the default behavior.
     492             :  *
     493             :  *  We don't fill existing pages further than the fillfactor, except for large
     494             :  *  tuples in nearly-empty pages.  This is OK since this routine is not
     495             :  *  consulted when updating a tuple and keeping it on the same page, which is
     496             :  *  the scenario fillfactor is meant to reserve space for.
     497             :  *
     498             :  *  ereport(ERROR) is allowed here, so this routine *must* be called
     499             :  *  before any (unlogged) changes are made in buffer pool.
     500             :  */
     501             : Buffer
     502    16035474 : RelationGetBufferForTuple(Relation relation, Size len,
     503             :                           Buffer otherBuffer, int options,
     504             :                           BulkInsertState bistate,
     505             :                           Buffer *vmbuffer, Buffer *vmbuffer_other,
     506             :                           int num_pages)
     507             : {
     508    16035474 :     bool        use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
     509    16035474 :     Buffer      buffer = InvalidBuffer;
     510             :     Page        page;
     511             :     Size        nearlyEmptyFreeSpace,
     512    16035474 :                 pageFreeSpace = 0,
     513    16035474 :                 saveFreeSpace = 0,
     514    16035474 :                 targetFreeSpace = 0;
     515             :     BlockNumber targetBlock,
     516             :                 otherBlock;
     517             :     bool        unlockedTargetBuffer;
     518             :     bool        recheckVmPins;
     519             : 
     520    16035474 :     len = MAXALIGN(len);        /* be conservative */
     521             : 
     522             :     /* if the caller doesn't know by how many pages to extend, extend by 1 */
     523    16035474 :     if (num_pages <= 0)
     524    15396744 :         num_pages = 1;
     525             : 
     526             :     /* Bulk insert is not supported for updates, only inserts. */
     527             :     Assert(otherBuffer == InvalidBuffer || !bistate);
     528             : 
     529             :     /*
     530             :      * If we're gonna fail for oversize tuple, do it right away
     531             :      */
     532    16035474 :     if (len > MaxHeapTupleSize)
     533           0 :         ereport(ERROR,
     534             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     535             :                  errmsg("row is too big: size %zu, maximum size %zu",
     536             :                         len, MaxHeapTupleSize)));
     537             : 
     538             :     /* Compute desired extra freespace due to fillfactor option */
     539    16035474 :     saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
     540             :                                                    HEAP_DEFAULT_FILLFACTOR);
     541             : 
     542             :     /*
     543             :      * Since pages without tuples can still have line pointers, we consider
     544             :      * pages "empty" when the unavailable space is slight.  This threshold is
     545             :      * somewhat arbitrary, but it should prevent most unnecessary relation
     546             :      * extensions while inserting large tuples into low-fillfactor tables.
     547             :      */
     548    16035474 :     nearlyEmptyFreeSpace = MaxHeapTupleSize -
     549             :         (MaxHeapTuplesPerPage / 8 * sizeof(ItemIdData));
     550    16035474 :     if (len + saveFreeSpace > nearlyEmptyFreeSpace)
     551          72 :         targetFreeSpace = Max(len, nearlyEmptyFreeSpace);
     552             :     else
     553    16035402 :         targetFreeSpace = len + saveFreeSpace;
     554             : 
     555    16035474 :     if (otherBuffer != InvalidBuffer)
     556      281040 :         otherBlock = BufferGetBlockNumber(otherBuffer);
     557             :     else
     558    15754434 :         otherBlock = InvalidBlockNumber;    /* just to keep compiler quiet */
     559             : 
     560             :     /*
     561             :      * We first try to put the tuple on the same page we last inserted a tuple
     562             :      * on, as cached in the BulkInsertState or relcache entry.  If that
     563             :      * doesn't work, we ask the Free Space Map to locate a suitable page.
     564             :      * Since the FSM's info might be out of date, we have to be prepared to
     565             :      * loop around and retry multiple times. (To ensure this isn't an infinite
     566             :      * loop, we must update the FSM with the correct amount of free space on
     567             :      * each page that proves not to be suitable.)  If the FSM has no record of
     568             :      * a page with enough free space, we give up and extend the relation.
     569             :      *
     570             :      * When use_fsm is false, we either put the tuple onto the existing target
     571             :      * page or extend the relation.
     572             :      */
     573    16035474 :     if (bistate && bistate->current_buf != InvalidBuffer)
     574     2236186 :         targetBlock = BufferGetBlockNumber(bistate->current_buf);
     575             :     else
     576    13799288 :         targetBlock = RelationGetTargetBlock(relation);
     577             : 
     578    16035474 :     if (targetBlock == InvalidBlockNumber && use_fsm)
     579             :     {
     580             :         /*
     581             :          * We have no cached target page, so ask the FSM for an initial
     582             :          * target.
     583             :          */
     584       82534 :         targetBlock = GetPageWithFreeSpace(relation, targetFreeSpace);
     585             :     }
     586             : 
     587             :     /*
     588             :      * If the FSM knows nothing of the rel, try the last page before we give
     589             :      * up and extend.  This avoids one-tuple-per-page syndrome during
     590             :      * bootstrapping or in a recently-started system.
     591             :      */
     592    16035474 :     if (targetBlock == InvalidBlockNumber)
     593             :     {
     594       67380 :         BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
     595             : 
     596       67380 :         if (nblocks > 0)
     597       39662 :             targetBlock = nblocks - 1;
     598             :     }
     599             : 
     600    16035474 : loop:
     601    16240692 :     while (targetBlock != InvalidBlockNumber)
     602             :     {
     603             :         /*
     604             :          * Read and exclusive-lock the target block, as well as the other
     605             :          * block if one was given, taking suitable care with lock ordering and
     606             :          * the possibility they are the same block.
     607             :          *
     608             :          * If the page-level all-visible flag is set, caller will need to
     609             :          * clear both that and the corresponding visibility map bit.  However,
     610             :          * by the time we return, we'll have x-locked the buffer, and we don't
     611             :          * want to do any I/O while in that state.  So we check the bit here
     612             :          * before taking the lock, and pin the page if it appears necessary.
     613             :          * Checking without the lock creates a risk of getting the wrong
     614             :          * answer, so we'll have to recheck after acquiring the lock.
     615             :          */
     616    16055632 :         if (otherBuffer == InvalidBuffer)
     617             :         {
     618             :             /* easy case */
     619    15771638 :             buffer = ReadBufferBI(relation, targetBlock, RBM_NORMAL, bistate);
     620    15771638 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     621       19414 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     622             : 
     623             :             /*
     624             :              * If the page is empty, pin vmbuffer to set all_frozen bit later.
     625             :              */
     626    15778732 :             if ((options & HEAP_INSERT_FROZEN) &&
     627        7094 :                 (PageGetMaxOffsetNumber(BufferGetPage(buffer)) == 0))
     628        3226 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     629             : 
     630    15771638 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     631             :         }
     632      283994 :         else if (otherBlock == targetBlock)
     633             :         {
     634             :             /* also easy case */
     635        2922 :             buffer = otherBuffer;
     636        2922 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     637           0 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     638        2922 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     639             :         }
     640      281072 :         else if (otherBlock < targetBlock)
     641             :         {
     642             :             /* lock other buffer first */
     643      275306 :             buffer = ReadBuffer(relation, targetBlock);
     644      275306 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     645         930 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     646      275306 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     647      275306 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     648             :         }
     649             :         else
     650             :         {
     651             :             /* lock target buffer first */
     652        5766 :             buffer = ReadBuffer(relation, targetBlock);
     653        5766 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     654          92 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     655        5766 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     656        5766 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     657             :         }
     658             : 
     659             :         /*
     660             :          * We now have the target page (and the other buffer, if any) pinned
     661             :          * and locked.  However, since our initial PageIsAllVisible checks
     662             :          * were performed before acquiring the lock, the results might now be
     663             :          * out of date, either for the selected victim buffer, or for the
     664             :          * other buffer passed by the caller.  In that case, we'll need to
     665             :          * give up our locks, go get the pin(s) we failed to get earlier, and
     666             :          * re-lock.  That's pretty painful, but hopefully shouldn't happen
     667             :          * often.
     668             :          *
     669             :          * Note that there's a small possibility that we didn't pin the page
     670             :          * above but still have the correct page pinned anyway, either because
     671             :          * we've already made a previous pass through this loop, or because
     672             :          * caller passed us the right page anyway.
     673             :          *
     674             :          * Note also that it's possible that by the time we get the pin and
     675             :          * retake the buffer locks, the visibility map bit will have been
     676             :          * cleared by some other backend anyway.  In that case, we'll have
     677             :          * done a bit of extra work for no gain, but there's no real harm
     678             :          * done.
     679             :          */
     680    16055632 :         GetVisibilityMapPins(relation, buffer, otherBuffer,
     681             :                              targetBlock, otherBlock, vmbuffer,
     682             :                              vmbuffer_other);
     683             : 
     684             :         /*
     685             :          * Now we can check to see if there's enough free space here. If so,
     686             :          * we're done.
     687             :          */
     688    16055632 :         page = BufferGetPage(buffer);
     689             : 
     690             :         /*
     691             :          * If necessary initialize page, it'll be used soon.  We could avoid
     692             :          * dirtying the buffer here, and rely on the caller to do so whenever
     693             :          * it puts a tuple onto the page, but there seems not much benefit in
     694             :          * doing so.
     695             :          */
     696    16055632 :         if (PageIsNew(page))
     697             :         {
     698       27190 :             PageInit(page, BufferGetPageSize(buffer), 0);
     699       27190 :             MarkBufferDirty(buffer);
     700             :         }
     701             : 
     702    16055632 :         pageFreeSpace = PageGetHeapFreeSpace(page);
     703    16055632 :         if (targetFreeSpace <= pageFreeSpace)
     704             :         {
     705             :             /* use this page as future insert target, too */
     706    15844578 :             RelationSetTargetBlock(relation, targetBlock);
     707    15844578 :             return buffer;
     708             :         }
     709             : 
     710             :         /*
     711             :          * Not enough space, so we must give up our page locks and pin (if
     712             :          * any) and prepare to look elsewhere.  We don't care which order we
     713             :          * unlock the two buffers in, so this can be slightly simpler than the
     714             :          * code above.
     715             :          */
     716      211054 :         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     717      211054 :         if (otherBuffer == InvalidBuffer)
     718      202138 :             ReleaseBuffer(buffer);
     719        8916 :         else if (otherBlock != targetBlock)
     720             :         {
     721        5994 :             LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
     722        5994 :             ReleaseBuffer(buffer);
     723             :         }
     724             : 
     725             :         /* Is there an ongoing bulk extension? */
     726      211054 :         if (bistate && bistate->next_free != InvalidBlockNumber)
     727             :         {
     728             :             Assert(bistate->next_free <= bistate->last_free);
     729             : 
     730             :             /*
     731             :              * We bulk extended the relation before, and there are still some
     732             :              * unused pages from that extension, so we don't need to look in
     733             :              * the FSM for a new page. But do record the free space from the
     734             :              * last page, somebody might insert narrower tuples later.
     735             :              */
     736       23970 :             if (use_fsm)
     737        8580 :                 RecordPageWithFreeSpace(relation, targetBlock, pageFreeSpace);
     738             : 
     739       23970 :             targetBlock = bistate->next_free;
     740       23970 :             if (bistate->next_free >= bistate->last_free)
     741             :             {
     742        1390 :                 bistate->next_free = InvalidBlockNumber;
     743        1390 :                 bistate->last_free = InvalidBlockNumber;
     744             :             }
     745             :             else
     746       22580 :                 bistate->next_free++;
     747             :         }
     748      187084 :         else if (!use_fsm)
     749             :         {
     750             :             /* Without FSM, always fall out of the loop and extend */
     751        5836 :             break;
     752             :         }
     753             :         else
     754             :         {
     755             :             /*
     756             :              * Update FSM as to condition of this page, and ask for another
     757             :              * page to try.
     758             :              */
     759      181248 :             targetBlock = RecordAndGetPageWithFreeSpace(relation,
     760             :                                                         targetBlock,
     761             :                                                         pageFreeSpace,
     762             :                                                         targetFreeSpace);
     763             :         }
     764             :     }
     765             : 
     766             :     /* Have to extend the relation */
     767      190896 :     buffer = RelationAddBlocks(relation, bistate, num_pages, use_fsm,
     768             :                                &unlockedTargetBuffer);
     769             : 
     770      190896 :     targetBlock = BufferGetBlockNumber(buffer);
     771      190896 :     page = BufferGetPage(buffer);
     772             : 
     773             :     /*
     774             :      * The page is empty, pin vmbuffer to set all_frozen bit. We don't want to
     775             :      * do IO while the buffer is locked, so we unlock the page first if IO is
     776             :      * needed (necessitating checks below).
     777             :      */
     778      190896 :     if (options & HEAP_INSERT_FROZEN)
     779             :     {
     780             :         Assert(PageGetMaxOffsetNumber(page) == 0);
     781             : 
     782         298 :         if (!visibilitymap_pin_ok(targetBlock, *vmbuffer))
     783             :         {
     784         244 :             if (!unlockedTargetBuffer)
     785         244 :                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     786         244 :             unlockedTargetBuffer = true;
     787         244 :             visibilitymap_pin(relation, targetBlock, vmbuffer);
     788             :         }
     789             :     }
     790             : 
     791             :     /*
     792             :      * Reacquire locks if necessary.
     793             :      *
     794             :      * If the target buffer was unlocked above, or is unlocked while
     795             :      * reacquiring the lock on otherBuffer below, it's unlikely, but possible,
     796             :      * that another backend used space on this page. We check for that below,
     797             :      * and retry if necessary.
     798             :      */
     799      190896 :     recheckVmPins = false;
     800      190896 :     if (unlockedTargetBuffer)
     801             :     {
     802             :         /* released lock on target buffer above */
     803        1000 :         if (otherBuffer != InvalidBuffer)
     804           0 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     805        1000 :         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     806        1000 :         recheckVmPins = true;
     807             :     }
     808      189896 :     else if (otherBuffer != InvalidBuffer)
     809             :     {
     810             :         /*
     811             :          * We did not release the target buffer, and otherBuffer is valid,
     812             :          * need to lock the other buffer. It's guaranteed to be of a lower
     813             :          * page number than the new page.  To conform with the deadlock
     814             :          * prevent rules, we ought to lock otherBuffer first, but that would
     815             :          * give other backends a chance to put tuples on our page. To reduce
     816             :          * the likelihood of that, attempt to lock the other buffer
     817             :          * conditionally, that's very likely to work.
     818             :          *
     819             :          * Alternatively, we could acquire the lock on otherBuffer before
     820             :          * extending the relation, but that'd require holding the lock while
     821             :          * performing IO, which seems worse than an unlikely retry.
     822             :          */
     823             :         Assert(otherBuffer != buffer);
     824             :         Assert(targetBlock > otherBlock);
     825             : 
     826        5962 :         if (unlikely(!ConditionalLockBuffer(otherBuffer)))
     827             :         {
     828           0 :             unlockedTargetBuffer = true;
     829           0 :             LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     830           0 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     831           0 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     832             :         }
     833        5962 :         recheckVmPins = true;
     834             :     }
     835             : 
     836             :     /*
     837             :      * If one of the buffers was unlocked (always the case if otherBuffer is
     838             :      * valid), it's possible, although unlikely, that an all-visible flag
     839             :      * became set.  We can use GetVisibilityMapPins to deal with that. It's
     840             :      * possible that GetVisibilityMapPins() might need to temporarily release
     841             :      * buffer locks, in which case we'll need to check if there's still enough
     842             :      * space on the page below.
     843             :      */
     844      190896 :     if (recheckVmPins)
     845             :     {
     846        6962 :         if (GetVisibilityMapPins(relation, otherBuffer, buffer,
     847             :                                  otherBlock, targetBlock, vmbuffer_other,
     848             :                                  vmbuffer))
     849           0 :             unlockedTargetBuffer = true;
     850             :     }
     851             : 
     852             :     /*
     853             :      * If the target buffer was temporarily unlocked since the relation
     854             :      * extension, it's possible, although unlikely, that all the space on the
     855             :      * page was already used. If so, we just retry from the start.  If we
     856             :      * didn't unlock, something has gone wrong if there's not enough space -
     857             :      * the test at the top should have prevented reaching this case.
     858             :      */
     859      190896 :     pageFreeSpace = PageGetHeapFreeSpace(page);
     860      190896 :     if (len > pageFreeSpace)
     861             :     {
     862           0 :         if (unlockedTargetBuffer)
     863             :         {
     864           0 :             if (otherBuffer != InvalidBuffer)
     865           0 :                 LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
     866           0 :             UnlockReleaseBuffer(buffer);
     867             : 
     868           0 :             goto loop;
     869             :         }
     870           0 :         elog(PANIC, "tuple is too big: size %zu", len);
     871             :     }
     872             : 
     873             :     /*
     874             :      * Remember the new page as our target for future insertions.
     875             :      *
     876             :      * XXX should we enter the new page into the free space map immediately,
     877             :      * or just keep it for this backend's exclusive use in the short run
     878             :      * (until VACUUM sees it)?  Seems to depend on whether you expect the
     879             :      * current backend to make more insertions or not, which is probably a
     880             :      * good bet most of the time.  So for now, don't add it to FSM yet.
     881             :      */
     882      190896 :     RelationSetTargetBlock(relation, targetBlock);
     883             : 
     884      190896 :     return buffer;
     885             : }

Generated by: LCOV version 1.14