LCOV - code coverage report
Current view: top level - src/backend/storage/smgr - bulk_write.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 70 70 100.0 %
Date: 2025-01-18 04:15:08 Functions: 7 7 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * bulk_write.c
       4             :  *    Efficiently and reliably populate a new relation
       5             :  *
       6             :  * The assumption is that no other backends access the relation while we are
       7             :  * loading it, so we can take some shortcuts.  Pages already present in the
       8             :  * indicated fork when the bulk write operation is started are not modified
       9             :  * unless explicitly written to.  Do not mix operations through the regular
      10             :  * buffer manager and the bulk loading interface!
      11             :  *
      12             :  * We bypass the buffer manager to avoid the locking overhead, and call
      13             :  * smgrextend() directly.  A downside is that the pages will need to be
      14             :  * re-read into shared buffers on first use after the build finishes.  That's
      15             :  * usually a good tradeoff for large relations, and for small relations, the
      16             :  * overhead isn't very significant compared to creating the relation in the
      17             :  * first place.
      18             :  *
      19             :  * The pages are WAL-logged if needed.  To save on WAL header overhead, we
      20             :  * WAL-log several pages in one record.
      21             :  *
      22             :  * One tricky point is that because we bypass the buffer manager, we need to
      23             :  * register the relation for fsyncing at the next checkpoint ourselves, and
      24             :  * make sure that the relation is correctly fsync'd by us or the checkpointer
      25             :  * even if a checkpoint happens concurrently.
      26             :  *
      27             :  *
      28             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      29             :  * Portions Copyright (c) 1994, Regents of the University of California
      30             :  *
      31             :  *
      32             :  * IDENTIFICATION
      33             :  *    src/backend/storage/smgr/bulk_write.c
      34             :  *
      35             :  *-------------------------------------------------------------------------
      36             :  */
      37             : #include "postgres.h"
      38             : 
      39             : #include "access/xloginsert.h"
      40             : #include "access/xlogrecord.h"
      41             : #include "storage/bufpage.h"
      42             : #include "storage/bulk_write.h"
      43             : #include "storage/proc.h"
      44             : #include "storage/smgr.h"
      45             : #include "utils/rel.h"
      46             : 
      47             : #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
      48             : 
      49             : static const PGIOAlignedBlock zero_buffer = {{0}};  /* worth BLCKSZ */
      50             : 
/*
 * One queued page write, held in BulkWriteState until the batch is flushed.
 */
typedef struct PendingWrite
{
    BulkWriteBuffer buf;        /* page contents; owned by the bulk writer
                                 * once queued, freed at flush */
    BlockNumber blkno;          /* target block number in the fork */
    bool        page_std;       /* standard page layout? (passed through to
                                 * WAL logging in smgr_bulk_flush) */
} PendingWrite;
      57             : 
/*
 * Bulk writer state for one relation fork.
 */
struct BulkWriteState
{
    /* Information about the target relation we're writing */
    SMgrRelation smgr;
    ForkNumber  forknum;
    bool        use_wal;        /* WAL-log the pages when flushing? */

    /* We keep several writes queued, and WAL-log them in batches */
    int         npending;
    PendingWrite pending_writes[MAX_PENDING_WRITES];

    /* Current size of the relation */
    BlockNumber relsize;

    /*
     * The RedoRecPtr at the time that the bulk operation started.  Compared
     * against the current RedoRecPtr in smgr_bulk_finish() to detect a
     * checkpoint that started concurrently with the bulk write.
     */
    XLogRecPtr  start_RedoRecPtr;

    /* context that smgr_bulk_get_buf() allocates the page buffers in */
    MemoryContext memcxt;
};
      80             : 
      81             : static void smgr_bulk_flush(BulkWriteState *bulkstate);
      82             : 
      83             : /*
      84             :  * Start a bulk write operation on a relation fork.
      85             :  */
      86             : BulkWriteState *
      87       48524 : smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
      88             : {
      89       48524 :     return smgr_bulk_start_smgr(RelationGetSmgr(rel),
      90             :                                 forknum,
      91       48524 :                                 RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
      92             : }
      93             : 
      94             : /*
      95             :  * Start a bulk write operation on a relation fork.
      96             :  *
      97             :  * This is like smgr_bulk_start_rel, but can be used without a relcache entry.
      98             :  */
      99             : BulkWriteState *
     100       48702 : smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
     101             : {
     102             :     BulkWriteState *state;
     103             : 
     104       48702 :     state = palloc(sizeof(BulkWriteState));
     105       48702 :     state->smgr = smgr;
     106       48702 :     state->forknum = forknum;
     107       48702 :     state->use_wal = use_wal;
     108             : 
     109       48702 :     state->npending = 0;
     110       48702 :     state->relsize = smgrnblocks(smgr, forknum);
     111             : 
     112       48702 :     state->start_RedoRecPtr = GetRedoRecPtr();
     113             : 
     114             :     /*
     115             :      * Remember the memory context.  We will use it to allocate all the
     116             :      * buffers later.
     117             :      */
     118       48702 :     state->memcxt = CurrentMemoryContext;
     119             : 
     120       48702 :     return state;
     121             : }
     122             : 
/*
 * Finish bulk write operation.
 *
 * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
 * the relation if needed.
 */
void
smgr_bulk_finish(BulkWriteState *bulkstate)
{
    /* WAL-log and flush any remaining pages */
    smgr_bulk_flush(bulkstate);

    /*
     * Fsync the relation, or register it for the next checkpoint, if
     * necessary.
     */
    if (SmgrIsTemp(bulkstate->smgr))
    {
        /* Temporary relations don't need to be fsync'd, ever */
    }
    else if (!bulkstate->use_wal)
    {
        /*----------
         * This is either an unlogged relation, or a permanent relation but we
         * skipped WAL-logging because wal_level=minimal:
         *
         * A) Unlogged relation
         *
         *    Unlogged relations will go away on crash, but they need to be
         *    fsync'd on a clean shutdown. It's sufficient to call
         *    smgrregistersync(), that ensures that the checkpointer will
         *    flush it at the shutdown checkpoint. (It will flush it on the
         *    next online checkpoint too, which is not strictly necessary.)
         *
         *    Note that the init-fork of an unlogged relation is not
         *    considered unlogged for our purposes. It's treated like a
         *    regular permanent relation. The callers will pass use_wal=true
         *    for the init fork.
         *
         * B) Permanent relation, WAL-logging skipped because wal_level=minimal
         *
         *    This is a new relation, and we didn't WAL-log the pages as we
         *    wrote, but they need to be fsync'd before commit.
         *
         *    We don't need to do that here, however. The fsync() is done at
         *    commit, by smgrDoPendingSyncs() (*).
         *
         *    (*) smgrDoPendingSyncs() might decide to WAL-log the whole
         *    relation at commit instead of fsyncing it, if the relation was
         *    very small, but it's smgrDoPendingSyncs() responsibility in any
         *    case.
         *
         * We cannot distinguish the two here, so conservatively assume it's
         * an unlogged relation. A permanent relation with wal_level=minimal
         * would require no actions, see above.
         */
        smgrregistersync(bulkstate->smgr, bulkstate->forknum);
    }
    else
    {
        /*
         * Permanent relation, WAL-logged normally.
         *
         * We already WAL-logged all the pages, so they will be replayed from
         * WAL on crash. However, when we wrote out the pages, we passed
         * skipFsync=true to avoid the overhead of registering all the writes
         * with the checkpointer.  Register the whole relation now.
         *
         * There is one hole in that idea: If a checkpoint occurred while we
         * were writing the pages, it already missed fsyncing the pages we had
         * written before the checkpoint started.  A crash later on would
         * replay the WAL starting from the checkpoint, therefore it wouldn't
         * replay our earlier WAL records.  So if a checkpoint started after
         * the bulk write, fsync the files now.
         */

        /*
         * Prevent a checkpoint from starting between the GetRedoRecPtr() and
         * smgrregistersync() calls.  Without this interlock, a checkpoint
         * could begin after we checked the redo pointer but before the sync
         * request is registered, and we'd miss the fsync.
         */
        Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
        MyProc->delayChkptFlags |= DELAY_CHKPT_START;

        if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
        {
            /*
             * A checkpoint occurred and it didn't know about our writes, so
             * fsync() the relation ourselves.  (The interlock can be released
             * first: we sync directly rather than relying on the
             * checkpointer.)
             */
            MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
            smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
            elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
        }
        else
        {
            /* No concurrent checkpoint; hand the sync off, then release. */
            smgrregistersync(bulkstate->smgr, bulkstate->forknum);
            MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
        }
    }
}
     223             : 
     224             : static int
     225      222948 : buffer_cmp(const void *a, const void *b)
     226             : {
     227      222948 :     const PendingWrite *bufa = (const PendingWrite *) a;
     228      222948 :     const PendingWrite *bufb = (const PendingWrite *) b;
     229             : 
     230             :     /* We should not see duplicated writes for the same block */
     231             :     Assert(bufa->blkno != bufb->blkno);
     232      222948 :     if (bufa->blkno > bufb->blkno)
     233      104400 :         return 1;
     234             :     else
     235      118548 :         return -1;
     236             : }
     237             : 
/*
 * Finish all the pending writes.
 *
 * WAL-logs the queued pages in one record (if WAL is needed), then writes
 * them out through smgrextend()/smgrwrite(), extending the relation as
 * required.  Frees each queued buffer and resets the queue.
 */
static void
smgr_bulk_flush(BulkWriteState *bulkstate)
{
    int         npending = bulkstate->npending;
    PendingWrite *pending_writes = bulkstate->pending_writes;

    if (npending == 0)
        return;

    /* Sort by block number so the relation is written/extended in order */
    if (npending > 1)
        qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);

    if (bulkstate->use_wal)
    {
        BlockNumber blknos[MAX_PENDING_WRITES];
        Page        pages[MAX_PENDING_WRITES];
        bool        page_std = true;

        for (int i = 0; i < npending; i++)
        {
            blknos[i] = pending_writes[i].blkno;
            pages[i] = pending_writes[i].buf->data;

            /*
             * If any of the pages use !page_std, we log them all as such.
             * That's a bit wasteful, but in practice, a mix of standard and
             * non-standard page layout is rare.  None of the built-in AMs do
             * that.
             */
            if (!pending_writes[i].page_std)
                page_std = false;
        }
        /* One WAL record for the whole batch, to save on WAL header overhead */
        log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
                     npending, blknos, pages, page_std);
    }

    for (int i = 0; i < npending; i++)
    {
        BlockNumber blkno = pending_writes[i].blkno;
        Page        page = pending_writes[i].buf->data;

        /* Checksum is set after WAL-logging, just before hitting disk */
        PageSetChecksumInplace(page, blkno);

        if (blkno >= bulkstate->relsize)
        {
            /*
             * If we have to write pages nonsequentially, fill in the space
             * with zeroes until we come back and overwrite.  This is not
             * logically necessary on standard Unix filesystems (unwritten
             * space will read as zeroes anyway), but it should help to avoid
             * fragmentation.  The dummy pages aren't WAL-logged though.
             */
            while (blkno > bulkstate->relsize)
            {
                /* don't set checksum for all-zero page */
                smgrextend(bulkstate->smgr, bulkstate->forknum,
                           bulkstate->relsize,
                           &zero_buffer,
                           true);
                bulkstate->relsize++;
            }

            /* skipFsync=true: smgr_bulk_finish() arranges the fsync */
            smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
            bulkstate->relsize++;
        }
        else
            smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
        /* the bulk writer took ownership of the buffer; release it now */
        pfree(page);
    }

    bulkstate->npending = 0;
}
     313             : 
     314             : /*
     315             :  * Queue write of 'buf'.
     316             :  *
     317             :  * NB: this takes ownership of 'buf'!
     318             :  *
     319             :  * You are only allowed to write a given block once as part of one bulk write
     320             :  * operation.
     321             :  */
     322             : void
     323      112462 : smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
     324             : {
     325             :     PendingWrite *w;
     326             : 
     327      112462 :     w = &bulkstate->pending_writes[bulkstate->npending++];
     328      112462 :     w->buf = buf;
     329      112462 :     w->blkno = blocknum;
     330      112462 :     w->page_std = page_std;
     331             : 
     332      112462 :     if (bulkstate->npending == MAX_PENDING_WRITES)
     333        1136 :         smgr_bulk_flush(bulkstate);
     334      112462 : }
     335             : 
     336             : /*
     337             :  * Allocate a new buffer which can later be written with smgr_bulk_write().
     338             :  *
     339             :  * There is no function to free the buffer.  When you pass it to
     340             :  * smgr_bulk_write(), it takes ownership and frees it when it's no longer
     341             :  * needed.
     342             :  *
     343             :  * This is currently implemented as a simple palloc, but could be implemented
     344             :  * using a ring buffer or larger chunks in the future, so don't rely on it.
     345             :  */
     346             : BulkWriteBuffer
     347      112462 : smgr_bulk_get_buf(BulkWriteState *bulkstate)
     348             : {
     349      112462 :     return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
     350             : }

Generated by: LCOV version 1.14