LCOV - code coverage report
Current view: top level - src/backend/access/transam - xloginsert.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 339 402 84.3 %
Date: 2025-12-07 21:18:12 Functions: 18 19 94.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * xloginsert.c
       4             :  *      Functions for constructing WAL records
       5             :  *
       6             :  * Constructing a WAL record begins with a call to XLogBeginInsert,
       7             :  * followed by a number of XLogRegister* calls. The registered data is
       8             :  * collected in private working memory, and finally assembled into a chain
       9             :  * of XLogRecData structs by a call to XLogRecordAssemble(). See
      10             :  * access/transam/README for details.
      11             :  *
      12             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      13             :  * Portions Copyright (c) 1994, Regents of the University of California
      14             :  *
      15             :  * src/backend/access/transam/xloginsert.c
      16             :  *
      17             :  *-------------------------------------------------------------------------
      18             :  */
      19             : 
      20             : #include "postgres.h"
      21             : 
      22             : #ifdef USE_LZ4
      23             : #include <lz4.h>
      24             : #endif
      25             : 
      26             : #ifdef USE_ZSTD
      27             : #include <zstd.h>
      28             : #endif
      29             : 
      30             : #include "access/xact.h"
      31             : #include "access/xlog.h"
      32             : #include "access/xlog_internal.h"
      33             : #include "access/xloginsert.h"
      34             : #include "catalog/pg_control.h"
      35             : #include "common/pg_lzcompress.h"
      36             : #include "executor/instrument.h"
      37             : #include "miscadmin.h"
      38             : #include "pg_trace.h"
      39             : #include "replication/origin.h"
      40             : #include "storage/bufmgr.h"
      41             : #include "storage/proc.h"
      42             : #include "utils/memutils.h"
      43             : #include "utils/pgstat_internal.h"
      44             : 
      45             : /*
      46             :  * Guess the maximum buffer size required to store a compressed version of
      47             :  * a backup block image.
                     :  *
                     :  * Each *_MAX_BLCKSZ macro is the bound for compressing BLCKSZ bytes with
                     :  * one method.  The LZ4 and ZSTD bounds are only computed when USE_LZ4 /
                     :  * USE_ZSTD are defined; otherwise they are 0 and so drop out of the Max()
                     :  * in COMPRESS_BUFSIZE.  The pglz bound is always defined.
      48             :  */
      49             : #ifdef USE_LZ4
      50             : #define LZ4_MAX_BLCKSZ      LZ4_COMPRESSBOUND(BLCKSZ)
      51             : #else
      52             : #define LZ4_MAX_BLCKSZ      0
      53             : #endif
      54             : 
      55             : #ifdef USE_ZSTD
      56             : #define ZSTD_MAX_BLCKSZ     ZSTD_COMPRESSBOUND(BLCKSZ)
      57             : #else
      58             : #define ZSTD_MAX_BLCKSZ     0
      59             : #endif
      60             : 
      61             : #define PGLZ_MAX_BLCKSZ     PGLZ_MAX_OUTPUT(BLCKSZ)
      62             : 
      63             : /* Largest of the per-method bounds: big enough whichever method is used */
      64             : #define COMPRESS_BUFSIZE    Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
      65             : 
      66             : /*
      67             :  * For each block reference registered with XLogRegisterBuffer, we fill in
      68             :  * a registered_buffer struct.
                     :  *
                     :  * The structs live in the registered_buffers array and are indexed by
                     :  * block_id.  XLogRegisterBuffer() and XLogRegisterBlock() set the block
                     :  * identity (rlocator/forkno/block), page, flags, rdata_tail, rdata_len and
                     :  * in_use; XLogRegisterBufData() appends to the rdata chain.  bkp_rdatas
                     :  * and compressed_page are scratch space that the registration functions
                     :  * in this file do not touch.
      69             :  */
      70             : typedef struct
      71             : {
      72             :     bool        in_use;         /* is this slot in use? */
      73             :     uint8       flags;          /* REGBUF_* flags */
      74             :     RelFileLocator rlocator;    /* identifies the relation and block */
      75             :     ForkNumber  forkno;
      76             :     BlockNumber block;
      77             :     const PageData *page;       /* page content (pointer only, not a copy) */
      78             :     uint32      rdata_len;      /* total length of data in rdata chain */
      79             :     XLogRecData *rdata_head;    /* head of the chain of data registered with
      80             :                                  * this block */
      81             :     XLogRecData *rdata_tail;    /* last entry in the chain, or &rdata_head if
      82             :                                  * empty */
      83             : 
      84             :     XLogRecData bkp_rdatas[2];  /* temporary rdatas used to hold references to
      85             :                                  * backup block data in XLogRecordAssemble() */
      86             : 
      87             :     /* buffer to store a compressed version of backup block image */
      88             :     char        compressed_page[COMPRESS_BUFSIZE];
      89             : } registered_buffer;
      90             : 
      91             : static registered_buffer *registered_buffers;  /* slots, indexed by block_id */
      92             : static int  max_registered_buffers; /* allocated size */
      93             : static int  max_registered_block_id = 0;    /* highest block_id + 1 currently
      94             :                                              * registered */
      95             : 
      96             : /*
      97             :  * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
      98             :  * with XLogRegisterData(...).
                     :  *
                     :  * When the chain is empty, mainrdata_last points at mainrdata_head itself
                     :  * (cast to XLogRecData *), so the first "mainrdata_last->next = rdata" in
                     :  * XLogRegisterData() stores into mainrdata_head.  NOTE(review): this
                     :  * presumably relies on 'next' being the first member of XLogRecData, whose
                     :  * definition is not in this file -- confirm.
      99             :  */
     100             : static XLogRecData *mainrdata_head;
     101             : static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
     102             : static uint64 mainrdata_len;    /* total # of bytes in chain */
     103             : 
     104             : /* flags for the in-progress insertion; see XLogSetRecordFlags() */
     105             : static uint8 curinsert_flags = 0;
     106             : 
     107             : /*
     108             :  * These are used to hold the record header while constructing a record.
     109             :  * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
     110             :  * because we want it to be MAXALIGNed and padding bytes zeroed.
     111             :  *
     112             :  * For simplicity, it's allocated large enough to hold the headers for any
     113             :  * WAL record.
                     :  *
                     :  * HEADER_SCRATCH_SIZE adds up: the fixed record header, a maximum-size
                     :  * block header for each of the XLR_MAX_BLOCK_ID + 1 possible block IDs,
                     :  * the long form of the main-data header, and the origin and top-level
                     :  * transaction ID fields.  The latter two each include one extra char
                     :  * (presumably the byte that identifies the field -- confirm against
                     :  * the record format definition).
     114             :  */
     115             : static XLogRecData hdr_rdt;
     116             : static char *hdr_scratch = NULL;
     117             : 
     118             : #define SizeOfXlogOrigin    (sizeof(RepOriginId) + sizeof(char))
     119             : #define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
     120             : 
     121             : #define HEADER_SCRATCH_SIZE \
     122             :     (SizeOfXLogRecord + \
     123             :      MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
     124             :      SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
     125             :      SizeOfXLogTransactionId)
     126             : 
     127             : /*
     128             :  * An array of XLogRecData structs, to hold registered data.
                     :  *
                     :  * XLogRegisterData() and XLogRegisterBufData() each take the next free
                     :  * entry (rdatas[num_rdatas++]); XLogResetInsertion() releases them all at
                     :  * once by setting num_rdatas back to 0.  XLogEnsureRecordSpace() can
                     :  * enlarge the array.
     129             :  */
     130             : static XLogRecData *rdatas;
     131             : static int  num_rdatas;         /* entries currently used */
     132             : static int  max_rdatas;         /* allocated size */
     133             : 
     134             : static bool begininsert_called = false; /* set by XLogBeginInsert(), cleared by XLogResetInsertion() */
     135             : 
     136             : /* Memory context to hold the registered buffer and data references. */
     137             : static MemoryContext xloginsert_cxt;
     138             : 
     139             : static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
     140             :                                        XLogRecPtr RedoRecPtr, bool doPageWrites,
     141             :                                        XLogRecPtr *fpw_lsn, int *num_fpi,
     142             :                                        uint64 *fpi_bytes,
     143             :                                        bool *topxid_included);
     144             : static bool XLogCompressBackupBlock(const PageData *page, uint16 hole_offset,
     145             :                                     uint16 hole_length, void *dest, uint16 *dlen);
     146             : 
     147             : /*
     148             :  * Begin constructing a WAL record. This must be called before the
     149             :  * XLogRegister* functions and XLogInsert().
                     :  *
                     :  * Takes no arguments and returns nothing; on success the only state change
                     :  * is setting begininsert_called.  The assertions check that the working
                     :  * area is in the state XLogResetInsertion() leaves it in: no registered
                     :  * blocks and an empty main-data chain.
                     :  *
                     :  * Raises ERROR if XLogInsertAllowed() returns false, or if a record is
                     :  * already under construction (begininsert_called is still set).
     150             :  */
     151             : void
     152    30492376 : XLogBeginInsert(void)
     153             : {
     154             :     Assert(max_registered_block_id == 0);
     155             :     Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
     156             :     Assert(mainrdata_len == 0);
     157             : 
     158             :     /* cross-check on whether we should be here or not */
     159    30492376 :     if (!XLogInsertAllowed())
     160           0 :         elog(ERROR, "cannot make new WAL entries during recovery");
     161             : 
     162    30492376 :     if (begininsert_called)
     163           0 :         elog(ERROR, "XLogBeginInsert was already called");
     164             : 
     165    30492376 :     begininsert_called = true;
     166    30492376 : }
     167             : 
     168             : /*
     169             :  * Ensure that there are enough buffer and data slots in the working area,
     170             :  * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
     171             :  * calls.
     172             :  *
     173             :  * There is always space for a small number of buffers and data chunks, enough
     174             :  * for most record types. This function is for the exceptional cases that need
     175             :  * more.
                     :  *
                     :  * max_block_id: highest block ID the caller wants to be able to register.
                     :  *      Values below XLR_NORMAL_MAX_BLOCK_ID are raised to it; values above
                     :  *      XLR_MAX_BLOCK_ID raise ERROR.
                     :  * ndatas: number of data chunks the caller wants to be able to register.
                     :  *      Values below XLR_NORMAL_RDATAS are raised to it.
                     :  *
                     :  * The arrays are only ever enlarged here, never shrunk.  Newly added
                     :  * registered_buffers slots are zero-filled; new rdatas entries are left
                     :  * uninitialized.
     176             :  */
     177             : void
     178      135528 : XLogEnsureRecordSpace(int max_block_id, int ndatas)
     179             : {
     180             :     int         nbuffers;
     181             : 
     182             :     /*
     183             :      * This must be called before entering a critical section, because
     184             :      * allocating memory inside a critical section can fail. repalloc() will
     185             :      * check the same, but better to check it here too so that we fail
     186             :      * consistently even if the arrays happen to be large enough already.
     187             :      */
     188             :     Assert(CritSectionCount == 0);
     189             : 
     190             :     /* the minimum values can't be decreased */
     191      135528 :     if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
     192        4110 :         max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
     193      135528 :     if (ndatas < XLR_NORMAL_RDATAS)
     194      135476 :         ndatas = XLR_NORMAL_RDATAS;
     195             : 
     196      135528 :     if (max_block_id > XLR_MAX_BLOCK_ID)
     197           0 :         elog(ERROR, "maximum number of WAL record block references exceeded");
     198      135528 :     nbuffers = max_block_id + 1;
     199             : 
     200      135528 :     if (nbuffers > max_registered_buffers)
     201             :     {
     202        3460 :         registered_buffers = (registered_buffer *)
     203        3460 :             repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
     204             : 
     205             :         /*
     206             :          * At least the padding bytes in the structs must be zeroed, because
     207             :          * they are included in WAL data, but initialize it all for tidiness.
                     :          * Only the slots beyond the old allocated size are cleared.
     208             :          */
     209        3460 :         MemSet(&registered_buffers[max_registered_buffers], 0,
     210             :                (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
     211        3460 :         max_registered_buffers = nbuffers;
     212             :     }
     213             : 
     214      135528 :     if (ndatas > max_rdatas)
     215             :     {
     216          32 :         rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
     217          32 :         max_rdatas = ndatas;
     218             :     }
     219      135528 : }
     220             : 
     221             : /*
     222             :  * Reset WAL record construction buffers.
                     :  *
                     :  * Marks every slot below max_registered_block_id as unused, then returns
                     :  * the rdatas counter, the registered-block high-water mark, the main-data
                     :  * chain, the record flags and begininsert_called to their initial values.
                     :  * The arrays themselves are neither freed nor shrunk.
                     :  *
                     :  * XLogInsert() calls this on both of its return paths.  NOTE(review):
                     :  * presumably it is also called from error cleanup outside this file --
                     :  * confirm.
     223             :  */
     224             : void
     225    30552726 : XLogResetInsertion(void)
     226             : {
     227             :     int         i;
     228             : 
     229    60771566 :     for (i = 0; i < max_registered_block_id; i++)
     230    30218840 :         registered_buffers[i].in_use = false;
     231             : 
     232    30552726 :     num_rdatas = 0;
     233    30552726 :     max_registered_block_id = 0;
     234    30552726 :     mainrdata_len = 0;
     235    30552726 :     mainrdata_last = (XLogRecData *) &mainrdata_head;
     236    30552726 :     curinsert_flags = 0;
     237    30552726 :     begininsert_called = false;
     238    30552726 : }
     239             : 
     240             : /*
     241             :  * Register a reference to a buffer with the WAL record being constructed.
     242             :  * This must be called for every page that the WAL-logged operation modifies.
                     :  *
                     :  * block_id: slot in registered_buffers to fill.  If it is at or above
                     :  *      max_registered_buffers, ERROR "too many registered buffers" is
                     :  *      raised (see XLogEnsureRecordSpace() to get more slots).
                     :  * buffer: the buffer being registered; its tag and a pointer to its page
                     :  *      are recorded in the slot.
                     :  * flags: REGBUF_* flags, stored in the slot.
                     :  *
                     :  * Registering a block_id that is already in use overwrites the slot and
                     :  * empties its per-block data chain (rdata_tail and rdata_len are reset).
     243             :  */
     244             : void
     245    29627892 : XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
     246             : {
     247             :     registered_buffer *regbuf;
     248             : 
     249             :     /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
     250             :     Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
     251             :     Assert(begininsert_called);
     252             : 
     253             :     /*
     254             :      * Ordinarily, buffer should be exclusive-locked and marked dirty before
     255             :      * we get here, otherwise we could end up violating one of the rules in
     256             :      * access/transam/README.
     257             :      *
     258             :      * Some callers intentionally register a clean page and never update that
     259             :      * page's LSN; in that case they can pass the flag REGBUF_NO_CHANGE to
     260             :      * bypass these checks.
     261             :      */
     262             : #ifdef USE_ASSERT_CHECKING
     263             :     if (!(flags & REGBUF_NO_CHANGE))
     264             :         Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE) &&
     265             :                BufferIsDirty(buffer));
     266             : #endif
     267             : 
     268    29627892 :     if (block_id >= max_registered_block_id)
     269             :     {
     270    28913312 :         if (block_id >= max_registered_buffers)
     271           0 :             elog(ERROR, "too many registered buffers");
     272    28913312 :         max_registered_block_id = block_id + 1;
     273             :     }
     274             : 
     275    29627892 :     regbuf = &registered_buffers[block_id];
     276             : 
     277    29627892 :     BufferGetTag(buffer, &regbuf->rlocator, &regbuf->forkno, &regbuf->block);
     278    29627892 :     regbuf->page = BufferGetPage(buffer);
     279    29627892 :     regbuf->flags = flags;
     280    29627892 :     regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
     281    29627892 :     regbuf->rdata_len = 0;
     282             : 
     283             :     /*
     284             :      * Check that this page hasn't already been registered with some other
     285             :      * block_id.
                     :      * The slot being filled and unused slots are skipped; this runs
                     :      * before in_use is set below.
     286             :      */
     287             : #ifdef USE_ASSERT_CHECKING
     288             :     {
     289             :         int         i;
     290             : 
     291             :         for (i = 0; i < max_registered_block_id; i++)
     292             :         {
     293             :             registered_buffer *regbuf_old = &registered_buffers[i];
     294             : 
     295             :             if (i == block_id || !regbuf_old->in_use)
     296             :                 continue;
     297             : 
     298             :             Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
     299             :                    regbuf_old->forkno != regbuf->forkno ||
     300             :                    regbuf_old->block != regbuf->block);
     301             :         }
     302             :     }
     303             : #endif
     304             : 
     305    29627892 :     regbuf->in_use = true;
     306    29627892 : }
     307             : 
     308             : /*
     309             :  * Like XLogRegisterBuffer, but for registering a block that's not in the
     310             :  * shared buffer pool (i.e. when you don't have a Buffer for it).
                     :  *
                     :  * block_id: slot in registered_buffers to fill; ERROR "too many registered
                     :  *      buffers" if it is at or above max_registered_buffers.
                     :  * rlocator, forknum, blknum: identity of the block; *rlocator is copied
                     :  *      into the slot.
                     :  * page: page content; only the pointer is stored.
                     :  * flags: REGBUF_* flags, stored in the slot.
                     :  *
                     :  * Unlike XLogRegisterBuffer(), no assertions are made here about the
                     :  * flags or about locking and dirtiness.
                     :  *
                     :  * NOTE(review): max_registered_block_id is raised before block_id is
                     :  * range-checked, whereas XLogRegisterBuffer() checks first.  If the ERROR
                     :  * is raised, max_registered_block_id is left above max_registered_buffers,
                     :  * and a later XLogResetInsertion() would loop past the end of the
                     :  * registered_buffers array.  Consider doing the range check first.
     311             :  */
     312             : void
     313      568860 : XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
     314             :                   BlockNumber blknum, const PageData *page, uint8 flags)
     315             : {
     316             :     registered_buffer *regbuf;
     317             : 
     318             :     Assert(begininsert_called);
     319             : 
     320      568860 :     if (block_id >= max_registered_block_id)
     321      568860 :         max_registered_block_id = block_id + 1;
     322             : 
     323      568860 :     if (block_id >= max_registered_buffers)
     324           0 :         elog(ERROR, "too many registered buffers");
     325             : 
     326      568860 :     regbuf = &registered_buffers[block_id];
     327             : 
     328      568860 :     regbuf->rlocator = *rlocator;
     329      568860 :     regbuf->forkno = forknum;
     330      568860 :     regbuf->block = blknum;
     331      568860 :     regbuf->page = page;
     332      568860 :     regbuf->flags = flags;
     333      568860 :     regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
     334      568860 :     regbuf->rdata_len = 0;
     335             : 
     336             :     /*
     337             :      * Check that this page hasn't already been registered with some other
     338             :      * block_id.
     339             :      */
     340             : #ifdef USE_ASSERT_CHECKING
     341             :     {
     342             :         int         i;
     343             : 
     344             :         for (i = 0; i < max_registered_block_id; i++)
     345             :         {
     346             :             registered_buffer *regbuf_old = &registered_buffers[i];
     347             : 
     348             :             if (i == block_id || !regbuf_old->in_use)
     349             :                 continue;
     350             : 
     351             :             Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
     352             :                    regbuf_old->forkno != regbuf->forkno ||
     353             :                    regbuf_old->block != regbuf->block);
     354             :         }
     355             :     }
     356             : #endif
     357             : 
     358      568860 :     regbuf->in_use = true;
     359      568860 : }
     360             : 
     361             : /*
     362             :  * Add data to the WAL record that's being constructed.
     363             :  *
     364             :  * The data is appended to the "main chunk", available at replay with
     365             :  * XLogRecGetData().
                     :  *
                     :  * data: start of the bytes to add.  Only the pointer is stored here, not a
                     :  *      copy, so the memory presumably has to stay valid and unchanged until
                     :  *      the record has been inserted -- confirm against XLogInsertRecord().
                     :  * len: number of bytes at 'data'.
                     :  *
                     :  * Raises ERROR "too much WAL data" if all max_rdatas entries are already
                     :  * in use; XLogEnsureRecordSpace() can be used beforehand to get more.
     366             :  */
     367             : void
     368    31467294 : XLogRegisterData(const void *data, uint32 len)
     369             : {
     370             :     XLogRecData *rdata;
     371             : 
     372             :     Assert(begininsert_called);
     373             : 
     374    31467294 :     if (num_rdatas >= max_rdatas)
     375           0 :         ereport(ERROR,
     376             :                 (errmsg_internal("too much WAL data"),
     377             :                  errdetail_internal("%d out of %d data segments are already in use.",
     378             :                                     num_rdatas, max_rdatas)));
     379    31467294 :     rdata = &rdatas[num_rdatas++];
     380             : 
     381    31467294 :     rdata->data = data;
     382    31467294 :     rdata->len = len;
     383             : 
     384             :     /*
     385             :      * we use the mainrdata_last pointer to track the end of the chain, so no
     386             :      * need to clear 'next' here.
     387             :      */
     388             : 
     389    31467294 :     mainrdata_last->next = rdata;
     390    31467294 :     mainrdata_last = rdata;
     391             : 
     392    31467294 :     mainrdata_len += len;
     393    31467294 : }
     394             : 
     395             : /*
     396             :  * Add buffer-specific data to the WAL record that's being constructed.
     397             :  *
     398             :  * Block_id must reference a block previously registered with
     399             :  * XLogRegisterBuffer(). If this is called more than once for the same
     400             :  * block_id, the data is appended.
     401             :  *
     402             :  * The maximum amount of data that can be registered per block is 65535
     403             :  * bytes. That should be plenty; if you need more than BLCKSZ bytes to
     404             :  * reconstruct the changes to the page, you might as well just log a full
     405             :  * copy of it. (the "main data" that's not associated with a block is not
     406             :  * limited)
                     :  *
                     :  * block_id: ID of an already registered block; ERROR if that slot is not
                     :  *      in use.
                     :  * data: start of the bytes to add; only the pointer is stored, not a copy.
                     :  * len: number of bytes at 'data'.
                     :  *
                     :  * Raises ERROR "too much WAL data" if no rdatas entry is free, or if the
                     :  * block's total would exceed UINT16_MAX bytes.
                     :  *
                     :  * NOTE(review): block_id indexes registered_buffers directly; only in_use
                     :  * is tested, there is no comparison against max_registered_buffers.  A
                     :  * block_id at or beyond the allocated size would read outside the array --
                     :  * confirm that callers cannot pass one.
     407             :  */
     408             : void
     409    41360064 : XLogRegisterBufData(uint8 block_id, const void *data, uint32 len)
     410             : {
     411             :     registered_buffer *regbuf;
     412             :     XLogRecData *rdata;
     413             : 
     414             :     Assert(begininsert_called);
     415             : 
     416             :     /* find the registered buffer struct */
     417    41360064 :     regbuf = &registered_buffers[block_id];
     418    41360064 :     if (!regbuf->in_use)
     419           0 :         elog(ERROR, "no block with id %d registered with WAL insertion",
     420             :              block_id);
     421             : 
     422             :     /*
     423             :      * Check against max_rdatas and ensure we do not register more data per
     424             :      * buffer than can be handled by the physical data format; i.e. that
     425             :      * regbuf->rdata_len does not grow beyond what
     426             :      * XLogRecordBlockHeader->data_length can hold.
                     :      *
                     :      * rdata_len and len are both uint32, so their sum can wrap around for
                     :      * a huge len; the separate "len > UINT16_MAX" test catches that case.
     427             :      */
     428    41360064 :     if (num_rdatas >= max_rdatas)
     429           0 :         ereport(ERROR,
     430             :                 (errmsg_internal("too much WAL data"),
     431             :                  errdetail_internal("%d out of %d data segments are already in use.",
     432             :                                     num_rdatas, max_rdatas)));
     433    41360064 :     if (regbuf->rdata_len + len > UINT16_MAX || len > UINT16_MAX)
     434           0 :         ereport(ERROR,
     435             :                 (errmsg_internal("too much WAL data"),
     436             :                  errdetail_internal("Registering more than maximum %u bytes allowed to block %u: current %u bytes, adding %u bytes.",
     437             :                                     UINT16_MAX, block_id, regbuf->rdata_len, len)));
     438             : 
     439    41360064 :     rdata = &rdatas[num_rdatas++];
     440             : 
     441    41360064 :     rdata->data = data;
     442    41360064 :     rdata->len = len;
     443             : 
     444    41360064 :     regbuf->rdata_tail->next = rdata;
     445    41360064 :     regbuf->rdata_tail = rdata;
     446    41360064 :     regbuf->rdata_len += len;
     447    41360064 : }
     448             : 
     449             : /*
     450             :  * Set insert status flags for the upcoming WAL record.
     451             :  *
     452             :  * The flags that can be used here are:
     453             :  * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
     454             :  *   included in the record.
     455             :  * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
     456             :  *   durability, which allows to avoid triggering WAL archiving and other
     457             :  *   background activity.
                     :  *
                     :  * The given bits are OR'ed into curinsert_flags, so flags from several
                     :  * calls accumulate; they are cleared again by XLogResetInsertion().
     458             :  */
     459             : void
     460    18868204 : XLogSetRecordFlags(uint8 flags)
     461             : {
     462             :     Assert(begininsert_called);
     463    18868204 :     curinsert_flags |= flags;
     464    18868204 : }
     465             : 
     466             : /*
     467             :  * Insert an XLOG record having the specified RMID and info bytes, with the
     468             :  * body of the record being the data and buffer references registered earlier
     469             :  * with XLogRegister* calls.
     470             :  *
     471             :  * Returns XLOG pointer to end of record (beginning of next record).
     472             :  * This can be used as LSN for data pages affected by the logged action.
     473             :  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
     474             :  * before the data page can be written out.  This implements the basic
     475             :  * WAL rule "write the log before the data".)
                     :  *
                     :  * rmid: resource manager ID for the record.
                     :  * info: info byte; only the bits listed in the mask check below may be set.
                     :  *
                     :  * Raises ERROR if XLogBeginInsert() was not called first, and PANIC if
                     :  * 'info' has any other bit set.  On both return paths the working area
                     :  * has been reset with XLogResetInsertion(), so the next record must start
                     :  * with a new XLogBeginInsert().
     476             :  */
     477             : XLogRecPtr
     478    30492376 : XLogInsert(RmgrId rmid, uint8 info)
     479             : {
     480             :     XLogRecPtr  EndPos;
     481             : 
     482             :     /* XLogBeginInsert() must have been called. */
     483    30492376 :     if (!begininsert_called)
     484           0 :         elog(ERROR, "XLogBeginInsert was not called");
     485             : 
     486             :     /*
     487             :      * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
     488             :      * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
     489             :      */
     490    30492376 :     if ((info & ~(XLR_RMGR_INFO_MASK |
     491             :                   XLR_SPECIAL_REL_UPDATE |
     492             :                   XLR_CHECK_CONSISTENCY)) != 0)
     493           0 :         elog(PANIC, "invalid xlog info mask %02X", info);
     494             : 
     495             :     TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
     496             : 
     497             :     /*
     498             :      * In bootstrap mode, we don't actually log anything but XLOG resources;
     499             :      * return a phony record pointer.
                     :      * The registered data is discarded without being assembled.
     500             :      */
     501    30492376 :     if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
     502             :     {
     503     1254000 :         XLogResetInsertion();
     504     1254000 :         EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
     505     1254000 :         return EndPos;
     506             :     }
     507             : 
     508             :     do
     509             :     {
     510             :         XLogRecPtr  RedoRecPtr;
     511             :         bool        doPageWrites;
     512    29253976 :         bool        topxid_included = false;
     513             :         XLogRecPtr  fpw_lsn;
     514             :         XLogRecData *rdt;
     515    29253976 :         int         num_fpi = 0;
     516    29253976 :         uint64      fpi_bytes = 0;
     517             : 
     518             :         /*
     519             :          * Get values needed to decide whether to do full-page writes. Since
     520             :          * we don't yet have an insertion lock, these could change under us,
     521             :          * but XLogInsertRecord will recheck them once it has a lock.
                     :          *
                     :          * If XLogInsertRecord returns an invalid pointer, the loop fetches
                     :          * the values again and assembles the record anew.
     522             :          */
     523    29253976 :         GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
     524             : 
     525    29253976 :         rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
     526             :                                  &fpw_lsn, &num_fpi, &fpi_bytes,
     527             :                                  &topxid_included);
     528             : 
     529    29253976 :         EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
     530             :                                   fpi_bytes, topxid_included);
     531    29253976 :     } while (!XLogRecPtrIsValid(EndPos));
     532             : 
     533    29238376 :     XLogResetInsertion();
     534             : 
     535    29238376 :     return EndPos;
     536             : }
     537             : 
     538             : /*
     539             :  * Simple wrapper to XLogInsert to insert a WAL record with elementary
     540             :  * contents (only an int64 is supported as value currently).
                     :  *
                     :  * rmid, info: passed through to XLogInsert().
                     :  * value: becomes the record's whole main data (sizeof(int64) bytes).
                     :  *
                     :  * Returns what XLogInsert() returns.  The address of the by-value
                     :  * parameter is what gets registered; XLogInsert() is called before this
                     :  * function returns, while 'value' is still in scope.
     541             :  */
     542             : XLogRecPtr
     543      864042 : XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value)
     544             : {
     545      864042 :     XLogBeginInsert();
     546      864042 :     XLogRegisterData(&value, sizeof(value));
     547      864042 :     return XLogInsert(rmid, info);
     548             : }
     549             : 
     550             : /*
     551             :  * Assemble a WAL record from the registered data and buffers into an
     552             :  * XLogRecData chain, ready for insertion with XLogInsertRecord().
     553             :  *
     554             :  * The record header fields are filled in, except for the xl_prev field. The
     555             :  * calculated CRC does not include the record header yet.
     556             :  *
     557             :  * If there are any registered buffers, and a full-page image was not taken
     558             :  * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
     559             :  * signals that the assembled record is only good for insertion on the
     560             :  * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
     561             :  *
     562             :  * *topxid_included is set if the topmost transaction ID is logged with the
     563             :  * current subtransaction.
     564             :  */
     565             : static XLogRecData *
     566    29253976 : XLogRecordAssemble(RmgrId rmid, uint8 info,
     567             :                    XLogRecPtr RedoRecPtr, bool doPageWrites,
     568             :                    XLogRecPtr *fpw_lsn, int *num_fpi, uint64 *fpi_bytes,
     569             :                    bool *topxid_included)
     570             : {
     571             :     XLogRecData *rdt;
     572    29253976 :     uint64      total_len = 0;
     573             :     int         block_id;
     574             :     pg_crc32c   rdata_crc;
     575    29253976 :     registered_buffer *prev_regbuf = NULL;
     576             :     XLogRecData *rdt_datas_last;
     577             :     XLogRecord *rechdr;
     578    29253976 :     char       *scratch = hdr_scratch;
     579             : 
     580             :     /*
     581             :      * Note: this function can be called multiple times for the same record.
     582             :      * All the modifications we do to the rdata chains below must handle that.
     583             :      */
     584             : 
     585             :     /* The record begins with the fixed-size header */
     586    29253976 :     rechdr = (XLogRecord *) scratch;
     587    29253976 :     scratch += SizeOfXLogRecord;
     588             : 
     589    29253976 :     hdr_rdt.next = NULL;
     590    29253976 :     rdt_datas_last = &hdr_rdt;
     591    29253976 :     hdr_rdt.data = hdr_scratch;
     592             : 
     593             :     /*
     594             :      * Enforce consistency checks for this record if user is looking for it.
     595             :      * Do this at the beginning of this routine to give the possibility
     596             :      * for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for
     597             :      * a record.
     598             :      */
     599    29253976 :     if (wal_consistency_checking[rmid])
     600     4339636 :         info |= XLR_CHECK_CONSISTENCY;
     601             : 
     602             :     /*
     603             :      * Make an rdata chain containing all the data portions of all block
     604             :      * references. This includes the data for full-page images. Also append
     605             :      * the headers for the block references in the scratch buffer.
     606             :      */
     607    29253976 :     *fpw_lsn = InvalidXLogRecPtr;
     608    58279198 :     for (block_id = 0; block_id < max_registered_block_id; block_id++)
     609             :     {
     610    29025222 :         registered_buffer *regbuf = &registered_buffers[block_id];
     611             :         bool        needs_backup;
     612             :         bool        needs_data;
     613             :         XLogRecordBlockHeader bkpb;
     614             :         XLogRecordBlockImageHeader bimg;
     615    29025222 :         XLogRecordBlockCompressHeader cbimg = {0};
     616             :         bool        samerel;
     617    29025222 :         bool        is_compressed = false;
     618             :         bool        include_image;
     619             : 
     620    29025222 :         if (!regbuf->in_use)
     621       22088 :             continue;
     622             : 
     623             :         /* Determine if this block needs to be backed up */
     624    29003134 :         if (regbuf->flags & REGBUF_FORCE_IMAGE)
     625      607316 :             needs_backup = true;
     626    28395818 :         else if (regbuf->flags & REGBUF_NO_IMAGE)
     627      428154 :             needs_backup = false;
     628    27967664 :         else if (!doPageWrites)
     629      541244 :             needs_backup = false;
     630             :         else
     631             :         {
     632             :             /*
     633             :              * We assume page LSN is first data on *every* page that can be
     634             :              * passed to XLogInsert, whether it has the standard page layout
     635             :              * or not.
     636             :              */
     637    27426420 :             XLogRecPtr  page_lsn = PageGetLSN(regbuf->page);
     638             : 
     639    27426420 :             needs_backup = (page_lsn <= RedoRecPtr);
     640    27426420 :             if (!needs_backup)
     641             :             {
     642    27241070 :                 if (!XLogRecPtrIsValid(*fpw_lsn) || page_lsn < *fpw_lsn)
     643    26418808 :                     *fpw_lsn = page_lsn;
     644             :             }
     645             :         }
     646             : 
     647             :         /* Determine if the buffer data needs to be included */
     648    29003134 :         if (regbuf->rdata_len == 0)
     649     5472720 :             needs_data = false;
     650    23530414 :         else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
     651      596004 :             needs_data = true;
     652             :         else
     653    22934410 :             needs_data = !needs_backup;
     654             : 
     655    29003134 :         bkpb.id = block_id;
     656    29003134 :         bkpb.fork_flags = regbuf->forkno;
     657    29003134 :         bkpb.data_length = 0;
     658             : 
     659    29003134 :         if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
     660      421754 :             bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
     661             : 
     662             :         /*
     663             :          * If needs_backup is true or WAL checking is enabled for current
     664             :          * resource manager, log a full-page write for the current block.
     665             :          */
     666    29003134 :         include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
     667             : 
     668    29003134 :         if (include_image)
     669             :         {
     670     5408512 :             const PageData *page = regbuf->page;
     671     5408512 :             uint16      compressed_len = 0;
     672             : 
     673             :             /*
     674             :              * The page needs to be backed up, so calculate its hole length
     675             :              * and offset.
     676             :              */
     677     5408512 :             if (regbuf->flags & REGBUF_STANDARD)
     678             :             {
     679             :                 /* Assume we can omit data between pd_lower and pd_upper */
     680     5117032 :                 uint16      lower = ((PageHeader) page)->pd_lower;
     681     5117032 :                 uint16      upper = ((PageHeader) page)->pd_upper;
     682             : 
     683     5117032 :                 if (lower >= SizeOfPageHeaderData &&
     684     5112060 :                     upper > lower &&
     685             :                     upper <= BLCKSZ)
     686             :                 {
     687     5112060 :                     bimg.hole_offset = lower;
     688     5112060 :                     cbimg.hole_length = upper - lower;
     689             :                 }
     690             :                 else
     691             :                 {
     692             :                     /* No "hole" to remove */
     693        4972 :                     bimg.hole_offset = 0;
     694        4972 :                     cbimg.hole_length = 0;
     695             :                 }
     696             :             }
     697             :             else
     698             :             {
     699             :                 /* Not a standard page header, don't try to eliminate "hole" */
     700      291480 :                 bimg.hole_offset = 0;
     701      291480 :                 cbimg.hole_length = 0;
     702             :             }
     703             : 
     704             :             /*
     705             :              * Try to compress a block image if wal_compression is enabled
     706             :              */
     707     5408512 :             if (wal_compression != WAL_COMPRESSION_NONE)
     708             :             {
     709             :                 is_compressed =
     710           0 :                     XLogCompressBackupBlock(page, bimg.hole_offset,
     711           0 :                                             cbimg.hole_length,
     712           0 :                                             regbuf->compressed_page,
     713             :                                             &compressed_len);
     714             :             }
     715             : 
     716             :             /*
     717             :              * Fill in the remaining fields in the XLogRecordBlockHeader
     718             :              * struct
     719             :              */
     720     5408512 :             bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
     721             : 
     722             :             /* Report a full page image constructed for the WAL record */
     723     5408512 :             *num_fpi += 1;
     724             : 
     725             :             /*
     726             :              * Construct XLogRecData entries for the page content.
     727             :              */
     728     5408512 :             rdt_datas_last->next = &regbuf->bkp_rdatas[0];
     729     5408512 :             rdt_datas_last = rdt_datas_last->next;
     730             : 
     731     5408512 :             bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
     732             : 
     733             :             /*
     734             :              * If WAL consistency checking is enabled for the resource manager
     735             :              * of this WAL record, a full-page image is included in the record
     736             :              * for the block modified. During redo, the full-page is replayed
     737             :              * only if BKPIMAGE_APPLY is set.
     738             :              */
     739     5408512 :             if (needs_backup)
     740      792666 :                 bimg.bimg_info |= BKPIMAGE_APPLY;
     741             : 
     742     5408512 :             if (is_compressed)
     743             :             {
     744             :                 /* The current compression is stored in the WAL record */
     745           0 :                 bimg.length = compressed_len;
     746             : 
     747             :                 /* Set the compression method used for this block */
     748           0 :                 switch ((WalCompression) wal_compression)
     749             :                 {
     750           0 :                     case WAL_COMPRESSION_PGLZ:
     751           0 :                         bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
     752           0 :                         break;
     753             : 
     754           0 :                     case WAL_COMPRESSION_LZ4:
     755             : #ifdef USE_LZ4
     756           0 :                         bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
     757             : #else
     758             :                         elog(ERROR, "LZ4 is not supported by this build");
     759             : #endif
     760           0 :                         break;
     761             : 
     762           0 :                     case WAL_COMPRESSION_ZSTD:
     763             : #ifdef USE_ZSTD
     764             :                         bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
     765             : #else
     766           0 :                         elog(ERROR, "zstd is not supported by this build");
     767             : #endif
     768             :                         break;
     769             : 
     770           0 :                     case WAL_COMPRESSION_NONE:
     771             :                         Assert(false);  /* cannot happen */
     772           0 :                         break;
     773             :                         /* no default case, so that compiler will warn */
     774             :                 }
     775             : 
     776           0 :                 rdt_datas_last->data = regbuf->compressed_page;
     777           0 :                 rdt_datas_last->len = compressed_len;
     778             :             }
     779             :             else
     780             :             {
     781     5408512 :                 bimg.length = BLCKSZ - cbimg.hole_length;
     782             : 
     783     5408512 :                 if (cbimg.hole_length == 0)
     784             :                 {
     785      296452 :                     rdt_datas_last->data = page;
     786      296452 :                     rdt_datas_last->len = BLCKSZ;
     787             :                 }
     788             :                 else
     789             :                 {
     790             :                     /* must skip the hole */
     791     5112060 :                     rdt_datas_last->data = page;
     792     5112060 :                     rdt_datas_last->len = bimg.hole_offset;
     793             : 
     794     5112060 :                     rdt_datas_last->next = &regbuf->bkp_rdatas[1];
     795     5112060 :                     rdt_datas_last = rdt_datas_last->next;
     796             : 
     797     5112060 :                     rdt_datas_last->data =
     798     5112060 :                         page + (bimg.hole_offset + cbimg.hole_length);
     799     5112060 :                     rdt_datas_last->len =
     800     5112060 :                         BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
     801             :                 }
     802             :             }
     803             : 
     804     5408512 :             total_len += bimg.length;
     805             : 
     806             :             /* Track the WAL full page images in bytes */
     807     5408512 :             *fpi_bytes += bimg.length;
     808             :         }
     809             : 
     810    29003134 :         if (needs_data)
     811             :         {
     812             :             /*
     813             :              * When copying to XLogRecordBlockHeader, the length is narrowed
     814             :              * to a uint16.  Double-check that it is still correct.
     815             :              */
     816             :             Assert(regbuf->rdata_len <= UINT16_MAX);
     817             : 
     818             :             /*
     819             :              * Link the caller-supplied rdata chain for this buffer to the
     820             :              * overall list.
     821             :              */
     822    23448344 :             bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
     823    23448344 :             bkpb.data_length = (uint16) regbuf->rdata_len;
     824    23448344 :             total_len += regbuf->rdata_len;
     825             : 
     826    23448344 :             rdt_datas_last->next = regbuf->rdata_head;
     827    23448344 :             rdt_datas_last = regbuf->rdata_tail;
     828             :         }
     829             : 
     830    29003134 :         if (prev_regbuf && RelFileLocatorEquals(regbuf->rlocator, prev_regbuf->rlocator))
     831             :         {
     832     1394602 :             samerel = true;
     833     1394602 :             bkpb.fork_flags |= BKPBLOCK_SAME_REL;
     834             :         }
     835             :         else
     836    27608532 :             samerel = false;
     837    29003134 :         prev_regbuf = regbuf;
     838             : 
     839             :         /* Ok, copy the header to the scratch buffer */
     840    29003134 :         memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
     841    29003134 :         scratch += SizeOfXLogRecordBlockHeader;
     842    29003134 :         if (include_image)
     843             :         {
     844     5408512 :             memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
     845     5408512 :             scratch += SizeOfXLogRecordBlockImageHeader;
     846     5408512 :             if (cbimg.hole_length != 0 && is_compressed)
     847             :             {
     848           0 :                 memcpy(scratch, &cbimg,
     849             :                        SizeOfXLogRecordBlockCompressHeader);
     850           0 :                 scratch += SizeOfXLogRecordBlockCompressHeader;
     851             :             }
     852             :         }
     853    29003134 :         if (!samerel)
     854             :         {
     855    27608532 :             memcpy(scratch, &regbuf->rlocator, sizeof(RelFileLocator));
     856    27608532 :             scratch += sizeof(RelFileLocator);
     857             :         }
     858    29003134 :         memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
     859    29003134 :         scratch += sizeof(BlockNumber);
     860             :     }
     861             : 
     862             :     /* followed by the record's origin, if any */
     863    29253976 :     if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
     864    17492444 :         replorigin_session_origin != InvalidRepOriginId)
     865             :     {
     866      301170 :         *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
     867      301170 :         memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
     868      301170 :         scratch += sizeof(replorigin_session_origin);
     869             :     }
     870             : 
     871             :     /* followed by toplevel XID, if not already included in previous record */
     872    29253976 :     if (IsSubxactTopXidLogPending())
     873             :     {
     874         442 :         TransactionId xid = GetTopTransactionIdIfAny();
     875             : 
     876             :         /* Set the flag that the top xid is included in the WAL */
     877         442 :         *topxid_included = true;
     878             : 
     879         442 :         *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
     880         442 :         memcpy(scratch, &xid, sizeof(TransactionId));
     881         442 :         scratch += sizeof(TransactionId);
     882             :     }
     883             : 
     884             :     /* followed by main data, if any */
     885    29253976 :     if (mainrdata_len > 0)
     886             :     {
     887    28602272 :         if (mainrdata_len > 255)
     888             :         {
     889             :             uint32      mainrdata_len_4b;
     890             : 
     891       62240 :             if (mainrdata_len > PG_UINT32_MAX)
     892           0 :                 ereport(ERROR,
     893             :                         (errmsg_internal("too much WAL data"),
     894             :                          errdetail_internal("Main data length is %" PRIu64 " bytes for a maximum of %u bytes.",
     895             :                                             mainrdata_len,
     896             :                                             PG_UINT32_MAX)));
     897             : 
     898       62240 :             mainrdata_len_4b = (uint32) mainrdata_len;
     899       62240 :             *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
     900       62240 :             memcpy(scratch, &mainrdata_len_4b, sizeof(uint32));
     901       62240 :             scratch += sizeof(uint32);
     902             :         }
     903             :         else
     904             :         {
     905    28540032 :             *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
     906    28540032 :             *(scratch++) = (uint8) mainrdata_len;
     907             :         }
     908    28602272 :         rdt_datas_last->next = mainrdata_head;
     909    28602272 :         rdt_datas_last = mainrdata_last;
     910    28602272 :         total_len += mainrdata_len;
     911             :     }
     912    29253976 :     rdt_datas_last->next = NULL;
     913             : 
     914    29253976 :     hdr_rdt.len = (scratch - hdr_scratch);
     915    29253976 :     total_len += hdr_rdt.len;
     916             : 
     917             :     /*
     918             :      * Calculate CRC of the data
     919             :      *
     920             :      * Note that the record header isn't added into the CRC initially since we
     921             :      * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
     922             :      * the whole record in the order: rdata, then backup blocks, then record
     923             :      * header.
     924             :      */
     925    29253976 :     INIT_CRC32C(rdata_crc);
     926    29253976 :     COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
     927   108898474 :     for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
     928    79644498 :         COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
     929             : 
     930             :     /*
     931             :      * Ensure that the XLogRecord is not too large.
     932             :      *
     933             :      * XLogReader machinery is only able to handle records up to a certain
     934             :      * size (ignoring machine resource limitations), so make sure that we will
     935             :      * not emit records larger than the sizes advertised to be supported.
     936             :      */
     937    29253976 :     if (total_len > XLogRecordMaxSize)
     938           0 :         ereport(ERROR,
     939             :                 (errmsg_internal("oversized WAL record"),
     940             :                  errdetail_internal("WAL record would be %" PRIu64 " bytes (of maximum %u bytes); rmid %u flags %u.",
     941             :                                     total_len, XLogRecordMaxSize, rmid, info)));
     942             : 
     943             :     /*
     944             :      * Fill in the fields in the record header. Prev-link is filled in later,
     945             :      * once we know where in the WAL the record will be inserted. The CRC does
     946             :      * not include the record header yet.
     947             :      */
     948    29253976 :     rechdr->xl_xid = GetCurrentTransactionIdIfAny();
     949    29253976 :     rechdr->xl_tot_len = (uint32) total_len;
     950    29253976 :     rechdr->xl_info = info;
     951    29253976 :     rechdr->xl_rmid = rmid;
     952    29253976 :     rechdr->xl_prev = InvalidXLogRecPtr;
     953    29253976 :     rechdr->xl_crc = rdata_crc;
     954             : 
     955    29253976 :     return &hdr_rdt;
     956             : }
     957             : 
     958             : /*
     959             :  * Create a compressed version of a backup block image.
     960             :  *
     961             :  * Returns false if compression fails (i.e., compressed result is actually
     962             :  * bigger than original). Otherwise, returns true and sets 'dlen' to
     963             :  * the length of compressed block image.
     964             :  */
     965             : static bool
     966           0 : XLogCompressBackupBlock(const PageData *page, uint16 hole_offset, uint16 hole_length,
     967             :                         void *dest, uint16 *dlen)
     968             : {
     969           0 :     int32       orig_len = BLCKSZ - hole_length;
     970           0 :     int32       len = -1;
     971           0 :     int32       extra_bytes = 0;
     972             :     const void *source;
     973             :     PGAlignedBlock tmp;
     974             : 
     975           0 :     if (hole_length != 0)
     976             :     {
     977             :         /* must skip the hole */
     978           0 :         memcpy(tmp.data, page, hole_offset);
     979           0 :         memcpy(tmp.data + hole_offset,
     980           0 :                page + (hole_offset + hole_length),
     981           0 :                BLCKSZ - (hole_length + hole_offset));
     982           0 :         source = tmp.data;
     983             : 
     984             :         /*
     985             :          * Extra data needs to be stored in WAL record for the compressed
     986             :          * version of block image if the hole exists.
     987             :          */
     988           0 :         extra_bytes = SizeOfXLogRecordBlockCompressHeader;
     989             :     }
     990             :     else
     991           0 :         source = page;
     992             : 
     993           0 :     switch ((WalCompression) wal_compression)
     994             :     {
     995           0 :         case WAL_COMPRESSION_PGLZ:
     996           0 :             len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
     997           0 :             break;
     998             : 
     999           0 :         case WAL_COMPRESSION_LZ4:
    1000             : #ifdef USE_LZ4
    1001           0 :             len = LZ4_compress_default(source, dest, orig_len,
    1002             :                                        COMPRESS_BUFSIZE);
    1003           0 :             if (len <= 0)
    1004           0 :                 len = -1;       /* failure */
    1005             : #else
    1006             :             elog(ERROR, "LZ4 is not supported by this build");
    1007             : #endif
    1008           0 :             break;
    1009             : 
    1010           0 :         case WAL_COMPRESSION_ZSTD:
    1011             : #ifdef USE_ZSTD
    1012             :             len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
    1013             :                                 ZSTD_CLEVEL_DEFAULT);
    1014             :             if (ZSTD_isError(len))
    1015             :                 len = -1;       /* failure */
    1016             : #else
    1017           0 :             elog(ERROR, "zstd is not supported by this build");
    1018             : #endif
    1019             :             break;
    1020             : 
    1021           0 :         case WAL_COMPRESSION_NONE:
    1022             :             Assert(false);      /* cannot happen */
    1023           0 :             break;
    1024             :             /* no default case, so that compiler will warn */
    1025             :     }
    1026             : 
    1027             :     /*
    1028             :      * We recheck the actual size even if compression reports success and see
    1029             :      * if the number of bytes saved by compression is larger than the length
    1030             :      * of extra data needed for the compressed version of block image.
    1031             :      */
    1032           0 :     if (len >= 0 &&
    1033           0 :         len + extra_bytes < orig_len)
    1034             :     {
    1035           0 :         *dlen = (uint16) len;   /* successful compression */
    1036           0 :         return true;
    1037             :     }
    1038           0 :     return false;
    1039             : }
    1040             : 
    1041             : /*
    1042             :  * Determine whether the buffer referenced has to be backed up.
    1043             :  *
    1044             :  * Since we don't yet have the insert lock, fullPageWrites and runningBackups
    1045             :  * (which forces full-page writes) could change later, so the result should
    1046             :  * be used for optimization purposes only.
    1047             :  */
    1048             : bool
    1049      283504 : XLogCheckBufferNeedsBackup(Buffer buffer)
    1050             : {
    1051             :     XLogRecPtr  RedoRecPtr;
    1052             :     bool        doPageWrites;
    1053             :     Page        page;
    1054             : 
    1055      283504 :     GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
    1056             : 
    1057      283504 :     page = BufferGetPage(buffer);
    1058             : 
    1059      283504 :     if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
    1060        1916 :         return true;            /* buffer requires backup */
    1061             : 
    1062      281588 :     return false;               /* buffer does not need to be backed up */
    1063             : }
    1064             : 
    1065             : /*
    1066             :  * Write a backup block if needed when we are setting a hint. Note that
    1067             :  * this may be called for a variety of page types, not just heaps.
    1068             :  *
    1069             :  * Callable while holding just share lock on the buffer content.
    1070             :  *
    1071             :  * We can't use the plain backup block mechanism since that relies on the
    1072             :  * Buffer being exclusively locked. Since some modifications (setting LSN, hint
    1073             :  * bits) are allowed in a share-locked buffer, that can lead to WAL checksum
    1074             :  * failures. So instead we copy the page and insert the copied data as normal
    1075             :  * record data.
    1076             :  *
    1077             :  * We only need to do something if page has not yet been full page written in
    1078             :  * this checkpoint round. The LSN of the inserted wal record is returned if we
    1079             :  * had to write, InvalidXLogRecPtr otherwise.
    1080             :  *
    1081             :  * It is possible that multiple concurrent backends could attempt to write WAL
    1082             :  * records. In that case, multiple copies of the same block would be recorded
    1083             :  * in separate WAL records by different backends, though that is still OK from
    1084             :  * a correctness perspective.
    1085             :  */
    1086             : XLogRecPtr
    1087      118538 : XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
    1088             : {
    1089      118538 :     XLogRecPtr  recptr = InvalidXLogRecPtr;
    1090             :     XLogRecPtr  lsn;
    1091             :     XLogRecPtr  RedoRecPtr;
    1092             : 
    1093             :     /*
    1094             :      * Ensure no checkpoint can change our view of RedoRecPtr.
    1095             :      */
    1096             :     Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) != 0);
    1097             : 
    1098             :     /*
    1099             :      * Update RedoRecPtr so that we can make the right decision
    1100             :      */
    1101      118538 :     RedoRecPtr = GetRedoRecPtr();
    1102             : 
    1103             :     /*
    1104             :      * We assume page LSN is first data on *every* page that can be passed to
    1105             :      * XLogInsert, whether it has the standard page layout or not. Since we're
    1106             :      * only holding a share-lock on the page, we must take the buffer header
    1107             :      * lock when we look at the LSN.
    1108             :      */
    1109      118538 :     lsn = BufferGetLSNAtomic(buffer);
    1110             : 
    1111      118538 :     if (lsn <= RedoRecPtr)
    1112             :     {
    1113       61676 :         int         flags = 0;
    1114             :         PGAlignedBlock copied_buffer;
    1115       61676 :         char       *origdata = (char *) BufferGetBlock(buffer);
    1116             :         RelFileLocator rlocator;
    1117             :         ForkNumber  forkno;
    1118             :         BlockNumber blkno;
    1119             : 
    1120             :         /*
    1121             :          * Copy buffer so we don't have to worry about concurrent hint bit or
    1122             :          * lsn updates. We assume pd_lower/upper cannot be changed without an
    1123             :          * exclusive lock, so the backed-up contents are not racy.
    1124             :          */
    1125       61676 :         if (buffer_std)
    1126             :         {
    1127             :             /* Assume we can omit data between pd_lower and pd_upper */
    1128       38822 :             Page        page = BufferGetPage(buffer);
    1129       38822 :             uint16      lower = ((PageHeader) page)->pd_lower;
    1130       38822 :             uint16      upper = ((PageHeader) page)->pd_upper;
    1131             : 
    1132       38822 :             memcpy(copied_buffer.data, origdata, lower);
    1133       38822 :             memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
    1134             :         }
    1135             :         else
    1136       22854 :             memcpy(copied_buffer.data, origdata, BLCKSZ);
    1137             : 
    1138       61676 :         XLogBeginInsert();
    1139             : 
    1140       61676 :         if (buffer_std)
    1141       38822 :             flags |= REGBUF_STANDARD;
    1142             : 
    1143       61676 :         BufferGetTag(buffer, &rlocator, &forkno, &blkno);
    1144       61676 :         XLogRegisterBlock(0, &rlocator, forkno, blkno, copied_buffer.data, flags);
    1145             : 
    1146       61676 :         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
    1147             :     }
    1148             : 
    1149      118538 :     return recptr;
    1150             : }
    1151             : 
    1152             : /*
    1153             :  * Write a WAL record containing a full image of a page. Caller is responsible
    1154             :  * for writing the page to disk after calling this routine.
    1155             :  *
    1156             :  * Note: If you're using this function, you should be building pages in private
    1157             :  * memory and writing them directly to smgr.  If you're using buffers, call
    1158             :  * log_newpage_buffer instead.
    1159             :  *
    1160             :  * If the page follows the standard page layout, with a PageHeader and unused
    1161             :  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
    1162             :  * the unused space to be left out from the WAL record, making it smaller.
    1163             :  */
    1164             : XLogRecPtr
    1165      267354 : log_newpage(RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blkno,
    1166             :             Page page, bool page_std)
    1167             : {
    1168             :     int         flags;
    1169             :     XLogRecPtr  recptr;
    1170             : 
    1171      267354 :     flags = REGBUF_FORCE_IMAGE;
    1172      267354 :     if (page_std)
    1173      267036 :         flags |= REGBUF_STANDARD;
    1174             : 
    1175      267354 :     XLogBeginInsert();
    1176      267354 :     XLogRegisterBlock(0, rlocator, forknum, blkno, page, flags);
    1177      267354 :     recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
    1178             : 
    1179             :     /*
    1180             :      * The page may be uninitialized. If so, we can't set the LSN because that
    1181             :      * would corrupt the page.
    1182             :      */
    1183      267354 :     if (!PageIsNew(page))
    1184             :     {
    1185      267346 :         PageSetLSN(page, recptr);
    1186             :     }
    1187             : 
    1188      267354 :     return recptr;
    1189             : }
    1190             : 
    1191             : /*
    1192             :  * Like log_newpage(), but allows logging multiple pages in one operation.
    1193             :  * It is more efficient than calling log_newpage() for each page separately,
    1194             :  * because we can write multiple pages in a single WAL record.
    1195             :  */
    1196             : void
    1197       38804 : log_newpages(RelFileLocator *rlocator, ForkNumber forknum, int num_pages,
    1198             :              BlockNumber *blknos, Page *pages, bool page_std)
    1199             : {
    1200             :     int         flags;
    1201             :     XLogRecPtr  recptr;
    1202             :     int         i;
    1203             :     int         j;
    1204             : 
    1205       38804 :     flags = REGBUF_FORCE_IMAGE;
    1206       38804 :     if (page_std)
    1207       38716 :         flags |= REGBUF_STANDARD;
    1208             : 
    1209             :     /*
    1210             :      * Iterate over all the pages. They are collected into batches of
    1211             :      * XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each
    1212             :      * batch.
    1213             :      */
    1214       38804 :     XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
    1215             : 
    1216       38804 :     i = 0;
    1217       77608 :     while (i < num_pages)
    1218             :     {
    1219       38804 :         int         batch_start = i;
    1220             :         int         nbatch;
    1221             : 
    1222       38804 :         XLogBeginInsert();
    1223             : 
    1224       38804 :         nbatch = 0;
    1225      115158 :         while (nbatch < XLR_MAX_BLOCK_ID && i < num_pages)
    1226             :         {
    1227       76354 :             XLogRegisterBlock(nbatch, rlocator, forknum, blknos[i], pages[i], flags);
    1228       76354 :             i++;
    1229       76354 :             nbatch++;
    1230             :         }
    1231             : 
    1232       38804 :         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
    1233             : 
    1234      115158 :         for (j = batch_start; j < i; j++)
    1235             :         {
    1236             :             /*
    1237             :              * The page may be uninitialized. If so, we can't set the LSN
    1238             :              * because that would corrupt the page.
    1239             :              */
    1240       76354 :             if (!PageIsNew(pages[j]))
    1241             :             {
    1242       76346 :                 PageSetLSN(pages[j], recptr);
    1243             :             }
    1244             :         }
    1245             :     }
    1246       38804 : }
    1247             : 
    1248             : /*
    1249             :  * Write a WAL record containing a full image of a page.
    1250             :  *
    1251             :  * Caller should initialize the buffer and mark it dirty before calling this
    1252             :  * function.  This function will set the page LSN.
    1253             :  *
    1254             :  * If the page follows the standard page layout, with a PageHeader and unused
    1255             :  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
    1256             :  * the unused space to be left out from the WAL record, making it smaller.
    1257             :  */
    1258             : XLogRecPtr
    1259      261844 : log_newpage_buffer(Buffer buffer, bool page_std)
    1260             : {
    1261      261844 :     Page        page = BufferGetPage(buffer);
    1262             :     RelFileLocator rlocator;
    1263             :     ForkNumber  forknum;
    1264             :     BlockNumber blkno;
    1265             : 
    1266             :     /* Shared buffers should be modified in a critical section. */
    1267             :     Assert(CritSectionCount > 0);
    1268             : 
    1269      261844 :     BufferGetTag(buffer, &rlocator, &forknum, &blkno);
    1270             : 
    1271      261844 :     return log_newpage(&rlocator, forknum, blkno, page, page_std);
    1272             : }
    1273             : 
    1274             : /*
    1275             :  * WAL-log a range of blocks in a relation.
    1276             :  *
    1277             :  * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
    1278             :  * written to the WAL. If the range is large, this is done in multiple WAL
    1279             :  * records.
    1280             :  *
    1281             :  * If all pages follow the standard page layout, with a PageHeader and unused
    1282             :  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
    1283             :  * the unused space to be left out from the WAL records, making them smaller.
    1284             :  *
    1285             :  * NOTE: This function acquires exclusive-locks on the pages. Typically, this
    1286             :  * is used on a newly-built relation, and the caller is holding an
    1287             :  * AccessExclusiveLock on it, so no other backend can be accessing it at the
    1288             :  * same time. If that's not the case, you must ensure that this does not
    1289             :  * cause a deadlock through some other means.
    1290             :  */
    1291             : void
    1292       92320 : log_newpage_range(Relation rel, ForkNumber forknum,
    1293             :                   BlockNumber startblk, BlockNumber endblk,
    1294             :                   bool page_std)
    1295             : {
    1296             :     int         flags;
    1297             :     BlockNumber blkno;
    1298             : 
    1299       92320 :     flags = REGBUF_FORCE_IMAGE;
    1300       92320 :     if (page_std)
    1301         736 :         flags |= REGBUF_STANDARD;
    1302             : 
    1303             :     /*
    1304             :      * Iterate over all the pages in the range. They are collected into
    1305             :      * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
    1306             :      * for each batch.
    1307             :      */
    1308       92320 :     XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
    1309             : 
    1310       92320 :     blkno = startblk;
    1311      162402 :     while (blkno < endblk)
    1312             :     {
    1313             :         Buffer      bufpack[XLR_MAX_BLOCK_ID];
    1314             :         XLogRecPtr  recptr;
    1315             :         int         nbufs;
    1316             :         int         i;
    1317             : 
    1318       70082 :         CHECK_FOR_INTERRUPTS();
    1319             : 
    1320             :         /* Collect a batch of blocks. */
    1321       70082 :         nbufs = 0;
    1322      329128 :         while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
    1323             :         {
    1324      259046 :             Buffer      buf = ReadBufferExtended(rel, forknum, blkno,
    1325             :                                                  RBM_NORMAL, NULL);
    1326             : 
    1327      259046 :             LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
    1328             : 
    1329             :             /*
    1330             :              * Completely empty pages are not WAL-logged. Writing a WAL record
    1331             :              * would change the LSN, and we don't want that. We want the page
    1332             :              * to stay empty.
    1333             :              */
    1334      259046 :             if (!PageIsNew(BufferGetPage(buf)))
    1335      258086 :                 bufpack[nbufs++] = buf;
    1336             :             else
    1337         960 :                 UnlockReleaseBuffer(buf);
    1338      259046 :             blkno++;
    1339             :         }
    1340             : 
    1341             :         /* Nothing more to do if all remaining blocks were empty. */
    1342       70082 :         if (nbufs == 0)
    1343           0 :             break;
    1344             : 
    1345             :         /* Write WAL record for this batch. */
    1346       70082 :         XLogBeginInsert();
    1347             : 
    1348       70082 :         START_CRIT_SECTION();
    1349      328168 :         for (i = 0; i < nbufs; i++)
    1350             :         {
    1351      258086 :             MarkBufferDirty(bufpack[i]);
    1352      258086 :             XLogRegisterBuffer(i, bufpack[i], flags);
    1353             :         }
    1354             : 
    1355       70082 :         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
    1356             : 
    1357      328168 :         for (i = 0; i < nbufs; i++)
    1358             :         {
    1359      258086 :             PageSetLSN(BufferGetPage(bufpack[i]), recptr);
    1360      258086 :             UnlockReleaseBuffer(bufpack[i]);
    1361             :         }
    1362       70082 :         END_CRIT_SECTION();
    1363             :     }
    1364       92320 : }
    1365             : 
    1366             : /*
    1367             :  * Allocate working buffers needed for WAL record construction.
    1368             :  */
    1369             : void
    1370       44926 : InitXLogInsert(void)
    1371             : {
    1372             : #ifdef USE_ASSERT_CHECKING
    1373             : 
    1374             :     /*
    1375             :      * Check that any records assembled can be decoded.  This is capped based
    1376             :      * on what XLogReader would require at its maximum bound.  The XLOG_BLCKSZ
    1377             :      * addend covers the larger allocate_recordbuf() demand.  This code path
    1378             :      * is called once per backend, more than enough for this check.
    1379             :      */
    1380             :     size_t      max_required =
    1381             :         DecodeXLogRecordRequiredSpace(XLogRecordMaxSize + XLOG_BLCKSZ);
    1382             : 
    1383             :     Assert(AllocSizeIsValid(max_required));
    1384             : #endif
    1385             : 
    1386             :     /* Initialize the working areas */
    1387       44926 :     if (xloginsert_cxt == NULL)
    1388             :     {
    1389       44926 :         xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
    1390             :                                                "WAL record construction",
    1391             :                                                ALLOCSET_DEFAULT_SIZES);
    1392             :     }
    1393             : 
    1394       44926 :     if (registered_buffers == NULL)
    1395             :     {
    1396       44926 :         registered_buffers = (registered_buffer *)
    1397       44926 :             MemoryContextAllocZero(xloginsert_cxt,
    1398             :                                    sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
    1399       44926 :         max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
    1400             :     }
    1401       44926 :     if (rdatas == NULL)
    1402             :     {
    1403       44926 :         rdatas = MemoryContextAlloc(xloginsert_cxt,
    1404             :                                     sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
    1405       44926 :         max_rdatas = XLR_NORMAL_RDATAS;
    1406             :     }
    1407             : 
    1408             :     /*
    1409             :      * Allocate a buffer to hold the header information for a WAL record.
    1410             :      */
    1411       44926 :     if (hdr_scratch == NULL)
    1412       44926 :         hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
    1413             :                                              HEADER_SCRATCH_SIZE);
    1414       44926 : }

Generated by: LCOV version 1.16