LCOV - code coverage report
Current view: top level - src/backend/storage/smgr - bulk_write.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 100.0 % 70 70
Test Date: 2026-02-17 17:20:33 Functions: 100.0 % 7 7
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * bulk_write.c
       4              :  *    Efficiently and reliably populate a new relation
       5              :  *
       6              :  * The assumption is that no other backends access the relation while we are
       7              :  * loading it, so we can take some shortcuts.  Pages already present in the
       8              :  * indicated fork when the bulk write operation is started are not modified
       9              :  * unless explicitly written to.  Do not mix operations through the regular
      10              :  * buffer manager and the bulk loading interface!
      11              :  *
      12              :  * We bypass the buffer manager to avoid the locking overhead, and call
      13              :  * smgrextend() directly.  A downside is that the pages will need to be
      14              :  * re-read into shared buffers on first use after the build finishes.  That's
      15              :  * usually a good tradeoff for large relations, and for small relations, the
      16              :  * overhead isn't very significant compared to creating the relation in the
      17              :  * first place.
      18              :  *
      19              :  * The pages are WAL-logged if needed.  To save on WAL header overhead, we
      20              :  * WAL-log several pages in one record.
      21              :  *
      22              :  * One tricky point is that because we bypass the buffer manager, we need to
      23              :  * register the relation for fsyncing at the next checkpoint ourselves, and
      24              :  * make sure that the relation is correctly fsync'd by us or the checkpointer
      25              :  * even if a checkpoint happens concurrently.
      26              :  *
      27              :  *
      28              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      29              :  * Portions Copyright (c) 1994, Regents of the University of California
      30              :  *
      31              :  *
      32              :  * IDENTIFICATION
      33              :  *    src/backend/storage/smgr/bulk_write.c
      34              :  *
      35              :  *-------------------------------------------------------------------------
      36              :  */
      37              : #include "postgres.h"
      38              : 
      39              : #include "access/xloginsert.h"
      40              : #include "access/xlogrecord.h"
      41              : #include "storage/bufpage.h"
      42              : #include "storage/bulk_write.h"
      43              : #include "storage/proc.h"
      44              : #include "storage/smgr.h"
      45              : #include "utils/rel.h"
      46              : 
/*
 * Maximum number of writes we queue before WAL-logging and flushing them in
 * one batch; bounded by how many blocks one WAL record can reference.
 */
#define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID

/* An all-zeros page, used to fill gaps when writing blocks out of order. */
static const PGIOAlignedBlock zero_buffer = {0};    /* worth BLCKSZ */
      50              : 
/*
 * One queued page write that has not yet been WAL-logged or written out.
 */
typedef struct PendingWrite
{
    BulkWriteBuffer buf;        /* page contents; we own this buffer */
    BlockNumber blkno;          /* target block within the fork */
    bool        page_std;       /* has standard page layout (hole between
                                 * pd_lower and pd_upper can be omitted from
                                 * WAL)? */
} PendingWrite;
      57              : 
/*
 * Bulk writer state for one relation fork.
 */
struct BulkWriteState
{
    /* Information about the target relation we're writing */
    SMgrRelation smgr;
    ForkNumber  forknum;
    bool        use_wal;        /* WAL-log the pages before writing them? */

    /* We keep several writes queued, and WAL-log them in batches */
    int         npending;
    PendingWrite pending_writes[MAX_PENDING_WRITES];

    /* Current size of the relation, maintained as we extend it */
    BlockNumber relsize;

    /*
     * The RedoRecPtr at the time that the bulk operation started.  Compared
     * against the current value at finish time to detect whether a
     * checkpoint happened concurrently (see smgr_bulk_finish).
     */
    XLogRecPtr  start_RedoRecPtr;

    /* Context that smgr_bulk_get_buf() allocates page buffers in */
    MemoryContext memcxt;
};
      80              : 
      81              : static void smgr_bulk_flush(BulkWriteState *bulkstate);
      82              : 
      83              : /*
      84              :  * Start a bulk write operation on a relation fork.
      85              :  */
      86              : BulkWriteState *
      87        26911 : smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
      88              : {
      89        26911 :     return smgr_bulk_start_smgr(RelationGetSmgr(rel),
      90              :                                 forknum,
      91        26911 :                                 RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
      92              : }
      93              : 
      94              : /*
      95              :  * Start a bulk write operation on a relation fork.
      96              :  *
      97              :  * This is like smgr_bulk_start_rel, but can be used without a relcache entry.
      98              :  */
      99              : BulkWriteState *
     100        27000 : smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
     101              : {
     102              :     BulkWriteState *state;
     103              : 
     104        27000 :     state = palloc_object(BulkWriteState);
     105        27000 :     state->smgr = smgr;
     106        27000 :     state->forknum = forknum;
     107        27000 :     state->use_wal = use_wal;
     108              : 
     109        27000 :     state->npending = 0;
     110        27000 :     state->relsize = smgrnblocks(smgr, forknum);
     111              : 
     112        27000 :     state->start_RedoRecPtr = GetRedoRecPtr();
     113              : 
     114              :     /*
     115              :      * Remember the memory context.  We will use it to allocate all the
     116              :      * buffers later.
     117              :      */
     118        27000 :     state->memcxt = CurrentMemoryContext;
     119              : 
     120        27000 :     return state;
     121              : }
     122              : 
     123              : /*
     124              :  * Finish bulk write operation.
     125              :  *
     126              :  * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
     127              :  * the relation if needed.
     128              :  */
     129              : void
     130        27000 : smgr_bulk_finish(BulkWriteState *bulkstate)
     131              : {
     132              :     /* WAL-log and flush any remaining pages */
     133        27000 :     smgr_bulk_flush(bulkstate);
     134              : 
     135              :     /*
     136              :      * Fsync the relation, or register it for the next checkpoint, if
     137              :      * necessary.
     138              :      */
     139        27000 :     if (SmgrIsTemp(bulkstate->smgr))
     140              :     {
     141              :         /* Temporary relations don't need to be fsync'd, ever */
     142              :     }
     143        25762 :     else if (!bulkstate->use_wal)
     144              :     {
     145              :         /*----------
     146              :          * This is either an unlogged relation, or a permanent relation but we
     147              :          * skipped WAL-logging because wal_level=minimal:
     148              :          *
     149              :          * A) Unlogged relation
     150              :          *
     151              :          *    Unlogged relations will go away on crash, but they need to be
     152              :          *    fsync'd on a clean shutdown. It's sufficient to call
     153              :          *    smgrregistersync(), that ensures that the checkpointer will
     154              :          *    flush it at the shutdown checkpoint. (It will flush it on the
     155              :          *    next online checkpoint too, which is not strictly necessary.)
     156              :          *
     157              :          *    Note that the init-fork of an unlogged relation is not
     158              :          *    considered unlogged for our purposes. It's treated like a
     159              :          *    regular permanent relation. The callers will pass use_wal=true
     160              :          *    for the init fork.
     161              :          *
     162              :          * B) Permanent relation, WAL-logging skipped because wal_level=minimal
     163              :          *
     164              :          *    This is a new relation, and we didn't WAL-log the pages as we
     165              :          *    wrote, but they need to be fsync'd before commit.
     166              :          *
     167              :          *    We don't need to do that here, however. The fsync() is done at
     168              :          *    commit, by smgrDoPendingSyncs() (*).
     169              :          *
     170              :          *    (*) smgrDoPendingSyncs() might decide to WAL-log the whole
     171              :          *    relation at commit instead of fsyncing it, if the relation was
     172              :          *    very small, but it's smgrDoPendingSyncs() responsibility in any
     173              :          *    case.
     174              :          *
     175              :          * We cannot distinguish the two here, so conservatively assume it's
     176              :          * an unlogged relation. A permanent relation with wal_level=minimal
     177              :          * would require no actions, see above.
     178              :          */
     179         2839 :         smgrregistersync(bulkstate->smgr, bulkstate->forknum);
     180              :     }
     181              :     else
     182              :     {
     183              :         /*
     184              :          * Permanent relation, WAL-logged normally.
     185              :          *
     186              :          * We already WAL-logged all the pages, so they will be replayed from
     187              :          * WAL on crash. However, when we wrote out the pages, we passed
     188              :          * skipFsync=true to avoid the overhead of registering all the writes
     189              :          * with the checkpointer.  Register the whole relation now.
     190              :          *
     191              :          * There is one hole in that idea: If a checkpoint occurred while we
     192              :          * were writing the pages, it already missed fsyncing the pages we had
     193              :          * written before the checkpoint started.  A crash later on would
     194              :          * replay the WAL starting from the checkpoint, therefore it wouldn't
     195              :          * replay our earlier WAL records.  So if a checkpoint started after
     196              :          * the bulk write, fsync the files now.
     197              :          */
     198              : 
     199              :         /*
     200              :          * Prevent a checkpoint from starting between the GetRedoRecPtr() and
     201              :          * smgrregistersync() calls.
     202              :          */
     203              :         Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
     204        22923 :         MyProc->delayChkptFlags |= DELAY_CHKPT_START;
     205              : 
     206        22923 :         if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
     207              :         {
     208              :             /*
     209              :              * A checkpoint occurred and it didn't know about our writes, so
     210              :              * fsync() the relation ourselves.
     211              :              */
     212            3 :             MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
     213            3 :             smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
     214            3 :             elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
     215              :         }
     216              :         else
     217              :         {
     218        22920 :             smgrregistersync(bulkstate->smgr, bulkstate->forknum);
     219        22920 :             MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
     220              :         }
     221              :     }
     222        27000 : }
     223              : 
     224              : static int
     225       119411 : buffer_cmp(const void *a, const void *b)
     226              : {
     227       119411 :     const PendingWrite *bufa = (const PendingWrite *) a;
     228       119411 :     const PendingWrite *bufb = (const PendingWrite *) b;
     229              : 
     230              :     /* We should not see duplicated writes for the same block */
     231              :     Assert(bufa->blkno != bufb->blkno);
     232       119411 :     if (bufa->blkno > bufb->blkno)
     233        56599 :         return 1;
     234              :     else
     235        62812 :         return -1;
     236              : }
     237              : 
     238              : /*
     239              :  * Finish all the pending writes.
     240              :  */
     241              : static void
     242        27574 : smgr_bulk_flush(BulkWriteState *bulkstate)
     243              : {
     244        27574 :     int         npending = bulkstate->npending;
     245        27574 :     PendingWrite *pending_writes = bulkstate->pending_writes;
     246              : 
     247        27574 :     if (npending == 0)
     248          101 :         return;
     249              : 
     250        27473 :     if (npending > 1)
     251         5931 :         qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);
     252              : 
     253        27473 :     if (bulkstate->use_wal)
     254              :     {
     255              :         BlockNumber blknos[MAX_PENDING_WRITES];
     256              :         Page        pages[MAX_PENDING_WRITES];
     257        23198 :         bool        page_std = true;
     258              : 
     259        68095 :         for (int i = 0; i < npending; i++)
     260              :         {
     261        44897 :             blknos[i] = pending_writes[i].blkno;
     262        44897 :             pages[i] = pending_writes[i].buf->data;
     263              : 
     264              :             /*
     265              :              * If any of the pages use !page_std, we log them all as such.
     266              :              * That's a bit wasteful, but in practice, a mix of standard and
     267              :              * non-standard page layout is rare.  None of the built-in AMs do
     268              :              * that.
     269              :              */
     270        44897 :             if (!pending_writes[i].page_std)
     271           77 :                 page_std = false;
     272              :         }
     273        23198 :         log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
     274              :                      npending, blknos, pages, page_std);
     275              :     }
     276              : 
     277        87768 :     for (int i = 0; i < npending; i++)
     278              :     {
     279        60295 :         BlockNumber blkno = pending_writes[i].blkno;
     280        60295 :         Page        page = pending_writes[i].buf->data;
     281              : 
     282        60295 :         PageSetChecksumInplace(page, blkno);
     283              : 
     284        60295 :         if (blkno >= bulkstate->relsize)
     285              :         {
     286              :             /*
     287              :              * If we have to write pages nonsequentially, fill in the space
     288              :              * with zeroes until we come back and overwrite.  This is not
     289              :              * logically necessary on standard Unix filesystems (unwritten
     290              :              * space will read as zeroes anyway), but it should help to avoid
     291              :              * fragmentation.  The dummy pages aren't WAL-logged though.
     292              :              */
     293        60295 :             while (blkno > bulkstate->relsize)
     294              :             {
     295              :                 /* don't set checksum for all-zero page */
     296          275 :                 smgrextend(bulkstate->smgr, bulkstate->forknum,
     297              :                            bulkstate->relsize,
     298              :                            &zero_buffer,
     299              :                            true);
     300          275 :                 bulkstate->relsize++;
     301              :             }
     302              : 
     303        60020 :             smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
     304        60020 :             bulkstate->relsize++;
     305              :         }
     306              :         else
     307          275 :             smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
     308        60295 :         pfree(page);
     309              :     }
     310              : 
     311        27473 :     bulkstate->npending = 0;
     312              : }
     313              : 
     314              : /*
     315              :  * Queue write of 'buf'.
     316              :  *
     317              :  * NB: this takes ownership of 'buf'!
     318              :  *
     319              :  * You are only allowed to write a given block once as part of one bulk write
     320              :  * operation.
     321              :  */
     322              : void
     323        60295 : smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
     324              : {
     325              :     PendingWrite *w;
     326              : 
     327        60295 :     w = &bulkstate->pending_writes[bulkstate->npending++];
     328        60295 :     w->buf = buf;
     329        60295 :     w->blkno = blocknum;
     330        60295 :     w->page_std = page_std;
     331              : 
     332        60295 :     if (bulkstate->npending == MAX_PENDING_WRITES)
     333          574 :         smgr_bulk_flush(bulkstate);
     334        60295 : }
     335              : 
     336              : /*
     337              :  * Allocate a new buffer which can later be written with smgr_bulk_write().
     338              :  *
     339              :  * There is no function to free the buffer.  When you pass it to
     340              :  * smgr_bulk_write(), it takes ownership and frees it when it's no longer
     341              :  * needed.
     342              :  *
     343              :  * This is currently implemented as a simple palloc, but could be implemented
     344              :  * using a ring buffer or larger chunks in the future, so don't rely on it.
     345              :  */
     346              : BulkWriteBuffer
     347        60295 : smgr_bulk_get_buf(BulkWriteState *bulkstate)
     348              : {
     349        60295 :     return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
     350              : }
        

Generated by: LCOV version 2.0-1