LCOV - code coverage report
Current view: top level - src/backend/storage/smgr - bulk_write.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 67 70 95.7 %
Date: 2024-11-21 11:14:46 Functions: 7 7 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * bulk_write.c
       4             :  *    Efficiently and reliably populate a new relation
       5             :  *
       6             :  * The assumption is that no other backends access the relation while we are
       7             :  * loading it, so we can take some shortcuts.  Do not mix operations through
       8             :  * the regular buffer manager and the bulk loading interface!
       9             :  *
      10             :  * We bypass the buffer manager to avoid the locking overhead, and call
      11             :  * smgrextend() directly.  A downside is that the pages will need to be
      12             :  * re-read into shared buffers on first use after the build finishes.  That's
      13             :  * usually a good tradeoff for large relations, and for small relations, the
      14             :  * overhead isn't very significant compared to creating the relation in the
      15             :  * first place.
      16             :  *
      17             :  * The pages are WAL-logged if needed.  To save on WAL header overhead, we
      18             :  * WAL-log several pages in one record.
      19             :  *
      20             :  * One tricky point is that because we bypass the buffer manager, we need to
      21             :  * register the relation for fsyncing at the next checkpoint ourselves, and
      22             :  * make sure that the relation is correctly fsync'd by us or the checkpointer
      23             :  * even if a checkpoint happens concurrently.
      24             :  *
      25             :  *
      26             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      27             :  * Portions Copyright (c) 1994, Regents of the University of California
      28             :  *
      29             :  *
      30             :  * IDENTIFICATION
      31             :  *    src/backend/storage/smgr/bulk_write.c
      32             :  *
      33             :  *-------------------------------------------------------------------------
      34             :  */
      35             : #include "postgres.h"
      36             : 
      37             : #include "access/xloginsert.h"
      38             : #include "access/xlogrecord.h"
      39             : #include "storage/bufpage.h"
      40             : #include "storage/bulk_write.h"
      41             : #include "storage/proc.h"
      42             : #include "storage/smgr.h"
      43             : #include "utils/rel.h"
      44             : 
      45             : #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
      46             : 
      47             : static const PGIOAlignedBlock zero_buffer = {{0}};  /* worth BLCKSZ */
      48             : 
      49             : typedef struct PendingWrite
      50             : {
      51             :     BulkWriteBuffer buf;
      52             :     BlockNumber blkno;
      53             :     bool        page_std;
      54             : } PendingWrite;
      55             : 
      56             : /*
      57             :  * Bulk writer state for one relation fork.
      58             :  */
      59             : struct BulkWriteState
      60             : {
      61             :     /* Information about the target relation we're writing */
      62             :     SMgrRelation smgr;
      63             :     ForkNumber  forknum;
      64             :     bool        use_wal;
      65             : 
      66             :     /* We keep several writes queued, and WAL-log them in batches */
      67             :     int         npending;
      68             :     PendingWrite pending_writes[MAX_PENDING_WRITES];
      69             : 
      70             :     /* Current size of the relation */
      71             :     BlockNumber pages_written;
      72             : 
      73             :     /* The RedoRecPtr at the time that the bulk operation started */
      74             :     XLogRecPtr  start_RedoRecPtr;
      75             : 
      76             :     MemoryContext memcxt;
      77             : };
      78             : 
      79             : static void smgr_bulk_flush(BulkWriteState *bulkstate);
      80             : 
      81             : /*
      82             :  * Start a bulk write operation on a relation fork.
      83             :  */
      84             : BulkWriteState *
      85       48298 : smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
      86             : {
      87       48298 :     return smgr_bulk_start_smgr(RelationGetSmgr(rel),
      88             :                                 forknum,
      89       48298 :                                 RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
      90             : }
      91             : 
      92             : /*
      93             :  * Start a bulk write operation on a relation fork.
      94             :  *
      95             :  * This is like smgr_bulk_start_rel, but can be used without a relcache entry.
      96             :  */
      97             : BulkWriteState *
      98       48496 : smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
      99             : {
     100             :     BulkWriteState *state;
     101             : 
     102       48496 :     state = palloc(sizeof(BulkWriteState));
     103       48496 :     state->smgr = smgr;
     104       48496 :     state->forknum = forknum;
     105       48496 :     state->use_wal = use_wal;
     106             : 
     107       48496 :     state->npending = 0;
     108       48496 :     state->pages_written = 0;
     109             : 
     110       48496 :     state->start_RedoRecPtr = GetRedoRecPtr();
     111             : 
     112             :     /*
     113             :      * Remember the memory context.  We will use it to allocate all the
     114             :      * buffers later.
     115             :      */
     116       48496 :     state->memcxt = CurrentMemoryContext;
     117             : 
     118       48496 :     return state;
     119             : }
     120             : 
     121             : /*
     122             :  * Finish bulk write operation.
     123             :  *
     124             :  * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
     125             :  * the relation if needed.
     126             :  */
     127             : void
     128       48496 : smgr_bulk_finish(BulkWriteState *bulkstate)
     129             : {
     130             :     /* WAL-log and flush any remaining pages */
     131       48496 :     smgr_bulk_flush(bulkstate);
     132             : 
     133             :     /*
     134             :      * Fsync the relation, or register it for the next checkpoint, if
     135             :      * necessary.
     136             :      */
     137       48496 :     if (SmgrIsTemp(bulkstate->smgr))
     138             :     {
     139             :         /* Temporary relations don't need to be fsync'd, ever */
     140             :     }
     141       46186 :     else if (!bulkstate->use_wal)
     142             :     {
     143             :         /*----------
     144             :          * This is either an unlogged relation, or a permanent relation but we
     145             :          * skipped WAL-logging because wal_level=minimal:
     146             :          *
     147             :          * A) Unlogged relation
     148             :          *
     149             :          *    Unlogged relations will go away on crash, but they need to be
     150             :          *    fsync'd on a clean shutdown. It's sufficient to call
     151             :          *    smgrregistersync(), that ensures that the checkpointer will
     152             :          *    flush it at the shutdown checkpoint. (It will flush it on the
     153             :          *    next online checkpoint too, which is not strictly necessary.)
     154             :          *
     155             :          *    Note that the init-fork of an unlogged relation is not
     156             :          *    considered unlogged for our purposes. It's treated like a
     157             :          *    regular permanent relation. The callers will pass use_wal=true
     158             :          *    for the init fork.
     159             :          *
     160             :          * B) Permanent relation, WAL-logging skipped because wal_level=minimal
     161             :          *
     162             :          *    This is a new relation, and we didn't WAL-log the pages as we
     163             :          *    wrote, but they need to be fsync'd before commit.
     164             :          *
     165             :          *    We don't need to do that here, however. The fsync() is done at
     166             :          *    commit, by smgrDoPendingSyncs() (*).
     167             :          *
     168             :          *    (*) smgrDoPendingSyncs() might decide to WAL-log the whole
     169             :          *    relation at commit instead of fsyncing it, if the relation was
     170             :          *    very small, but it's smgrDoPendingSyncs() responsibility in any
     171             :          *    case.
     172             :          *
     173             :          * We cannot distinguish the two here, so conservatively assume it's
     174             :          * an unlogged relation. A permanent relation with wal_level=minimal
     175             :          * would require no actions, see above.
     176             :          */
     177       11052 :         smgrregistersync(bulkstate->smgr, bulkstate->forknum);
     178             :     }
     179             :     else
     180             :     {
     181             :         /*
     182             :          * Permanent relation, WAL-logged normally.
     183             :          *
     184             :          * We already WAL-logged all the pages, so they will be replayed from
     185             :          * WAL on crash. However, when we wrote out the pages, we passed
     186             :          * skipFsync=true to avoid the overhead of registering all the writes
     187             :          * with the checkpointer.  Register the whole relation now.
     188             :          *
     189             :          * There is one hole in that idea: If a checkpoint occurred while we
     190             :          * were writing the pages, it already missed fsyncing the pages we had
     191             :          * written before the checkpoint started.  A crash later on would
     192             :          * replay the WAL starting from the checkpoint, therefore it wouldn't
     193             :          * replay our earlier WAL records.  So if a checkpoint started after
     194             :          * the bulk write, fsync the files now.
     195             :          */
     196             : 
     197             :         /*
     198             :          * Prevent a checkpoint from starting between the GetRedoRecPtr() and
     199             :          * smgrregistersync() calls.
     200             :          */
     201             :         Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
     202       35134 :         MyProc->delayChkptFlags |= DELAY_CHKPT_START;
     203             : 
     204       35134 :         if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
     205             :         {
     206             :             /*
     207             :              * A checkpoint occurred and it didn't know about our writes, so
     208             :              * fsync() the relation ourselves.
     209             :              */
     210           0 :             MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
     211           0 :             smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
     212           0 :             elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
     213             :         }
     214             :         else
     215             :         {
     216       35134 :             smgrregistersync(bulkstate->smgr, bulkstate->forknum);
     217       35134 :             MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
     218             :         }
     219             :     }
     220       48496 : }
     221             : 
     222             : static int
     223      220306 : buffer_cmp(const void *a, const void *b)
     224             : {
     225      220306 :     const PendingWrite *bufa = (const PendingWrite *) a;
     226      220306 :     const PendingWrite *bufb = (const PendingWrite *) b;
     227             : 
     228             :     /* We should not see duplicated writes for the same block */
     229             :     Assert(bufa->blkno != bufb->blkno);
     230      220306 :     if (bufa->blkno > bufb->blkno)
     231      103176 :         return 1;
     232             :     else
     233      117130 :         return -1;
     234             : }
     235             : 
     236             : /*
     237             :  * Finish all the pending writes.
     238             :  */
     239             : static void
     240       49620 : smgr_bulk_flush(BulkWriteState *bulkstate)
     241             : {
     242       49620 :     int         npending = bulkstate->npending;
     243       49620 :     PendingWrite *pending_writes = bulkstate->pending_writes;
     244             : 
     245       49620 :     if (npending == 0)
     246         212 :         return;
     247             : 
     248       49408 :     if (npending > 1)
     249       10706 :         qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);
     250             : 
     251       49408 :     if (bulkstate->use_wal)
     252             :     {
     253             :         BlockNumber blknos[MAX_PENDING_WRITES];
     254             :         Page        pages[MAX_PENDING_WRITES];
     255       35612 :         bool        page_std = true;
     256             : 
     257      106534 :         for (int i = 0; i < npending; i++)
     258             :         {
     259       70922 :             blknos[i] = pending_writes[i].blkno;
     260       70922 :             pages[i] = pending_writes[i].buf->data;
     261             : 
     262             :             /*
     263             :              * If any of the pages use !page_std, we log them all as such.
     264             :              * That's a bit wasteful, but in practice, a mix of standard and
     265             :              * non-standard page layout is rare.  None of the built-in AMs do
     266             :              * that.
     267             :              */
     268       70922 :             if (!pending_writes[i].page_std)
     269         108 :                 page_std = false;
     270             :         }
     271       35612 :         log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
     272             :                      npending, blknos, pages, page_std);
     273             :     }
     274             : 
     275      160992 :     for (int i = 0; i < npending; i++)
     276             :     {
     277      111584 :         BlockNumber blkno = pending_writes[i].blkno;
     278      111584 :         Page        page = pending_writes[i].buf->data;
     279             : 
     280      111584 :         PageSetChecksumInplace(page, blkno);
     281             : 
     282      111584 :         if (blkno >= bulkstate->pages_written)
     283             :         {
     284             :             /*
     285             :              * If we have to write pages nonsequentially, fill in the space
     286             :              * with zeroes until we come back and overwrite.  This is not
     287             :              * logically necessary on standard Unix filesystems (unwritten
     288             :              * space will read as zeroes anyway), but it should help to avoid
     289             :              * fragmentation.  The dummy pages aren't WAL-logged though.
     290             :              */
     291      111584 :             while (blkno > bulkstate->pages_written)
     292             :             {
     293             :                 /* don't set checksum for all-zero page */
     294         526 :                 smgrextend(bulkstate->smgr, bulkstate->forknum,
     295         526 :                            bulkstate->pages_written++,
     296             :                            &zero_buffer,
     297             :                            true);
     298             :             }
     299             : 
     300      111058 :             smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
     301      111058 :             bulkstate->pages_written = pending_writes[i].blkno + 1;
     302             :         }
     303             :         else
     304         526 :             smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
     305      111584 :         pfree(page);
     306             :     }
     307             : 
     308       49408 :     bulkstate->npending = 0;
     309             : }
     310             : 
     311             : /*
     312             :  * Queue write of 'buf'.
     313             :  *
     314             :  * NB: this takes ownership of 'buf'!
     315             :  *
     316             :  * You are only allowed to write a given block once as part of one bulk write
     317             :  * operation.
     318             :  */
     319             : void
     320      111584 : smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
     321             : {
     322             :     PendingWrite *w;
     323             : 
     324      111584 :     w = &bulkstate->pending_writes[bulkstate->npending++];
     325      111584 :     w->buf = buf;
     326      111584 :     w->blkno = blocknum;
     327      111584 :     w->page_std = page_std;
     328             : 
     329      111584 :     if (bulkstate->npending == MAX_PENDING_WRITES)
     330        1124 :         smgr_bulk_flush(bulkstate);
     331      111584 : }
     332             : 
     333             : /*
     334             :  * Allocate a new buffer which can later be written with smgr_bulk_write().
     335             :  *
     336             :  * There is no function to free the buffer.  When you pass it to
     337             :  * smgr_bulk_write(), it takes ownership and frees it when it's no longer
     338             :  * needed.
     339             :  *
     340             :  * This is currently implemented as a simple palloc, but could be implemented
     341             :  * using a ring buffer or larger chunks in the future, so don't rely on it.
     342             :  */
     343             : BulkWriteBuffer
     344      111584 : smgr_bulk_get_buf(BulkWriteState *bulkstate)
     345             : {
     346      111584 :     return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
     347             : }

Generated by: LCOV version 1.14