LCOV - code coverage report
Current view: top level - src/backend/access/heap - hio.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 87.4 % 223 195
Test Date: 2026-03-01 09:14:34 Functions: 100.0 % 5 5
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * hio.c
       4              :  *    POSTGRES heap access method input/output code.
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *    src/backend/access/heap/hio.c
      12              :  *
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : 
      16              : #include "postgres.h"
      17              : 
      18              : #include "access/heapam.h"
      19              : #include "access/hio.h"
      20              : #include "access/htup_details.h"
      21              : #include "access/visibilitymap.h"
      22              : #include "storage/bufmgr.h"
      23              : #include "storage/freespace.h"
      24              : #include "storage/lmgr.h"
      25              : 
      26              : 
      27              : /*
      28              :  * RelationPutHeapTuple - place tuple at specified page
      29              :  *
      30              :  * !!! EREPORT(ERROR) IS DISALLOWED HERE !!!  Must PANIC on failure!!!
      31              :  *
      32              :  * Note - caller must hold BUFFER_LOCK_EXCLUSIVE on the buffer.
      33              :  */
      34              : void
      35     10210086 : RelationPutHeapTuple(Relation relation,
      36              :                      Buffer buffer,
      37              :                      HeapTuple tuple,
      38              :                      bool token)
      39              : {
      40              :     Page        pageHeader;
      41              :     OffsetNumber offnum;
      42              : 
      43              :     /*
      44              :      * A tuple that's being inserted speculatively should already have its
      45              :      * token set.
      46              :      */
      47              :     Assert(!token || HeapTupleHeaderIsSpeculative(tuple->t_data));
      48              : 
      49              :     /*
      50              :      * Do not allow tuples with invalid combinations of hint bits to be placed
      51              :      * on a page.  This combination is detected as corruption by the
      52              :      * contrib/amcheck logic, so if you disable this assertion, make
      53              :      * corresponding changes there.
      54              :      */
      55              :     Assert(!((tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED) &&
      56              :              (tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI)));
      57              : 
      58              :     /* Add the tuple to the page */
      59     10210086 :     pageHeader = BufferGetPage(buffer);
      60              : 
      61     10210086 :     offnum = PageAddItem(pageHeader, tuple->t_data, tuple->t_len, InvalidOffsetNumber, false, true);
      62     10210086 :     if (offnum == InvalidOffsetNumber)
      63            0 :         elog(PANIC, "failed to add tuple to page");
      64              : 
      65              :     /* Update tuple->t_self to the actual position where it was stored */
      66     10210086 :     ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum);
      67              : 
      68              :     /*
      69              :      * Insert the correct position into CTID of the stored tuple, too (unless
      70              :      * this is a speculative insertion, in which case the token is held in
      71              :      * CTID field instead)
      72              :      */
      73     10210086 :     if (!token)
      74              :     {
      75     10207954 :         ItemId      itemId = PageGetItemId(pageHeader, offnum);
      76     10207954 :         HeapTupleHeader item = (HeapTupleHeader) PageGetItem(pageHeader, itemId);
      77              : 
      78     10207954 :         item->t_ctid = tuple->t_self;
      79              :     }
      80     10210086 : }
      81              : 
      82              : /*
      83              :  * Read in a buffer in the given mode, using bulk-insert strategy if bistate isn't NULL.
      84              :  */
      85              : static Buffer
      86      8791410 : ReadBufferBI(Relation relation, BlockNumber targetBlock,
      87              :              ReadBufferMode mode, BulkInsertState bistate)
      88              : {
      89              :     Buffer      buffer;
      90              : 
      91              :     /* If not bulk-insert, exactly like ReadBuffer */
      92      8791410 :     if (!bistate)
      93      7558418 :         return ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
      94              :                                   mode, NULL);
      95              : 
      96              :     /* If we have the desired block already pinned, re-pin and return it */
      97      1232992 :     if (bistate->current_buf != InvalidBuffer)
      98              :     {
      99      1202798 :         if (BufferGetBlockNumber(bistate->current_buf) == targetBlock)
     100              :         {
     101              :             /*
     102              :              * Currently the LOCK variants are only used for extending
     103              :              * relation, which should never reach this branch.
     104              :              */
     105              :             Assert(mode != RBM_ZERO_AND_LOCK &&
     106              :                    mode != RBM_ZERO_AND_CLEANUP_LOCK);
     107              : 
     108      1190345 :             IncrBufferRefCount(bistate->current_buf);
     109      1190345 :             return bistate->current_buf;
     110              :         }
     111              :         /* ... else drop the old buffer */
     112        12453 :         ReleaseBuffer(bistate->current_buf);
     113        12453 :         bistate->current_buf = InvalidBuffer;
     114              :     }
     115              : 
     116              :     /* Perform a read using the buffer strategy */
     117        42647 :     buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
     118              :                                 mode, bistate->strategy);
     119              : 
     120              :     /* Save the selected block as target for future inserts */
     121        42647 :     IncrBufferRefCount(buffer);
     122        42647 :     bistate->current_buf = buffer;
     123              : 
     124        42647 :     return buffer;
     125              : }
     126              : 
     127              : /*
     128              :  * For each heap page which is all-visible, acquire a pin on the appropriate
     129              :  * visibility map page, if we haven't already got one.
     130              :  *
     131              :  * To avoid complexity in the callers, either buffer1 or buffer2 may be
     132              :  * InvalidBuffer if only one buffer is involved. For the same reason, block2
     133              :  * may be smaller than block1.
     134              :  *
     135              :  * Returns whether buffer locks were temporarily released.
     136              :  */
     137              : static bool
     138      8947419 : GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
     139              :                      BlockNumber block1, BlockNumber block2,
     140              :                      Buffer *vmbuffer1, Buffer *vmbuffer2)
     141              : {
     142              :     bool        need_to_pin_buffer1;
     143              :     bool        need_to_pin_buffer2;
     144      8947419 :     bool        released_locks = false;
     145              : 
     146              :     /*
     147              :      * Swap buffers around to handle the case of a single block/buffer, and
     148              :      * the case where lock ordering rules require locking block2 first.
     149              :      */
     150     17894151 :     if (!BufferIsValid(buffer1) ||
     151      9102054 :         (BufferIsValid(buffer2) && block1 > block2))
     152              :     {
     153       145864 :         Buffer      tmpbuf = buffer1;
     154       145864 :         Buffer     *tmpvmbuf = vmbuffer1;
     155       145864 :         BlockNumber tmpblock = block1;
     156              : 
     157       145864 :         buffer1 = buffer2;
     158       145864 :         vmbuffer1 = vmbuffer2;
     159       145864 :         block1 = block2;
     160              : 
     161       145864 :         buffer2 = tmpbuf;
     162       145864 :         vmbuffer2 = tmpvmbuf;
     163       145864 :         block2 = tmpblock;
     164              :     }
     165              : 
     166              :     Assert(BufferIsValid(buffer1));
     167              :     Assert(buffer2 == InvalidBuffer || block1 <= block2);
     168              : 
     169              :     while (1)
     170              :     {
     171              :         /* Figure out which pins we need but don't have. */
     172      8947419 :         need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1))
     173      8947419 :             && !visibilitymap_pin_ok(block1, *vmbuffer1);
     174      8947419 :         need_to_pin_buffer2 = buffer2 != InvalidBuffer
     175       155322 :             && PageIsAllVisible(BufferGetPage(buffer2))
     176      9102741 :             && !visibilitymap_pin_ok(block2, *vmbuffer2);
     177      8947419 :         if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
     178      8947419 :             break;
     179              : 
     180              :         /* We must unlock both buffers before doing any I/O. */
     181            0 :         released_locks = true;
     182            0 :         LockBuffer(buffer1, BUFFER_LOCK_UNLOCK);
     183            0 :         if (buffer2 != InvalidBuffer && buffer2 != buffer1)
     184            0 :             LockBuffer(buffer2, BUFFER_LOCK_UNLOCK);
     185              : 
     186              :         /* Get pins. */
     187            0 :         if (need_to_pin_buffer1)
     188            0 :             visibilitymap_pin(relation, block1, vmbuffer1);
     189            0 :         if (need_to_pin_buffer2)
     190            0 :             visibilitymap_pin(relation, block2, vmbuffer2);
     191              : 
     192              :         /* Relock buffers. */
     193            0 :         LockBuffer(buffer1, BUFFER_LOCK_EXCLUSIVE);
     194            0 :         if (buffer2 != InvalidBuffer && buffer2 != buffer1)
     195            0 :             LockBuffer(buffer2, BUFFER_LOCK_EXCLUSIVE);
     196              : 
     197              :         /*
     198              :          * If there are two buffers involved and we pinned just one of them,
     199              :          * it's possible that the second one became all-visible while we were
     200              :          * busy pinning the first one.  If it looks like that's a possible
     201              :          * scenario, we'll need to make a second pass through this loop.
     202              :          */
     203            0 :         if (buffer2 == InvalidBuffer || buffer1 == buffer2
     204            0 :             || (need_to_pin_buffer1 && need_to_pin_buffer2))
     205              :             break;
     206              :     }
     207              : 
     208      8947419 :     return released_locks;
     209              : }
     210              : 
     211              : /*
     212              :  * Extend the relation. By multiple pages, if beneficial.
     213              :  *
     214              :  * If the caller needs multiple pages (num_pages > 1), we always try to extend
     215              :  * by at least that much.
     216              :  *
     217              :  * If there is contention on the extension lock, we don't just extend "for
     218              :  * ourselves", but we try to help others. We can do so by adding empty pages
     219              :  * into the FSM. Typically there is no contention when we can't use the FSM.
     220              :  *
     221              :  * We do have to limit the number of pages to extend by to some value, as the
     222              :  * buffers for all the extended pages need to, temporarily, be pinned. For now
     223              :  * we define MAX_BUFFERS_TO_EXTEND_BY to be 64 buffers; it's hard to see
     224              :  * benefits with higher numbers. This is partially because copyfrom.c's
     225              :  * MAX_BUFFERED_TUPLES / MAX_BUFFERED_BYTES prevents larger multi_inserts.
     226              :  *
     227              :  * Returns a buffer for a newly extended block. If possible, the buffer is
     228              :  * returned exclusively locked. *did_unlock is set to true if the lock had to
     229              :  * be released, false otherwise.
     230              :  *
     231              :  *
     232              :  * XXX: It would likely be beneficial for some workloads to extend more
     233              :  * aggressively, e.g. using a heuristic based on the relation size.
     234              :  */
     235              : static Buffer
     236       114992 : RelationAddBlocks(Relation relation, BulkInsertState bistate,
     237              :                   int num_pages, bool use_fsm, bool *did_unlock)
     238              : {
     239              : #define MAX_BUFFERS_TO_EXTEND_BY 64
     240              :     Buffer      victim_buffers[MAX_BUFFERS_TO_EXTEND_BY];
     241       114992 :     BlockNumber first_block = InvalidBlockNumber;
     242       114992 :     BlockNumber last_block = InvalidBlockNumber;
     243              :     uint32      extend_by_pages;
     244              :     uint32      not_in_fsm_pages;
     245              :     Buffer      buffer;
     246              :     Page        page;
     247              : 
     248              :     /*
     249              :      * Determine how many pages to try to extend by.
     250              :      */
     251       114992 :     if (bistate == NULL && !use_fsm)
     252              :     {
     253              :         /*
     254              :          * If we have neither bistate, nor can use the FSM, we can't bulk
     255              :          * extend - there'd be no way to find the additional pages.
     256              :          */
     257          172 :         extend_by_pages = 1;
     258              :     }
     259              :     else
     260              :     {
     261              :         uint32      waitcount;
     262              : 
     263              :         /*
     264              :          * Try to extend at least by the number of pages the caller needs. We
     265              :          * can remember the additional pages (either via FSM or bistate).
     266              :          */
     267       114820 :         extend_by_pages = num_pages;
     268              : 
     269       114820 :         if (!RELATION_IS_LOCAL(relation))
     270        67685 :             waitcount = RelationExtensionLockWaiterCount(relation);
     271              :         else
     272        47135 :             waitcount = 0;
     273              : 
     274              :         /*
     275              :          * Multiply the number of pages to extend by the number of waiters. Do
     276              :          * this even if we're not using the FSM, as it still relieves
     277              :          * contention, by deferring the next time this backend needs to
     278              :          * extend. In that case the extended pages will be found via
     279              :          * bistate->next_free.
     280              :          */
     281       114820 :         extend_by_pages += extend_by_pages * waitcount;
     282              : 
     283              :         /* ---
     284              :          * If we previously extended using the same bistate, it's very likely
     285              :          * we'll extend some more. Try to extend by as many pages as
     286              :          * before. This can be important for performance for several reasons,
     287              :          * including:
     288              :          *
     289              :          * - It prevents mdzeroextend() switching between extending the
     290              :          *   relation in different ways, which is inefficient for some
     291              :          *   filesystems.
     292              :          *
     293              :          * - Contention is often intermittent. Even if we currently don't see
     294              :          *   other waiters (see above), extending by larger amounts can
     295              :          *   prevent future contention.
     296              :          * ---
     297              :          */
     298       114820 :         if (bistate)
     299         6166 :             extend_by_pages = Max(extend_by_pages, bistate->already_extended_by);
     300              : 
     301              :         /*
     302              :          * Can't extend by more than MAX_BUFFERS_TO_EXTEND_BY, we need to pin
     303              :          * them all concurrently.
     304              :          */
     305       114820 :         extend_by_pages = Min(extend_by_pages, MAX_BUFFERS_TO_EXTEND_BY);
     306              :     }
     307              : 
     308              :     /*
     309              :      * How many of the extended pages should be entered into the FSM?
     310              :      *
     311              :      * If we have a bistate, only enter pages that we don't need ourselves
     312              :      * into the FSM.  Otherwise every other backend will immediately try to
     313              :      * use the pages this backend needs for itself, causing unnecessary
     314              :      * contention.  If we don't have a bistate, we can't avoid the FSM.
     315              :      *
     316              :      * Never enter the page returned into the FSM, we'll immediately use it.
     317              :      */
     318       114992 :     if (num_pages > 1 && bistate == NULL)
     319          280 :         not_in_fsm_pages = 1;
     320              :     else
     321       114712 :         not_in_fsm_pages = num_pages;
     322              : 
     323              :     /* prepare to put another buffer into the bistate */
     324       114992 :     if (bistate && bistate->current_buf != InvalidBuffer)
     325              :     {
     326         4310 :         ReleaseBuffer(bistate->current_buf);
     327         4310 :         bistate->current_buf = InvalidBuffer;
     328              :     }
     329              : 
     330              :     /*
     331              :      * Extend the relation. We ask for the first returned page to be locked,
     332              :      * so that we are sure that nobody has inserted into the page
     333              :      * concurrently.
     334              :      *
     335              :      * With the current MAX_BUFFERS_TO_EXTEND_BY there's no danger of
     336              :      * [auto]vacuum trying to truncate later pages as REL_TRUNCATE_MINIMUM is
     337              :      * way larger.
     338              :      */
     339       114992 :     first_block = ExtendBufferedRelBy(BMR_REL(relation), MAIN_FORKNUM,
     340              :                                       bistate ? bistate->strategy : NULL,
     341              :                                       EB_LOCK_FIRST,
     342              :                                       extend_by_pages,
     343              :                                       victim_buffers,
     344              :                                       &extend_by_pages);
     345       114992 :     buffer = victim_buffers[0]; /* the buffer the function will return */
     346       114992 :     last_block = first_block + (extend_by_pages - 1);
     347              :     Assert(first_block == BufferGetBlockNumber(buffer));
     348              : 
     349              :     /*
     350              :      * Relation is now extended. Initialize the page. We do this here, before
     351              :      * potentially releasing the lock on the page, because it allows us to
     352              :      * double check that the page contents are empty (a non-empty page should
     353              :      * never happen, but if it does we don't want to risk wiping out valid data).
     354              :      */
     355       114992 :     page = BufferGetPage(buffer);
     356       114992 :     if (!PageIsNew(page))
     357            0 :         elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
     358              :              first_block,
     359              :              RelationGetRelationName(relation));
     360              : 
     361       114992 :     PageInit(page, BufferGetPageSize(buffer), 0);
     362       114992 :     MarkBufferDirty(buffer);
     363              : 
     364              :     /*
     365              :      * If we decided to put pages into the FSM, release the buffer lock (but
     366              :      * not the pin), as we don't want to do IO while holding a buffer lock. This will
     367              :      * necessitate a bit more extensive checking in our caller.
     368              :      */
     369       114992 :     if (use_fsm && not_in_fsm_pages < extend_by_pages)
     370              :     {
     371          413 :         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     372          413 :         *did_unlock = true;
     373              :     }
     374              :     else
     375       114579 :         *did_unlock = false;
     376              : 
     377              :     /*
     378              :      * Relation is now extended. Release pins on all buffers, except for the
     379              :      * first (which we'll return).  If we decided to put pages into the FSM,
     380              :      * we can do that as part of the same loop.
     381              :      */
     382       132222 :     for (uint32 i = 1; i < extend_by_pages; i++)
     383              :     {
     384        17230 :         BlockNumber curBlock = first_block + i;
     385              : 
     386              :         Assert(curBlock == BufferGetBlockNumber(victim_buffers[i]));
     387              :         Assert(BlockNumberIsValid(curBlock));
     388              : 
     389        17230 :         ReleaseBuffer(victim_buffers[i]);
     390              : 
     391        17230 :         if (use_fsm && i >= not_in_fsm_pages)
     392              :         {
     393         5343 :             Size        freespace = BufferGetPageSize(victim_buffers[i]) -
     394              :                 SizeOfPageHeaderData;
     395              : 
     396         5343 :             RecordPageWithFreeSpace(relation, curBlock, freespace);
     397              :         }
     398              :     }
     399              : 
     400       114992 :     if (use_fsm && not_in_fsm_pages < extend_by_pages)
     401              :     {
     402          413 :         BlockNumber first_fsm_block = first_block + not_in_fsm_pages;
     403              : 
     404          413 :         FreeSpaceMapVacuumRange(relation, first_fsm_block, last_block);
     405              :     }
     406              : 
     407       114992 :     if (bistate)
     408              :     {
     409              :         /*
     410              :          * Remember the additional pages we extended by, so we later can use
     411              :          * them without looking into the FSM.
     412              :          */
     413         6166 :         if (extend_by_pages > 1)
     414              :         {
     415          909 :             bistate->next_free = first_block + 1;
     416          909 :             bistate->last_free = last_block;
     417              :         }
     418              :         else
     419              :         {
     420         5257 :             bistate->next_free = InvalidBlockNumber;
     421         5257 :             bistate->last_free = InvalidBlockNumber;
     422              :         }
     423              : 
     424              :         /* maintain bistate->current_buf */
     425         6166 :         IncrBufferRefCount(buffer);
     426         6166 :         bistate->current_buf = buffer;
     427         6166 :         bistate->already_extended_by += extend_by_pages;
     428              :     }
     429              : 
     430       114992 :     return buffer;
     431              : #undef MAX_BUFFERS_TO_EXTEND_BY
     432              : }
     433              : 
     434              : /*
     435              :  * RelationGetBufferForTuple
     436              :  *
     437              :  *  Returns pinned and exclusive-locked buffer of a page in given relation
     438              :  *  with free space >= given len.
     439              :  *
     440              :  *  If num_pages is > 1, we will try to extend the relation by at least that
     441              :  *  many pages when we decide to extend the relation. This is more efficient
     442              :  *  for callers that know they will need multiple pages
     443              :  *  (e.g. heap_multi_insert()).
     444              :  *
     445              :  *  If otherBuffer is not InvalidBuffer, then it references a previously
     446              :  *  pinned buffer of another page in the same relation; on return, this
     447              :  *  buffer will also be exclusive-locked.  (This case is used by heap_update;
     448              :  *  the otherBuffer contains the tuple being updated.)
     449              :  *
     450              :  *  The reason for passing otherBuffer is that if two backends are doing
     451              :  *  concurrent heap_update operations, a deadlock could occur if they try
     452              :  *  to lock the same two buffers in opposite orders.  To ensure that this
     453              :  *  can't happen, we impose the rule that buffers of a relation must be
     454              :  *  locked in increasing page number order.  This is most conveniently done
     455              :  *  by having RelationGetBufferForTuple lock them both, with suitable care
     456              :  *  for ordering.
     457              :  *
     458              :  *  NOTE: it is unlikely, but not quite impossible, for otherBuffer to be the
     459              :  *  same buffer we select for insertion of the new tuple (this could only
     460              :  *  happen if space is freed in that page after heap_update finds there's not
     461              :  *  enough there).  In that case, the page will be pinned and locked only once.
     462              :  *
     463              :  *  We also handle the possibility that the all-visible flag will need to be
     464              :  *  cleared on one or both pages.  If so, pin on the associated visibility map
     465              :  *  page must be acquired before acquiring buffer lock(s), to avoid possibly
     466              :  *  doing I/O while holding buffer locks.  The pins are passed back to the
     467              :  *  caller using the input-output arguments vmbuffer and vmbuffer_other.
     468              :  *  Note that in some cases the caller might have already acquired such pins,
     469              :  *  which is indicated by these arguments not being InvalidBuffer on entry.
     470              :  *
     471              :  *  We normally use FSM to help us find free space.  However,
     472              :  *  if HEAP_INSERT_SKIP_FSM is specified, we just append a new empty page to
     473              :  *  the end of the relation if the tuple won't fit on the current target page.
     474              :  *  This can save some cycles when we know the relation is new and doesn't
     475              :  *  contain useful amounts of free space.
     476              :  *
     477              :  *  HEAP_INSERT_SKIP_FSM is also useful for non-WAL-logged additions to a
     478              :  *  relation, if the caller holds exclusive lock and is careful to invalidate
     479              :  *  relation's smgr_targblock before the first insertion --- that ensures that
     480              :  *  all insertions will occur into newly added pages and not be intermixed
     481              :  *  with tuples from other transactions.  That way, a crash can't risk losing
     482              :  *  any committed data of other transactions.  (See heap_insert's comments
     483              :  *  for additional constraints needed for safe usage of this behavior.)
     484              :  *
     485              :  *  The caller can also provide a BulkInsertState object to optimize many
     486              :  *  insertions into the same relation.  This keeps a pin on the current
     487              :  *  insertion target page (to save pin/unpin cycles) and also passes a
     488              :  *  BULKWRITE buffer selection strategy object to the buffer manager.
     489              :  *  Passing NULL for bistate selects the default behavior.
     490              :  *
     491              :  *  We don't fill existing pages further than the fillfactor, except for large
     492              :  *  tuples in nearly-empty pages.  This is OK since this routine is not
     493              :  *  consulted when updating a tuple and keeping it on the same page, which is
     494              :  *  the scenario fillfactor is meant to reserve space for.
     495              :  *
     496              :  *  ereport(ERROR) is allowed here, so this routine *must* be called
     497              :  *  before any (unlogged) changes are made in buffer pool.
     498              :  */
     499              : Buffer
     500      8931387 : RelationGetBufferForTuple(Relation relation, Size len,
     501              :                           Buffer otherBuffer, int options,
     502              :                           BulkInsertState bistate,
     503              :                           Buffer *vmbuffer, Buffer *vmbuffer_other,
     504              :                           int num_pages)
     505              : {
     506      8931387 :     bool        use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
     507      8931387 :     Buffer      buffer = InvalidBuffer;
     508              :     Page        page;
     509              :     Size        nearlyEmptyFreeSpace,
     510      8931387 :                 pageFreeSpace = 0,
     511      8931387 :                 saveFreeSpace = 0,
     512      8931387 :                 targetFreeSpace = 0;
     513              :     BlockNumber targetBlock,
     514              :                 otherBlock;
     515              :     bool        unlockedTargetBuffer;
     516              :     bool        recheckVmPins;
     517              : 
     518      8931387 :     len = MAXALIGN(len);        /* be conservative */
     519              : 
     520              :     /* if the caller doesn't know by how many pages to extend, extend by 1 */
     521      8931387 :     if (num_pages <= 0)
     522      8541715 :         num_pages = 1;
     523              : 
     524              :     /* Bulk insert is not supported for updates, only inserts. */
     525              :     Assert(otherBuffer == InvalidBuffer || !bistate);
     526              : 
     527              :     /*
     528              :      * If we're going to fail for an oversize tuple, do it right away.
     529              :      */
     530      8931387 :     if (len > MaxHeapTupleSize)
     531            0 :         ereport(ERROR,
     532              :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     533              :                  errmsg("row is too big: size %zu, maximum size %zu",
     534              :                         len, MaxHeapTupleSize)));
     535              : 
     536              :     /* Compute desired extra freespace due to fillfactor option */
     537      8931387 :     saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
     538              :                                                    HEAP_DEFAULT_FILLFACTOR);
     539              : 
     540              :     /*
     541              :      * Since pages without tuples can still have line pointers, we consider
     542              :      * pages "empty" when the unavailable space is slight.  This threshold is
     543              :      * somewhat arbitrary, but it should prevent most unnecessary relation
     544              :      * extensions while inserting large tuples into low-fillfactor tables.
     545              :      */
     546      8931387 :     nearlyEmptyFreeSpace = MaxHeapTupleSize -
     547              :         (MaxHeapTuplesPerPage / 8 * sizeof(ItemIdData));
     548      8931387 :     if (len + saveFreeSpace > nearlyEmptyFreeSpace)
     549         3046 :         targetFreeSpace = Max(len, nearlyEmptyFreeSpace);
     550              :     else
     551      8928341 :         targetFreeSpace = len + saveFreeSpace;
     552              : 
     553      8931387 :     if (otherBuffer != InvalidBuffer)
     554       150197 :         otherBlock = BufferGetBlockNumber(otherBuffer);
     555              :     else
     556      8781190 :         otherBlock = InvalidBlockNumber;    /* just to keep compiler quiet */
     557              : 
     558              :     /*
     559              :      * We first try to put the tuple on the same page we last inserted a tuple
     560              :      * on, as cached in the BulkInsertState or relcache entry.  If that
     561              :      * doesn't work, we ask the Free Space Map to locate a suitable page.
     562              :      * Since the FSM's info might be out of date, we have to be prepared to
     563              :      * loop around and retry multiple times. (To ensure this isn't an infinite
     564              :      * loop, we must update the FSM with the correct amount of free space on
     565              :      * each page that proves not to be suitable.)  If the FSM has no record of
     566              :      * a page with enough free space, we give up and extend the relation.
     567              :      *
     568              :      * When use_fsm is false, we either put the tuple onto the existing target
     569              :      * page or extend the relation.
     570              :      */
     571      8931387 :     if (bistate && bistate->current_buf != InvalidBuffer)
     572      1190345 :         targetBlock = BufferGetBlockNumber(bistate->current_buf);
     573              :     else
     574      7741042 :         targetBlock = RelationGetTargetBlock(relation);
     575              : 
     576      8931387 :     if (targetBlock == InvalidBlockNumber && use_fsm)
     577              :     {
     578              :         /*
     579              :          * We have no cached target page, so ask the FSM for an initial
     580              :          * target.
     581              :          */
     582        49031 :         targetBlock = GetPageWithFreeSpace(relation, targetFreeSpace);
     583              :     }
     584              : 
     585              :     /*
     586              :      * If the FSM knows nothing of the rel, try the last page before we give
     587              :      * up and extend.  This avoids one-tuple-per-page syndrome during
     588              :      * bootstrapping or in a recently-started system.
     589              :      */
     590      8931387 :     if (targetBlock == InvalidBlockNumber)
     591              :     {
     592        38464 :         BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
     593              : 
     594        38464 :         if (nblocks > 0)
     595        22629 :             targetBlock = nblocks - 1;
     596              :     }
     597              : 
     598      8931387 : loop:
     599      9055525 :     while (targetBlock != InvalidBlockNumber)
     600              :     {
     601              :         /*
     602              :          * Read and exclusive-lock the target block, as well as the other
     603              :          * block if one was given, taking suitable care with lock ordering and
     604              :          * the possibility they are the same block.
     605              :          *
     606              :          * If the page-level all-visible flag is set, caller will need to
     607              :          * clear both that and the corresponding visibility map bit.  However,
     608              :          * by the time we return, we'll have x-locked the buffer, and we don't
     609              :          * want to do any I/O while in that state.  So we check the bit here
     610              :          * before taking the lock, and pin the page if it appears necessary.
     611              :          * Checking without the lock creates a risk of getting the wrong
     612              :          * answer, so we'll have to recheck after acquiring the lock.
     613              :          */
     614      8943656 :         if (otherBuffer == InvalidBuffer)
     615              :         {
     616              :             /* easy case */
     617      8791410 :             buffer = ReadBufferBI(relation, targetBlock, RBM_NORMAL, bistate);
     618      8791410 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     619        13274 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     620              : 
     621              :             /*
     622              :              * If the page is empty, pin vmbuffer to set all_frozen bit later.
     623              :              */
     624      8796532 :             if ((options & HEAP_INSERT_FROZEN) &&
     625         5122 :                 (PageGetMaxOffsetNumber(BufferGetPage(buffer)) == 0))
     626         1613 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     627              : 
     628      8791410 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     629              :         }
     630       152246 :         else if (otherBlock == targetBlock)
     631              :         {
     632              :             /* also easy case */
     633         1731 :             buffer = otherBuffer;
     634         1731 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     635            0 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     636         1731 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     637              :         }
     638       150515 :         else if (otherBlock < targetBlock)
     639              :         {
     640              :             /* lock other buffer first */
     641       145177 :             buffer = ReadBuffer(relation, targetBlock);
     642       145177 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     643          824 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     644       145177 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     645       145177 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     646              :         }
     647              :         else
     648              :         {
     649              :             /* lock target buffer first */
     650         5338 :             buffer = ReadBuffer(relation, targetBlock);
     651         5338 :             if (PageIsAllVisible(BufferGetPage(buffer)))
     652           98 :                 visibilitymap_pin(relation, targetBlock, vmbuffer);
     653         5338 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     654         5338 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     655              :         }
     656              : 
     657              :         /*
     658              :          * We now have the target page (and the other buffer, if any) pinned
     659              :          * and locked.  However, since our initial PageIsAllVisible checks
     660              :          * were performed before acquiring the lock, the results might now be
     661              :          * out of date, either for the selected victim buffer, or for the
     662              :          * other buffer passed by the caller.  In that case, we'll need to
     663              :          * give up our locks, go get the pin(s) we failed to get earlier, and
     664              :          * re-lock.  That's pretty painful, but hopefully shouldn't happen
     665              :          * often.
     666              :          *
     667              :          * Note that there's a small possibility that we didn't pin the page
     668              :          * above but still have the correct page pinned anyway, either because
     669              :          * we've already made a previous pass through this loop, or because
     670              :          * caller passed us the right page anyway.
     671              :          *
     672              :          * Note also that it's possible that by the time we get the pin and
     673              :          * retake the buffer locks, the visibility map bit will have been
     674              :          * cleared by some other backend anyway.  In that case, we'll have
     675              :          * done a bit of extra work for no gain, but there's no real harm
     676              :          * done.
     677              :          */
     678      8943656 :         GetVisibilityMapPins(relation, buffer, otherBuffer,
     679              :                              targetBlock, otherBlock, vmbuffer,
     680              :                              vmbuffer_other);
     681              : 
     682              :         /*
     683              :          * Now we can check to see if there's enough free space here. If so,
     684              :          * we're done.
     685              :          */
     686      8943656 :         page = BufferGetPage(buffer);
     687              : 
     688              :         /*
     689              :          * If necessary, initialize the page; it'll be used soon.  We could
     690              :          * avoid dirtying the buffer here, and rely on the caller to do so
     691              :          * whenever it puts a tuple onto the page, but there seems not much
     692              :          * benefit in doing so.
     693              :          */
     694      8943656 :         if (PageIsNew(page))
     695              :         {
     696        14008 :             PageInit(page, BufferGetPageSize(buffer), 0);
     697        14008 :             MarkBufferDirty(buffer);
     698              :         }
     699              : 
     700      8943656 :         pageFreeSpace = PageGetHeapFreeSpace(page);
     701      8943656 :         if (targetFreeSpace <= pageFreeSpace)
     702              :         {
     703              :             /* use this page as future insert target, too */
     704      8816395 :             RelationSetTargetBlock(relation, targetBlock);
     705      8816395 :             return buffer;
     706              :         }
     707              : 
     708              :         /*
     709              :          * Not enough space, so give up our page locks and our pin on the
     710              :          * target (unless it is otherBuffer itself) and prepare to look
     711              :          * elsewhere.  Unlock order doesn't matter here, so this can be
     712              :          * slightly simpler than the code above.
     713              :          */
     714       127261 :         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     715       127261 :         if (otherBuffer == InvalidBuffer)
     716       122136 :             ReleaseBuffer(buffer);
     717         5125 :         else if (otherBlock != targetBlock)
     718              :         {
     719         3394 :             LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
     720         3394 :             ReleaseBuffer(buffer);
     721              :         }
     722              : 
     723              :         /* Is there an ongoing bulk extension? */
     724       127261 :         if (bistate && bistate->next_free != InvalidBlockNumber)
     725              :         {
     726              :             Assert(bistate->next_free <= bistate->last_free);
     727              : 
     728              :             /*
     729              :              * We bulk extended the relation before, and there are still some
     730              :              * unused pages from that extension, so we don't need to look in
     731              :              * the FSM for a new page. But do record the free space from the
     732              :              * last page, somebody might insert narrower tuples later.
     733              :              */
     734        12313 :             if (use_fsm)
     735         4368 :                 RecordPageWithFreeSpace(relation, targetBlock, pageFreeSpace);
     736              : 
     737        12313 :             targetBlock = bistate->next_free;
     738        12313 :             if (bistate->next_free >= bistate->last_free)
     739              :             {
     740          730 :                 bistate->next_free = InvalidBlockNumber;
     741          730 :                 bistate->last_free = InvalidBlockNumber;
     742              :             }
     743              :             else
     744        11583 :                 bistate->next_free++;
     745              :         }
     746       114948 :         else if (!use_fsm)
     747              :         {
     748              :             /* Without FSM, always fall out of the loop and extend */
     749         3123 :             break;
     750              :         }
     751              :         else
     752              :         {
     753              :             /*
     754              :              * Update FSM as to condition of this page, and ask for another
     755              :              * page to try.
     756              :              */
     757       111825 :             targetBlock = RecordAndGetPageWithFreeSpace(relation,
     758              :                                                         targetBlock,
     759              :                                                         pageFreeSpace,
     760              :                                                         targetFreeSpace);
     761              :         }
     762              :     }
     763              : 
     764              :     /* Have to extend the relation */
     765       114992 :     buffer = RelationAddBlocks(relation, bistate, num_pages, use_fsm,
     766              :                                &unlockedTargetBuffer);
     767              : 
     768       114992 :     targetBlock = BufferGetBlockNumber(buffer);
     769       114992 :     page = BufferGetPage(buffer);
     770              : 
     771              :     /*
     772              :      * The page is empty, pin vmbuffer to set all_frozen bit. We don't want to
     773              :      * do IO while the buffer is locked, so we unlock the page first if IO is
     774              :      * needed (necessitating checks below).
     775              :      */
     776       114992 :     if (options & HEAP_INSERT_FROZEN)
     777              :     {
     778              :         Assert(PageGetMaxOffsetNumber(page) == 0);
     779              : 
     780          303 :         if (!visibilitymap_pin_ok(targetBlock, *vmbuffer))
     781              :         {
     782          276 :             if (!unlockedTargetBuffer)
     783          276 :                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     784          276 :             unlockedTargetBuffer = true;
     785          276 :             visibilitymap_pin(relation, targetBlock, vmbuffer);
     786              :         }
     787              :     }
     788              : 
     789              :     /*
     790              :      * Reacquire locks if necessary.
     791              :      *
     792              :      * If the target buffer was unlocked above, or is unlocked while
     793              :      * reacquiring the lock on otherBuffer below, it's unlikely, but possible,
     794              :      * that another backend used space on this page. We check for that below,
     795              :      * and retry if necessary.
     796              :      */
     797       114992 :     recheckVmPins = false;
     798       114992 :     if (unlockedTargetBuffer)
     799              :     {
     800              :         /* released lock on target buffer above */
     801          689 :         if (otherBuffer != InvalidBuffer)
     802            2 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     803          689 :         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     804          689 :         recheckVmPins = true;
     805              :     }
     806       114303 :     else if (otherBuffer != InvalidBuffer)
     807              :     {
     808              :         /*
     809              :          * We did not release the target buffer, and otherBuffer is valid, so
     810              :          * we need to lock the other buffer. It's guaranteed to be of a lower
     811              :          * page number than the new page.  To conform with the deadlock
     812              :          * prevention rules, we ought to lock otherBuffer first, but that
     813              :          * would give other backends a chance to put tuples on our page. To
     814              :          * reduce the likelihood of that, attempt to lock the other buffer
     815              :          * conditionally; that's very likely to work.
     816              :          *
     817              :          * Alternatively, we could acquire the lock on otherBuffer before
     818              :          * extending the relation, but that'd require holding the lock while
     819              :          * performing IO, which seems worse than an unlikely retry.
     820              :          */
     821              :         Assert(otherBuffer != buffer);
     822              :         Assert(targetBlock > otherBlock);
     823              : 
     824         3074 :         if (unlikely(!ConditionalLockBuffer(otherBuffer)))
     825              :         {
     826            0 :             unlockedTargetBuffer = true;
     827            0 :             LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     828            0 :             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
     829            0 :             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     830              :         }
     831         3074 :         recheckVmPins = true;
     832              :     }
     833              : 
     834              :     /*
     835              :      * If one of the buffers was unlocked (always the case if otherBuffer is
     836              :      * valid), it's possible, although unlikely, that an all-visible flag
     837              :      * became set.  We can use GetVisibilityMapPins to deal with that. It's
     838              :      * possible that GetVisibilityMapPins() might need to temporarily release
     839              :      * buffer locks, in which case we'll need to check if there's still enough
     840              :      * space on the page below.
     841              :      */
     842       114992 :     if (recheckVmPins)
     843              :     {
     844         3763 :         if (GetVisibilityMapPins(relation, otherBuffer, buffer,
     845              :                                  otherBlock, targetBlock, vmbuffer_other,
     846              :                                  vmbuffer))
     847            0 :             unlockedTargetBuffer = true;
     848              :     }
     849              : 
     850              :     /*
     851              :      * If the target buffer was temporarily unlocked since the relation
     852              :      * extension, it's possible, although unlikely, that all the space on the
     853              :      * page was already used. If so, we just retry from the start.  If we
     854              :      * didn't unlock, something has gone wrong if there's not enough space -
     855              :      * the test at the top should have prevented reaching this case.
     856              :      */
     857       114992 :     pageFreeSpace = PageGetHeapFreeSpace(page);
     858       114992 :     if (len > pageFreeSpace)
     859              :     {
     860            0 :         if (unlockedTargetBuffer)
     861              :         {
     862            0 :             if (otherBuffer != InvalidBuffer)
     863            0 :                 LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
     864            0 :             UnlockReleaseBuffer(buffer);
     865              : 
     866            0 :             goto loop;      /* targetBlock is still the new page */
     867              :         }
     868            0 :         elog(PANIC, "tuple is too big: size %zu", len);
     869              :     }
     870              : 
     871              :     /*
     872              :      * Remember the new page as our target for future insertions.
     873              :      *
     874              :      * XXX should we enter the new page into the free space map immediately,
     875              :      * or just keep it for this backend's exclusive use in the short run
     876              :      * (until VACUUM sees it)?  Seems to depend on whether you expect the
     877              :      * current backend to make more insertions or not, which is probably a
     878              :      * good bet most of the time.  So for now, don't add it to FSM yet.
     879              :      */
     880       114992 :     RelationSetTargetBlock(relation, targetBlock);
     881              : 
     882       114992 :     return buffer;
     883              : }
        

Generated by: LCOV version 2.0-1