LCOV - code coverage report
File: src/backend/storage/smgr/bulk_write.c
Test: PostgreSQL 17devel
Date: 2024-04-29 02:11:02
Line coverage: 65 of 68 lines (95.6 %)
Function coverage: 7 of 7 functions (100.0 %)

/*-------------------------------------------------------------------------
 *
 * bulk_write.c
 *    Efficiently and reliably populate a new relation
 *
 * The assumption is that no other backends access the relation while we are
 * loading it, so we can take some shortcuts.  Do not mix operations through
 * the regular buffer manager and the bulk loading interface!
 *
 * We bypass the buffer manager to avoid the locking overhead, and call
 * smgrextend() directly.  A downside is that the pages will need to be
 * re-read into shared buffers on first use after the build finishes.  That's
 * usually a good tradeoff for large relations, and for small relations, the
 * overhead isn't very significant compared to creating the relation in the
 * first place.
 *
 * The pages are WAL-logged if needed.  To save on WAL header overhead, we
 * WAL-log several pages in one record.
 *
 * One tricky point is that because we bypass the buffer manager, we need to
 * register the relation for fsyncing at the next checkpoint ourselves, and
 * make sure that the relation is correctly fsync'd by us or the checkpointer
 * even if a checkpoint happens concurrently.
 *
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/storage/smgr/bulk_write.c
 *
 *-------------------------------------------------------------------------
 */
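/*
 * Typical usage, as a minimal caller-side sketch (hypothetical code, not
 * part of this file; 'rel' and 'nblocks' are assumed caller locals): start
 * a bulk write, fill and queue pages one at a time, then finish.
 * smgr_bulk_write() takes ownership of the buffer, so the caller never
 * frees it.  Passing page_std=true declares the standard page layout, which
 * lets WAL-logging skip the hole between pd_lower and pd_upper.
 *
 *     BulkWriteState *bw = smgr_bulk_start_rel(rel, MAIN_FORKNUM);
 *
 *     for (BlockNumber blkno = 0; blkno < nblocks; blkno++)
 *     {
 *         BulkWriteBuffer buf = smgr_bulk_get_buf(bw);
 *
 *         PageInit((Page) buf, BLCKSZ, 0);
 *         ... fill the page contents ...
 *         smgr_bulk_write(bw, blkno, buf, true);
 *     }
 *     smgr_bulk_finish(bw);
 */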
#include "postgres.h"

#include "access/xloginsert.h"
#include "access/xlogrecord.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "storage/bulk_write.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/rel.h"

#define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID

static const PGIOAlignedBlock zero_buffer = {{0}};  /* worth BLCKSZ */

typedef struct PendingWrite
{
    BulkWriteBuffer buf;
    BlockNumber blkno;
    bool        page_std;
} PendingWrite;

/*
 * Bulk writer state for one relation fork.
 */
struct BulkWriteState
{
    /* Information about the target relation we're writing */
    SMgrRelation smgr;
    ForkNumber  forknum;
    bool        use_wal;

    /* We keep several writes queued, and WAL-log them in batches */
    int         npending;
    PendingWrite pending_writes[MAX_PENDING_WRITES];

    /* Current size of the relation */
    BlockNumber pages_written;

    /* The RedoRecPtr at the time that the bulk operation started */
    XLogRecPtr  start_RedoRecPtr;

    MemoryContext memcxt;
};
      79             : 
      80             : static void smgr_bulk_flush(BulkWriteState *bulkstate);
      81             : 
      82             : /*
      83             :  * Start a bulk write operation on a relation fork.
      84             :  */
      85             : BulkWriteState *
      86       46456 : smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
      87             : {
      88       46456 :     return smgr_bulk_start_smgr(RelationGetSmgr(rel),
      89             :                                 forknum,
      90       46456 :                                 RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
      91             : }
      92             : 
      93             : /*
      94             :  * Start a bulk write operation on a relation fork.
      95             :  *
      96             :  * This is like smgr_bulk_start_rel, but can be used without a relcache entry.
      97             :  */
      98             : BulkWriteState *
      99       46634 : smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
     100             : {
     101             :     BulkWriteState *state;
     102             : 
     103       46634 :     state = palloc(sizeof(BulkWriteState));
     104       46634 :     state->smgr = smgr;
     105       46634 :     state->forknum = forknum;
     106       46634 :     state->use_wal = use_wal;
     107             : 
     108       46634 :     state->npending = 0;
     109       46634 :     state->pages_written = 0;
     110             : 
     111       46634 :     state->start_RedoRecPtr = GetRedoRecPtr();
     112             : 
     113             :     /*
     114             :      * Remember the memory context.  We will use it to allocate all the
     115             :      * buffers later.
     116             :      */
     117       46634 :     state->memcxt = CurrentMemoryContext;
     118             : 
     119       46634 :     return state;
     120             : }
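
/*
 * A sketch of the non-relcache path (hypothetical caller code, loosely
 * modeled on relation-copy logic; 'dst', 'relpersistence' and
 * 'copying_initfork' are assumed caller locals): with only an SMgrRelation
 * at hand, the caller must make the WAL-logging decision itself, e.g.:
 *
 *     use_wal = XLogIsNeeded() &&
 *         (relpersistence == RELPERSISTENCE_PERMANENT || copying_initfork);
 *     bulkstate = smgr_bulk_start_smgr(dst, forknum, use_wal);
 *
 * Compare smgr_bulk_start_rel() above, which derives the same decision from
 * RelationNeedsWAL(rel) || forknum == INIT_FORKNUM.
 */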

/*
 * Finish bulk write operation.
 *
 * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
 * the relation if needed.
 */
void
smgr_bulk_finish(BulkWriteState *bulkstate)
{
    /* WAL-log and flush any remaining pages */
    smgr_bulk_flush(bulkstate);

    /*
     * When we wrote out the pages, we passed skipFsync=true to avoid the
     * overhead of registering all the writes with the checkpointer.  Register
     * the whole relation now.
     *
     * There is one hole in that idea: If a checkpoint occurred while we were
     * writing the pages, it already missed fsyncing the pages we had written
     * before the checkpoint started.  A crash later on would replay the WAL
     * starting from the checkpoint, therefore it wouldn't replay our earlier
     * WAL records.  So if a checkpoint started after the bulk write began,
     * fsync the files ourselves now.
     */
    if (!SmgrIsTemp(bulkstate->smgr))
    {
        /*
         * Prevent a checkpoint from starting between the GetRedoRecPtr() and
         * smgrregistersync() calls.
         */
        Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
        MyProc->delayChkptFlags |= DELAY_CHKPT_START;

        if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
        {
            /*
             * A checkpoint occurred and it didn't know about our writes, so
             * fsync() the relation ourselves.
             */
            MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
            smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
            elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
        }
        else
        {
            smgrregistersync(bulkstate->smgr, bulkstate->forknum);
            MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
        }
    }
}
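
/*
 * Illustration of the race smgr_bulk_finish() guards against (an assumed
 * timeline, spelled out here for clarity; not taken verbatim from the
 * source comments):
 *
 *   1. bulk write starts; start_RedoRecPtr is X
 *   2. pages are written with skipFsync=true
 *   3. a checkpoint completes; RedoRecPtr becomes Y; the checkpointer never
 *      heard of our writes, so it did not fsync them
 *   4. smgr_bulk_finish() merely registers the sync for the next checkpoint
 *   5. a crash before that checkpoint replays WAL only from Y onward, so
 *      records logged before Y are not replayed and the corresponding
 *      pages, never fsync'd, are lost
 *
 * Comparing start_RedoRecPtr against the current RedoRecPtr detects step 3,
 * in which case smgrimmedsync() is called instead of registering the sync.
 */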

static int
buffer_cmp(const void *a, const void *b)
{
    const PendingWrite *bufa = (const PendingWrite *) a;
    const PendingWrite *bufb = (const PendingWrite *) b;

    /* We should not see duplicated writes for the same block */
    Assert(bufa->blkno != bufb->blkno);
    if (bufa->blkno > bufb->blkno)
        return 1;
    else
        return -1;
}

/*
 * Finish all the pending writes.
 */
static void
smgr_bulk_flush(BulkWriteState *bulkstate)
{
    int         npending = bulkstate->npending;
    PendingWrite *pending_writes = bulkstate->pending_writes;

    if (npending == 0)
        return;

    if (npending > 1)
        qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);

    if (bulkstate->use_wal)
    {
        BlockNumber blknos[MAX_PENDING_WRITES];
        Page        pages[MAX_PENDING_WRITES];
        bool        page_std = true;

        for (int i = 0; i < npending; i++)
        {
            blknos[i] = pending_writes[i].blkno;
            pages[i] = pending_writes[i].buf->data;

            /*
             * If any of the pages use !page_std, we log them all as such.
             * That's a bit wasteful, but in practice, a mix of standard and
             * non-standard page layout is rare.  None of the built-in AMs do
             * that.
             */
            if (!pending_writes[i].page_std)
                page_std = false;
        }
        log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
                     npending, blknos, pages, page_std);
    }

    for (int i = 0; i < npending; i++)
    {
        BlockNumber blkno = pending_writes[i].blkno;
        Page        page = pending_writes[i].buf->data;

        PageSetChecksumInplace(page, blkno);

        if (blkno >= bulkstate->pages_written)
        {
            /*
             * If we have to write pages nonsequentially, fill in the space
             * with zeroes until we come back and overwrite.  This is not
             * logically necessary on standard Unix filesystems (unwritten
             * space will read as zeroes anyway), but it should help to avoid
             * fragmentation.  The dummy pages aren't WAL-logged though.
             */
            while (blkno > bulkstate->pages_written)
            {
                /* don't set checksum for all-zero page */
                smgrextend(bulkstate->smgr, bulkstate->forknum,
                           bulkstate->pages_written++,
                           &zero_buffer,
                           true);
            }

            smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
            bulkstate->pages_written = pending_writes[i].blkno + 1;
        }
        else
            smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
        pfree(page);
    }

    bulkstate->npending = 0;
}

/*
 * Queue write of 'buf'.
 *
 * NB: this takes ownership of 'buf'!
 *
 * You are only allowed to write a given block once as part of one bulk write
 * operation.
 */
void
smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
{
    PendingWrite *w;

    w = &bulkstate->pending_writes[bulkstate->npending++];
    w->buf = buf;
    w->blkno = blocknum;
    w->page_std = page_std;

    if (bulkstate->npending == MAX_PENDING_WRITES)
        smgr_bulk_flush(bulkstate);
}

/*
 * Allocate a new buffer which can later be written with smgr_bulk_write().
 *
 * There is no function to free the buffer.  When you pass it to
 * smgr_bulk_write(), it takes ownership and frees it when it's no longer
 * needed.
 *
 * This is currently implemented as a simple palloc, but could be implemented
 * using a ring buffer or larger chunks in the future, so don't rely on it.
 */
BulkWriteBuffer
smgr_bulk_get_buf(BulkWriteState *bulkstate)
{
    return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
}

Generated by: LCOV version 1.14