LCOV - code coverage report
Current view: top level - src/backend/access/transam - xloginsert.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 338 400 84.5 %
Date: 2025-07-10 17:18:23 Functions: 18 19 94.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * xloginsert.c
       4             :  *      Functions for constructing WAL records
       5             :  *
       6             :  * Constructing a WAL record begins with a call to XLogBeginInsert,
       7             :  * followed by a number of XLogRegister* calls. The registered data is
       8             :  * collected in private working memory, and finally assembled into a chain
       9             :  * of XLogRecData structs by a call to XLogRecordAssemble(). See
      10             :  * access/transam/README for details.
      11             :  *
      12             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      13             :  * Portions Copyright (c) 1994, Regents of the University of California
      14             :  *
      15             :  * src/backend/access/transam/xloginsert.c
      16             :  *
      17             :  *-------------------------------------------------------------------------
      18             :  */
      19             : 
      20             : #include "postgres.h"
      21             : 
      22             : #ifdef USE_LZ4
      23             : #include <lz4.h>
      24             : #endif
      25             : 
      26             : #ifdef USE_ZSTD
      27             : #include <zstd.h>
      28             : #endif
      29             : 
      30             : #include "access/xact.h"
      31             : #include "access/xlog.h"
      32             : #include "access/xlog_internal.h"
      33             : #include "access/xloginsert.h"
      34             : #include "catalog/pg_control.h"
      35             : #include "common/pg_lzcompress.h"
      36             : #include "miscadmin.h"
      37             : #include "pg_trace.h"
      38             : #include "replication/origin.h"
      39             : #include "storage/bufmgr.h"
      40             : #include "storage/proc.h"
      41             : #include "utils/memutils.h"
      42             : 
      43             : /*
      44             :  * Maximum buffer size required to store a compressed version of a backup
      45             :  * block image, per compression method. A method not compiled in counts as 0.
      46             :  */
      47             : #ifdef USE_LZ4
      48             : #define LZ4_MAX_BLCKSZ      LZ4_COMPRESSBOUND(BLCKSZ)
      49             : #else
      50             : #define LZ4_MAX_BLCKSZ      0
      51             : #endif
      52             : 
      53             : #ifdef USE_ZSTD
      54             : #define ZSTD_MAX_BLCKSZ     ZSTD_COMPRESSBOUND(BLCKSZ)
      55             : #else
      56             : #define ZSTD_MAX_BLCKSZ     0
      57             : #endif
      58             : 
      59             : #define PGLZ_MAX_BLCKSZ     PGLZ_MAX_OUTPUT(BLCKSZ)
      60             : 
      61             : /* Largest of the per-method sizes above: fits a compressed image for any method */
      62             : #define COMPRESS_BUFSIZE    Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
      63             : 
      64             : /*
      65             :  * For each block reference registered with XLogRegisterBuffer or
      66             :  * XLogRegisterBlock, we fill in a registered_buffer struct.
      67             :  */
      68             : typedef struct
      69             : {
      70             :     bool        in_use;         /* is this slot in use? */
      71             :     uint8       flags;          /* REGBUF_* flags */
      72             :     RelFileLocator rlocator;    /* identifies the relation and block */
      73             :     ForkNumber  forkno;         /* fork number of the block */
      74             :     BlockNumber block;          /* block number */
      75             :     const PageData *page;       /* page content */
      76             :     uint32      rdata_len;      /* total length of data in rdata chain (kept <= UINT16_MAX) */
      77             :     XLogRecData *rdata_head;    /* head of the chain of data registered with
      78             :                                  * this block */
      79             :     XLogRecData *rdata_tail;    /* last entry in the chain, or &rdata_head if
      80             :                                  * empty */
      81             : 
      82             :     XLogRecData bkp_rdatas[2];  /* temporary rdatas used to hold references to
      83             :                                  * backup block data in XLogRecordAssemble() */
      84             : 
      85             :     /* buffer to store a compressed version of backup block image */
      86             :     char        compressed_page[COMPRESS_BUFSIZE];
      87             : } registered_buffer;
      88             : 
      89             : static registered_buffer *registered_buffers;
      90             : static int  max_registered_buffers; /* allocated size */
      91             : static int  max_registered_block_id = 0;    /* highest block_id + 1 currently
      92             :                                              * registered */
      93             : 
      94             : /*
      95             :  * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
      96             :  * with XLogRegisterData(...). An empty chain has mainrdata_last == &mainrdata_head.
      97             :  */
      98             : static XLogRecData *mainrdata_head;
      99             : static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
     100             : static uint64 mainrdata_len;    /* total # of bytes in chain */
     101             : 
     102             : /* flags for the in-progress insertion */
     103             : static uint8 curinsert_flags = 0;
     104             : 
     105             : /*
     106             :  * These are used to hold the record header while constructing a record.
     107             :  * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
     108             :  * because we want it to be MAXALIGNed and padding bytes zeroed.
     109             :  *
     110             :  * For simplicity, it's allocated large enough to hold the headers for any
     111             :  * WAL record.
     112             :  */
     113             : static XLogRecData hdr_rdt;
     114             : static char *hdr_scratch = NULL;
     115             : 
     116             : #define SizeOfXlogOrigin    (sizeof(RepOriginId) + sizeof(char))
     117             : #define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
     118             : 
     119             : #define HEADER_SCRATCH_SIZE \
     120             :     (SizeOfXLogRecord + \
     121             :      MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
     122             :      SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
     123             :      SizeOfXLogTransactionId)
     124             : 
     125             : /*
     126             :  * Array of XLogRecData structs handed out by XLogRegisterData/XLogRegisterBufData.
     127             :  */
     128             : static XLogRecData *rdatas;
     129             : static int  num_rdatas;         /* entries currently used */
     130             : static int  max_rdatas;         /* allocated size */
     131             : 
     132             : static bool begininsert_called = false; /* set by XLogBeginInsert, cleared by XLogResetInsertion */
     133             : 
     134             : /* Memory context to hold the registered buffer and data references. */
     135             : static MemoryContext xloginsert_cxt;
     136             : 
     137             : static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
     138             :                                        XLogRecPtr RedoRecPtr, bool doPageWrites,
     139             :                                        XLogRecPtr *fpw_lsn, int *num_fpi,
     140             :                                        bool *topxid_included);
     141             : static bool XLogCompressBackupBlock(const PageData *page, uint16 hole_offset,
     142             :                                     uint16 hole_length, void *dest, uint16 *dlen);
     143             : 
     144             : /*
     145             :  * Begin constructing a WAL record. This must be called before the XLogRegister*
     146             :  * functions and XLogInsert(). Raises ERROR if a record is already in progress.
     147             :  */
     148             : void
     149    30508264 : XLogBeginInsert(void)
     150             : {
     151             :     Assert(max_registered_block_id == 0);   /* state must be as XLogResetInsertion leaves it */
     152             :     Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
     153             :     Assert(mainrdata_len == 0);
     154             : 
     155             :     /* cross-check on whether we should be here or not */
     156    30508264 :     if (!XLogInsertAllowed())
     157           0 :         elog(ERROR, "cannot make new WAL entries during recovery");
     158             :     /* a previous XLogBeginInsert has not yet been followed by a reset */
     159    30508264 :     if (begininsert_called)
     160           0 :         elog(ERROR, "XLogBeginInsert was already called");
     161             : 
     162    30508264 :     begininsert_called = true;
     163    30508264 : }
     164             : 
     165             : /*
     166             :  * Ensure that there are enough buffer and data slots in the working area,
     167             :  * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
     168             :  * calls: room for block ids 0..max_block_id and for ndatas data chunks.
     169             :  *
     170             :  * There is always space for a small number of buffers and data chunks, enough
     171             :  * for most record types (requests below those minimums are rounded up). This
     172             :  * function is for the exceptional cases that need more.
     173             :  */
     174             : void
     175      140662 : XLogEnsureRecordSpace(int max_block_id, int ndatas)
     176             : {
     177             :     int         nbuffers;
     178             : 
     179             :     /*
     180             :      * This must be called before entering a critical section, because
     181             :      * allocating memory inside a critical section can fail. repalloc() will
     182             :      * check the same, but better to check it here too so that we fail
     183             :      * consistently even if the arrays happen to be large enough already.
     184             :      */
     185             :     Assert(CritSectionCount == 0);
     186             : 
     187             :     /* the minimum values can't be decreased */
     188      140662 :     if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
     189        4050 :         max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
     190      140662 :     if (ndatas < XLR_NORMAL_RDATAS)
     191      140612 :         ndatas = XLR_NORMAL_RDATAS;
     192             :     /* block ids above XLR_MAX_BLOCK_ID are rejected outright */
     193      140662 :     if (max_block_id > XLR_MAX_BLOCK_ID)
     194           0 :         elog(ERROR, "maximum number of WAL record block references exceeded");
     195      140662 :     nbuffers = max_block_id + 1;
     196             :     /* the arrays only ever grow; nothing is shrunk here */
     197      140662 :     if (nbuffers > max_registered_buffers)
     198             :     {
     199        3390 :         registered_buffers = (registered_buffer *)
     200        3390 :             repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
     201             : 
     202             :         /*
     203             :          * At least the padding bytes in the structs must be zeroed, because
     204             :          * they are included in WAL data, but initialize it all for tidiness.
     205             :          */
     206        3390 :         MemSet(&registered_buffers[max_registered_buffers], 0,
     207             :                (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
     208        3390 :         max_registered_buffers = nbuffers;
     209             :     }
     210             :     /* unlike the buffer slots, newly added rdatas entries are not zeroed here */
     211      140662 :     if (ndatas > max_rdatas)
     212             :     {
     213          30 :         rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
     214          30 :         max_rdatas = ndatas;
     215             :     }
     216      140662 : }
     217             : 
     218             : /*
     219             :  * Reset WAL record construction buffers, discarding everything registered so far.
     220             :  */
     221             : void
     222    30566744 : XLogResetInsertion(void)
     223             : {
     224             :     int         i;
     225             :     /* release every slot below the high-water mark of registered block ids */
     226    60836624 :     for (i = 0; i < max_registered_block_id; i++)
     227    30269880 :         registered_buffers[i].in_use = false;
     228             :     /* empty the rdatas array and main-data chain, clear flags, allow a new XLogBeginInsert */
     229    30566744 :     num_rdatas = 0;
     230    30566744 :     max_registered_block_id = 0;
     231    30566744 :     mainrdata_len = 0;
     232    30566744 :     mainrdata_last = (XLogRecData *) &mainrdata_head;
     233    30566744 :     curinsert_flags = 0;
     234    30566744 :     begininsert_called = false;
     235    30566744 : }
     236             : 
     237             : /*
     238             :  * Register a reference to a buffer with the WAL record being constructed, in
     239             :  * slot block_id. Must be called for every page the WAL-logged operation modifies.
     240             :  */
     241             : void
     242    29685992 : XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
     243             : {
     244             :     registered_buffer *regbuf;
     245             : 
     246             :     /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
     247             :     Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
     248             :     Assert(begininsert_called);
     249             : 
     250             :     /*
     251             :      * Ordinarily, buffer should be exclusive-locked and marked dirty before
     252             :      * we get here, otherwise we could end up violating one of the rules in
     253             :      * access/transam/README.
     254             :      *
     255             :      * Some callers intentionally register a clean page and never update that
     256             :      * page's LSN; in that case they can pass the flag REGBUF_NO_CHANGE to
     257             :      * bypass these checks.
     258             :      */
     259             : #ifdef USE_ASSERT_CHECKING
     260             :     if (!(flags & REGBUF_NO_CHANGE))
     261             :         Assert(BufferIsExclusiveLocked(buffer) && BufferIsDirty(buffer));
     262             : #endif
     263             :     /* raise the high-water mark only after checking that the slot is allocated */
     264    29685992 :     if (block_id >= max_registered_block_id)
     265             :     {
     266    28960236 :         if (block_id >= max_registered_buffers)
     267           0 :             elog(ERROR, "too many registered buffers");
     268    28960236 :         max_registered_block_id = block_id + 1;
     269             :     }
     270             : 
     271    29685992 :     regbuf = &registered_buffers[block_id];
     272             :     /* record the block identity and page pointer, and start with an empty data chain */
     273    29685992 :     BufferGetTag(buffer, &regbuf->rlocator, &regbuf->forkno, &regbuf->block);
     274    29685992 :     regbuf->page = BufferGetPage(buffer);
     275    29685992 :     regbuf->flags = flags;
     276    29685992 :     regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
     277    29685992 :     regbuf->rdata_len = 0;
     278             : 
     279             :     /*
     280             :      * Check that this page hasn't already been registered with some other
     281             :      * block_id.
     282             :      */
     283             : #ifdef USE_ASSERT_CHECKING
     284             :     {
     285             :         int         i;
     286             : 
     287             :         for (i = 0; i < max_registered_block_id; i++)
     288             :         {
     289             :             registered_buffer *regbuf_old = &registered_buffers[i];
     290             : 
     291             :             if (i == block_id || !regbuf_old->in_use)
     292             :                 continue;
     293             : 
     294             :             Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
     295             :                    regbuf_old->forkno != regbuf->forkno ||
     296             :                    regbuf_old->block != regbuf->block);
     297             :         }
     298             :     }
     299             : #endif
     300             : 
     301    29685992 :     regbuf->in_use = true;
     302    29685992 : }
     303             : 
     304             : /*
     305             :  * Like XLogRegisterBuffer, but for registering a block that's not in the
     306             :  * shared buffer pool (i.e. when you don't have a Buffer for it).
     307             :  */
     308             : void
     309      561750 : XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
     310             :                   BlockNumber blknum, const PageData *page, uint8 flags)
     311             : {
     312             :     registered_buffer *regbuf;
     313             : 
     314             :     Assert(begininsert_called);
     315             :     /* NOTE(review): unlike XLogRegisterBuffer, the high-water mark is raised before the bounds check */
     316      561750 :     if (block_id >= max_registered_block_id)
     317      561750 :         max_registered_block_id = block_id + 1;
     318             :     /* NOTE(review): if this ERROR fires, max_registered_block_id may exceed max_registered_buffers */
     319      561750 :     if (block_id >= max_registered_buffers)
     320           0 :         elog(ERROR, "too many registered buffers");
     321             : 
     322      561750 :     regbuf = &registered_buffers[block_id];
     323             :     /* *rlocator is copied into the slot; the page is only referenced, not copied */
     324      561750 :     regbuf->rlocator = *rlocator;
     325      561750 :     regbuf->forkno = forknum;
     326      561750 :     regbuf->block = blknum;
     327      561750 :     regbuf->page = page;
     328      561750 :     regbuf->flags = flags;
     329      561750 :     regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
     330      561750 :     regbuf->rdata_len = 0;
     331             : 
     332             :     /*
     333             :      * Check that this page hasn't already been registered with some other
     334             :      * block_id.
     335             :      */
     336             : #ifdef USE_ASSERT_CHECKING
     337             :     {
     338             :         int         i;
     339             : 
     340             :         for (i = 0; i < max_registered_block_id; i++)
     341             :         {
     342             :             registered_buffer *regbuf_old = &registered_buffers[i];
     343             : 
     344             :             if (i == block_id || !regbuf_old->in_use)
     345             :                 continue;
     346             : 
     347             :             Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
     348             :                    regbuf_old->forkno != regbuf->forkno ||
     349             :                    regbuf_old->block != regbuf->block);
     350             :         }
     351             :     }
     352             : #endif
     353             : 
     354      561750 :     regbuf->in_use = true;
     355      561750 : }
     356             : 
     357             : /*
     358             :  * Add data to the WAL record that's being constructed.
     359             :  *
     360             :  * The data is appended to the "main chunk", available at replay with
     361             :  * XLogRecGetData().
     362             :  */
     363             : void
     364    31484718 : XLogRegisterData(const void *data, uint32 len)
     365             : {
     366             :     XLogRecData *rdata;
     367             : 
     368             :     Assert(begininsert_called);
     369             :     /* no free rdatas entry left; XLogEnsureRecordSpace is what enlarges the array */
     370    31484718 :     if (num_rdatas >= max_rdatas)
     371           0 :         ereport(ERROR,
     372             :                 (errmsg_internal("too much WAL data"),
     373             :                  errdetail_internal("%d out of %d data segments are already in use.",
     374             :                                     num_rdatas, max_rdatas)));
     375    31484718 :     rdata = &rdatas[num_rdatas++];
     376             :     /* only the pointer and length are recorded; the bytes are not copied here */
     377    31484718 :     rdata->data = data;
     378    31484718 :     rdata->len = len;
     379             : 
     380             :     /*
     381             :      * we use the mainrdata_last pointer to track the end of the chain, so no
     382             :      * need to clear 'next' here.
     383             :      */
     384             : 
     385    31484718 :     mainrdata_last->next = rdata;
     386    31484718 :     mainrdata_last = rdata;
     387             : 
     388    31484718 :     mainrdata_len += len;
     389    31484718 : }
     390             : 
     391             : /*
     392             :  * Add buffer-specific data to the WAL record that's being constructed.
     393             :  *
     394             :  * Block_id must reference a block previously registered with
     395             :  * XLogRegisterBuffer(). If this is called more than once for the same
     396             :  * block_id, the data is appended.
     397             :  *
     398             :  * The maximum amount of data that can be registered per block is 65535
     399             :  * bytes. That should be plenty; if you need more than BLCKSZ bytes to
     400             :  * reconstruct the changes to the page, you might as well just log a full
     401             :  * copy of it. (the "main data" that's not associated with a block is not
     402             :  * limited)
     403             :  */
     404             : void
     405    41399762 : XLogRegisterBufData(uint8 block_id, const void *data, uint32 len)
     406             : {
     407             :     registered_buffer *regbuf;
     408             :     XLogRecData *rdata;
     409             : 
     410             :     Assert(begininsert_called);
     411             :     /* NOTE(review): block_id is not range-checked against max_registered_buffers before indexing */
     412             :     /* find the registered buffer struct */
     413    41399762 :     regbuf = &registered_buffers[block_id];
     414    41399762 :     if (!regbuf->in_use)
     415           0 :         elog(ERROR, "no block with id %d registered with WAL insertion",
     416             :              block_id);
     417             : 
     418             :     /*
     419             :      * Check against max_rdatas and ensure we do not register more data per
     420             :      * buffer than can be handled by the physical data format; i.e. that
     421             :      * regbuf->rdata_len does not grow beyond what
     422             :      * XLogRecordBlockHeader->data_length can hold.
     423             :      */
     424    41399762 :     if (num_rdatas >= max_rdatas)
     425           0 :         ereport(ERROR,
     426             :                 (errmsg_internal("too much WAL data"),
     427             :                  errdetail_internal("%d out of %d data segments are already in use.",
     428             :                                     num_rdatas, max_rdatas)));
     429    41399762 :     if (regbuf->rdata_len + len > UINT16_MAX || len > UINT16_MAX)   /* 2nd test covers wraparound of the uint32 sum */
     430           0 :         ereport(ERROR,
     431             :                 (errmsg_internal("too much WAL data"),
     432             :                  errdetail_internal("Registering more than maximum %u bytes allowed to block %u: current %u bytes, adding %u bytes.",
     433             :                                     UINT16_MAX, block_id, regbuf->rdata_len, len)));
     434             : 
     435    41399762 :     rdata = &rdatas[num_rdatas++];
     436             :     /* only the pointer and length are recorded; the bytes are not copied here */
     437    41399762 :     rdata->data = data;
     438    41399762 :     rdata->len = len;
     439             :     /* append to this block's own chain and keep its running length */
     440    41399762 :     regbuf->rdata_tail->next = rdata;
     441    41399762 :     regbuf->rdata_tail = rdata;
     442    41399762 :     regbuf->rdata_len += len;
     443    41399762 : }
     444             : 
     445             : /*
     446             :  * Set insert status flags for the upcoming WAL record. Flags are OR'ed in,
     447             :  * so they accumulate across calls until the insertion state is reset.
     448             :  * The flags that can be used here are:
     449             :  * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
     450             :  *   included in the record.
     451             :  * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
     452             :  *   durability, which avoids triggering WAL archiving and other
     453             :  *   background activity.
     454             :  */
     455             : void
     456    18801224 : XLogSetRecordFlags(uint8 flags)
     457             : {
     458             :     Assert(begininsert_called);
     459    18801224 :     curinsert_flags |= flags;
     460    18801224 : }
     461             : 
     462             : /*
     463             :  * Insert an XLOG record having the specified RMID and info bytes, with the
     464             :  * body of the record being the data and buffer references registered earlier
     465             :  * with XLogRegister* calls.
     466             :  *
     467             :  * Returns XLOG pointer to end of record (beginning of next record).
     468             :  * This can be used as LSN for data pages affected by the logged action.
     469             :  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
     470             :  * before the data page can be written out.  This implements the basic
     471             :  * WAL rule "write the log before the data".)
     472             :  */
     473             : XLogRecPtr
     474    30508264 : XLogInsert(RmgrId rmid, uint8 info)
     475             : {
     476             :     XLogRecPtr  EndPos;
     477             : 
     478             :     /* XLogBeginInsert() must have been called. */
     479    30508264 :     if (!begininsert_called)
     480           0 :         elog(ERROR, "XLogBeginInsert was not called");
     481             : 
     482             :     /*
     483             :      * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
     484             :      * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
     485             :      */
     486    30508264 :     if ((info & ~(XLR_RMGR_INFO_MASK |
     487             :                   XLR_SPECIAL_REL_UPDATE |
     488             :                   XLR_CHECK_CONSISTENCY)) != 0)
     489           0 :         elog(PANIC, "invalid xlog info mask %02X", info);
     490             : 
     491             :     TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
     492             : 
     493             :     /*
     494             :      * In bootstrap mode, we don't actually log anything but XLOG resources;
     495             :      * return a phony record pointer.
     496             :      */
     497    30508264 :     if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
     498             :     {
     499     1277652 :         XLogResetInsertion();
     500     1277652 :         EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
     501     1277652 :         return EndPos;
     502             :     }
     503             :     /* assemble and try to insert; repeat while XLogInsertRecord returns InvalidXLogRecPtr */
     504             :     do
     505             :     {
     506             :         XLogRecPtr  RedoRecPtr;
     507             :         bool        doPageWrites;
     508    29245424 :         bool        topxid_included = false;
     509             :         XLogRecPtr  fpw_lsn;
     510             :         XLogRecData *rdt;
     511    29245424 :         int         num_fpi = 0;
     512             : 
     513             :         /*
     514             :          * Get values needed to decide whether to do full-page writes. Since
     515             :          * we don't yet have an insertion lock, these could change under us,
     516             :          * but XLogInsertRecord will recheck them once it has a lock.
     517             :          */
     518    29245424 :         GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
     519             : 
     520    29245424 :         rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
     521             :                                  &fpw_lsn, &num_fpi, &topxid_included);
     522             : 
     523    29245424 :         EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
     524             :                                   topxid_included);
     525    29245424 :     } while (EndPos == InvalidXLogRecPtr);
     526             :     /* the record is in; drop the registered buffers and data so a new record can begin */
     527    29230612 :     XLogResetInsertion();
     528             : 
     529    29230612 :     return EndPos;
     530             : }
     531             : 
     532             : /*
     533             :  * Simple wrapper to XLogInsert to insert a WAL record with elementary
     534             :  * contents (only an int64 is supported as value currently). Returns XLogInsert's result.
     535             :  */
     536             : XLogRecPtr
     537      863452 : XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value)
     538             : {
     539      863452 :     XLogBeginInsert();
     540      863452 :     XLogRegisterData(&value, sizeof(value));    /* registered by address; inserted before we return */
     541      863452 :     return XLogInsert(rmid, info);
     542             : }
     543             : 
     544             : /*
     545             :  * Assemble a WAL record from the registered data and buffers into an
     546             :  * XLogRecData chain, ready for insertion with XLogInsertRecord().
     547             :  *
     548             :  * The record header fields are filled in, except for the xl_prev field. The
     549             :  * calculated CRC does not include the record header yet.
     550             :  *
     551             :  * If there are any registered buffers, and a full-page image was not taken
     552             :  * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
     553             :  * signals that the assembled record is only good for insertion on the
     554             :  * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
     555             :  *
     556             :  * *topxid_included is set if the topmost transaction ID is logged with the
     557             :  * current subtransaction.
     558             :  */
     559             : static XLogRecData *
     560    29245424 : XLogRecordAssemble(RmgrId rmid, uint8 info,
     561             :                    XLogRecPtr RedoRecPtr, bool doPageWrites,
     562             :                    XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included)
     563             : {
     564             :     XLogRecData *rdt;
     565    29245424 :     uint64      total_len = 0;  /* record length accumulated so far */
     566             :     int         block_id;
     567             :     pg_crc32c   rdata_crc;
     568    29245424 :     registered_buffer *prev_regbuf = NULL;  /* previous in-use block */
     569             :     XLogRecData *rdt_datas_last;
     570             :     XLogRecord *rechdr;
     571    29245424 :     char       *scratch = hdr_scratch;  /* write position in hdr_scratch */
     572             : 
     573             :     /*
     574             :      * Note: this function can be called multiple times for the same record.
     575             :      * All the modifications we do to the rdata chains below must handle that.
     576             :      */
     577             : 
     578             :     /* The record begins with the fixed-size header */
     579    29245424 :     rechdr = (XLogRecord *) scratch;
     580    29245424 :     scratch += SizeOfXLogRecord;
     581             : 
     582    29245424 :     hdr_rdt.next = NULL;
     583    29245424 :     rdt_datas_last = &hdr_rdt;
     584    29245424 :     hdr_rdt.data = hdr_scratch;
     585             : 
     586             :     /*
     587             :      * Enforce consistency checks for this record if user is looking for it.
     588             :      * Do this at the beginning of this routine to give the possibility
     589             :      * for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for
     590             :      * a record.
     591             :      */
     592    29245424 :     if (wal_consistency_checking[rmid])
     593     4326412 :         info |= XLR_CHECK_CONSISTENCY;
     594             : 
     595             :     /*
     596             :      * Make an rdata chain containing all the data portions of all block
     597             :      * references. This includes the data for full-page images. Also append
     598             :      * the headers for the block references in the scratch buffer.
     599             :      */
     600    29245424 :     *fpw_lsn = InvalidXLogRecPtr;
     601    58298670 :     for (block_id = 0; block_id < max_registered_block_id; block_id++)
     602             :     {
     603    29053246 :         registered_buffer *regbuf = &registered_buffers[block_id];
     604             :         bool        needs_backup;
     605             :         bool        needs_data;
     606             :         XLogRecordBlockHeader bkpb;
     607             :         XLogRecordBlockImageHeader bimg;
     608    29053246 :         XLogRecordBlockCompressHeader cbimg = {0};
     609             :         bool        samerel;
     610    29053246 :         bool        is_compressed = false;
     611             :         bool        include_image;
     612             : 
     613    29053246 :         if (!regbuf->in_use)
     614       22138 :             continue;           /* unused slot, nothing to emit */
     615             : 
     616             :         /* Determine if this block needs to be backed up */
     617    29031108 :         if (regbuf->flags & REGBUF_FORCE_IMAGE)
     618      607204 :             needs_backup = true;
     619    28423904 :         else if (regbuf->flags & REGBUF_NO_IMAGE)
     620      427542 :             needs_backup = false;
     621    27996362 :         else if (!doPageWrites)
     622      537714 :             needs_backup = false;
     623             :         else
     624             :         {
     625             :             /*
     626             :              * We assume page LSN is first data on *every* page that can be
     627             :              * passed to XLogInsert, whether it has the standard page layout
     628             :              * or not.
     629             :              */
     630    27458648 :             XLogRecPtr  page_lsn = PageGetLSN(regbuf->page);
     631             : 
     632    27458648 :             needs_backup = (page_lsn <= RedoRecPtr);
     633    27458648 :             if (!needs_backup)
     634             :             {
     635    27267086 :                 if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
     636    26425056 :                     *fpw_lsn = page_lsn;    /* track the lowest such LSN */
     637             :             }
     638             :         }
     639             : 
     640             :         /* Determine if the buffer data needs to be included */
     641    29031108 :         if (regbuf->rdata_len == 0)
     642     5488848 :             needs_data = false;
     643    23542260 :         else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
     644      594628 :             needs_data = true;
     645             :         else
     646    22947632 :             needs_data = !needs_backup;
     647             : 
     648    29031108 :         bkpb.id = block_id;
     649    29031108 :         bkpb.fork_flags = regbuf->forkno;
     650    29031108 :         bkpb.data_length = 0;
     651             : 
     652    29031108 :         if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
     653      420518 :             bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
     654             : 
     655             :         /*
     656             :          * If needs_backup is true or WAL consistency checking is enabled for
     657             :          * this record, include a full-page image of the current block.
     658             :          */
     659    29031108 :         include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
     660             : 
     661    29031108 :         if (include_image)
     662             :         {
     663     5412002 :             const PageData *page = regbuf->page;
     664     5412002 :             uint16      compressed_len = 0;
     665             : 
     666             :             /*
     667             :              * The page needs to be backed up, so calculate its hole length
     668             :              * and offset.
     669             :              */
     670     5412002 :             if (regbuf->flags & REGBUF_STANDARD)
     671             :             {
     672             :                 /* Assume we can omit data between pd_lower and pd_upper */
     673     5102848 :                 uint16      lower = ((PageHeader) page)->pd_lower;
     674     5102848 :                 uint16      upper = ((PageHeader) page)->pd_upper;
     675             : 
     676     5102848 :                 if (lower >= SizeOfPageHeaderData &&
     677     5099442 :                     upper > lower &&
     678             :                     upper <= BLCKSZ)
     679             :                 {
     680     5099442 :                     bimg.hole_offset = lower;
     681     5099442 :                     cbimg.hole_length = upper - lower;
     682             :                 }
     683             :                 else
     684             :                 {
     685             :                     /* No "hole" to remove */
     686        3406 :                     bimg.hole_offset = 0;
     687        3406 :                     cbimg.hole_length = 0;
     688             :                 }
     689             :             }
     690             :             else
     691             :             {
     692             :                 /* Not a standard page header, don't try to eliminate "hole" */
     693      309154 :                 bimg.hole_offset = 0;
     694      309154 :                 cbimg.hole_length = 0;
     695             :             }
     696             : 
     697             :             /*
     698             :              * Try to compress the block image if wal_compression is enabled
     699             :              */
     700     5412002 :             if (wal_compression != WAL_COMPRESSION_NONE)
     701             :             {
     702             :                 is_compressed =
     703           0 :                     XLogCompressBackupBlock(page, bimg.hole_offset,
     704           0 :                                             cbimg.hole_length,
     705           0 :                                             regbuf->compressed_page,
     706             :                                             &compressed_len);
     707             :             }
     708             : 
     709             :             /*
     710             :              * Fill in the remaining fields in the XLogRecordBlockHeader
     711             :              * struct
     712             :              */
     713     5412002 :             bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
     714             : 
     715             :             /* Report a full page image constructed for the WAL record */
     716     5412002 :             *num_fpi += 1;
     717             : 
     718             :             /*
     719             :              * Construct XLogRecData entries for the page content.
     720             :              */
     721     5412002 :             rdt_datas_last->next = &regbuf->bkp_rdatas[0];
     722     5412002 :             rdt_datas_last = rdt_datas_last->next;
     723             : 
     724     5412002 :             bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
     725             : 
     726             :             /*
     727             :              * If WAL consistency checking is enabled for the resource manager
     728             :              * of this WAL record, a full-page image is included in the record
     729             :              * for the block modified. During redo, the full-page image is
     730             :              * replayed only if BKPIMAGE_APPLY is set.
     731             :              */
     732     5412002 :             if (needs_backup)
     733      798766 :                 bimg.bimg_info |= BKPIMAGE_APPLY;
     734             : 
     735     5412002 :             if (is_compressed)
     736             :             {
     737             :                 /* The image length recorded is the compressed length */
     738           0 :                 bimg.length = compressed_len;
     739             : 
     740             :                 /* Set the compression method used for this block */
     741           0 :                 switch ((WalCompression) wal_compression)
     742             :                 {
     743           0 :                     case WAL_COMPRESSION_PGLZ:
     744           0 :                         bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
     745           0 :                         break;
     746             : 
     747           0 :                     case WAL_COMPRESSION_LZ4:
     748             : #ifdef USE_LZ4
     749           0 :                         bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
     750             : #else
     751             :                         elog(ERROR, "LZ4 is not supported by this build");
     752             : #endif
     753           0 :                         break;
     754             : 
     755           0 :                     case WAL_COMPRESSION_ZSTD:
     756             : #ifdef USE_ZSTD
     757             :                         bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
     758             : #else
     759           0 :                         elog(ERROR, "zstd is not supported by this build");
     760             : #endif
     761             :                         break;
     762             : 
     763           0 :                     case WAL_COMPRESSION_NONE:
     764             :                         Assert(false);  /* cannot happen */
     765           0 :                         break;
     766             :                         /* no default case, so that compiler will warn */
     767             :                 }
     768             : 
     769           0 :                 rdt_datas_last->data = regbuf->compressed_page;
     770           0 :                 rdt_datas_last->len = compressed_len;
     771             :             }
     772             :             else
     773             :             {
     774     5412002 :                 bimg.length = BLCKSZ - cbimg.hole_length;
     775             : 
     776     5412002 :                 if (cbimg.hole_length == 0)
     777             :                 {
     778      312560 :                     rdt_datas_last->data = page;
     779      312560 :                     rdt_datas_last->len = BLCKSZ;
     780             :                 }
     781             :                 else
     782             :                 {
     783             :                     /* must skip the hole: one entry before it, one after */
     784     5099442 :                     rdt_datas_last->data = page;
     785     5099442 :                     rdt_datas_last->len = bimg.hole_offset;
     786             : 
     787     5099442 :                     rdt_datas_last->next = &regbuf->bkp_rdatas[1];
     788     5099442 :                     rdt_datas_last = rdt_datas_last->next;
     789             : 
     790     5099442 :                     rdt_datas_last->data =
     791     5099442 :                         page + (bimg.hole_offset + cbimg.hole_length);
     792     5099442 :                     rdt_datas_last->len =
     793     5099442 :                         BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
     794             :                 }
     795             :             }
     796             : 
     797     5412002 :             total_len += bimg.length;
     798             :         }
     799             : 
     800    29031108 :         if (needs_data)
     801             :         {
     802             :             /*
     803             :              * When copying to XLogRecordBlockHeader, the length is narrowed
     804             :              * to a uint16.  Double-check that it is still correct.
     805             :              */
     806             :             Assert(regbuf->rdata_len <= UINT16_MAX);
     807             : 
     808             :             /*
     809             :              * Link the caller-supplied rdata chain for this buffer to the
     810             :              * overall list.
     811             :              */
     812    23460672 :             bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
     813    23460672 :             bkpb.data_length = (uint16) regbuf->rdata_len;
     814    23460672 :             total_len += regbuf->rdata_len;
     815             : 
     816    23460672 :             rdt_datas_last->next = regbuf->rdata_head;
     817    23460672 :             rdt_datas_last = regbuf->rdata_tail;
     818             :         }
     819             : 
     820    29031108 :         if (prev_regbuf && RelFileLocatorEquals(regbuf->rlocator, prev_regbuf->rlocator))
     821             :         {
     822     1426602 :             samerel = true;
     823     1426602 :             bkpb.fork_flags |= BKPBLOCK_SAME_REL;
     824             :         }
     825             :         else
     826    27604506 :             samerel = false;
     827    29031108 :         prev_regbuf = regbuf;
     828             : 
     829             :         /* Ok, copy the header to the scratch buffer */
     830    29031108 :         memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
     831    29031108 :         scratch += SizeOfXLogRecordBlockHeader;
     832    29031108 :         if (include_image)
     833             :         {
     834     5412002 :             memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
     835     5412002 :             scratch += SizeOfXLogRecordBlockImageHeader;
     836     5412002 :             if (cbimg.hole_length != 0 && is_compressed)
     837             :             {
     838           0 :                 memcpy(scratch, &cbimg,
     839             :                        SizeOfXLogRecordBlockCompressHeader);
     840           0 :                 scratch += SizeOfXLogRecordBlockCompressHeader;
     841             :             }
     842             :         }
     843    29031108 :         if (!samerel)           /* locator omitted if same as previous block */
     844             :         {
     845    27604506 :             memcpy(scratch, &regbuf->rlocator, sizeof(RelFileLocator));
     846    27604506 :             scratch += sizeof(RelFileLocator);
     847             :         }
     848    29031108 :         memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
     849    29031108 :         scratch += sizeof(BlockNumber);
     850             :     }
     851             : 
     852             :     /* followed by the record's origin, if any */
     853    29245424 :     if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
     854    17406284 :         replorigin_session_origin != InvalidRepOriginId)
     855             :     {
     856      300856 :         *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
     857      300856 :         memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
     858      300856 :         scratch += sizeof(replorigin_session_origin);
     859             :     }
     860             : 
     861             :     /* followed by toplevel XID, if not already included in previous record */
     862    29245424 :     if (IsSubxactTopXidLogPending())
     863             :     {
     864         442 :         TransactionId xid = GetTopTransactionIdIfAny();
     865             : 
     866             :         /* Set the flag that the top xid is included in the WAL */
     867         442 :         *topxid_included = true;
     868             : 
     869         442 :         *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
     870         442 :         memcpy(scratch, &xid, sizeof(TransactionId));
     871         442 :         scratch += sizeof(TransactionId);
     872             :     }
     873             : 
     874             :     /* followed by main data, if any */
     875    29245424 :     if (mainrdata_len > 0)
     876             :     {
     877    28601188 :         if (mainrdata_len > 255)    /* does not fit the one-byte length form */
     878             :         {
     879             :             uint32      mainrdata_len_4b;
     880             : 
     881       61622 :             if (mainrdata_len > PG_UINT32_MAX)
     882           0 :                 ereport(ERROR,
     883             :                         (errmsg_internal("too much WAL data"),
     884             :                          errdetail_internal("Main data length is %" PRIu64 " bytes for a maximum of %u bytes.",
     885             :                                             mainrdata_len,
     886             :                                             PG_UINT32_MAX)));
     887             : 
     888       61622 :             mainrdata_len_4b = (uint32) mainrdata_len;
     889       61622 :             *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
     890       61622 :             memcpy(scratch, &mainrdata_len_4b, sizeof(uint32));
     891       61622 :             scratch += sizeof(uint32);
     892             :         }
     893             :         else
     894             :         {
     895    28539566 :             *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
     896    28539566 :             *(scratch++) = (uint8) mainrdata_len;
     897             :         }
     898    28601188 :         rdt_datas_last->next = mainrdata_head;
     899    28601188 :         rdt_datas_last = mainrdata_last;
     900    28601188 :         total_len += mainrdata_len;
     901             :     }
     902    29245424 :     rdt_datas_last->next = NULL;    /* terminate the assembled chain */
     903             : 
     904    29245424 :     hdr_rdt.len = (scratch - hdr_scratch);
     905    29245424 :     total_len += hdr_rdt.len;
     906             : 
     907             :     /*
     908             :      * Calculate CRC of the data
     909             :      *
     910             :      * Note that the record header isn't added into the CRC initially since we
     911             :      * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
     912             :      * the whole record in the order: rdata, then backup blocks, then record
     913             :      * header.
     914             :      */
     915    29245424 :     INIT_CRC32C(rdata_crc);
     916    29245424 :     COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
     917   108870348 :     for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
     918    79624924 :         COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
     919             : 
     920             :     /*
     921             :      * Ensure that the XLogRecord is not too large.
     922             :      *
     923             :      * XLogReader machinery is only able to handle records up to a certain
     924             :      * size (ignoring machine resource limitations), so make sure that we will
     925             :      * not emit records larger than the sizes advertised to be supported.
     926             :      */
     927    29245424 :     if (total_len > XLogRecordMaxSize)
     928           0 :         ereport(ERROR,
     929             :                 (errmsg_internal("oversized WAL record"),
     930             :                  errdetail_internal("WAL record would be %" PRIu64 " bytes (of maximum %u bytes); rmid %u flags %u.",
     931             :                                     total_len, XLogRecordMaxSize, rmid, info)));
     932             : 
     933             :     /*
     934             :      * Fill in the fields in the record header. Prev-link is filled in later,
     935             :      * once we know where in the WAL the record will be inserted. The CRC does
     936             :      * not include the record header yet.
     937             :      */
     938    29245424 :     rechdr->xl_xid = GetCurrentTransactionIdIfAny();
     939    29245424 :     rechdr->xl_tot_len = (uint32) total_len;
     940    29245424 :     rechdr->xl_info = info;
     941    29245424 :     rechdr->xl_rmid = rmid;
     942    29245424 :     rechdr->xl_prev = InvalidXLogRecPtr;
     943    29245424 :     rechdr->xl_crc = rdata_crc;
     944             : 
     945    29245424 :     return &hdr_rdt;
     946             : }
     947             : 
     948             : /*
     949             :  * Create a compressed version of a backup block image in 'dest'.
     950             :  *
     951             :  * Returns false if compression fails or does not save more bytes than the
     952             :  * extra header needed when the page has a hole. Otherwise, returns true and
     953             :  * sets '*dlen' to the length of the compressed block image.
     954             :  */
     955             : static bool
     956           0 : XLogCompressBackupBlock(const PageData *page, uint16 hole_offset, uint16 hole_length,
     957             :                         void *dest, uint16 *dlen)
     958             : {
     959           0 :     int32       orig_len = BLCKSZ - hole_length;    /* input size sans hole */
     960           0 :     int32       len = -1;       /* negative means no usable result */
     961           0 :     int32       extra_bytes = 0;
     962             :     const void *source;
     963             :     PGAlignedBlock tmp;         /* hole-free copy of the page, if needed */
     964             : 
     965           0 :     if (hole_length != 0)
     966             :     {
     967             :         /* must skip the hole: pack both halves contiguously into tmp */
     968           0 :         memcpy(tmp.data, page, hole_offset);
     969           0 :         memcpy(tmp.data + hole_offset,
     970           0 :                page + (hole_offset + hole_length),
     971           0 :                BLCKSZ - (hole_length + hole_offset));
     972           0 :         source = tmp.data;
     973             : 
     974             :         /*
     975             :          * Extra data needs to be stored in WAL record for the compressed
     976             :          * version of block image if the hole exists.
     977             :          */
     978           0 :         extra_bytes = SizeOfXLogRecordBlockCompressHeader;
     979             :     }
     980             :     else
     981           0 :         source = page;
     982             : 
     983           0 :     switch ((WalCompression) wal_compression)
     984             :     {
     985           0 :         case WAL_COMPRESSION_PGLZ:
     986           0 :             len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
     987           0 :             break;
     988             : 
     989           0 :         case WAL_COMPRESSION_LZ4:
     990             : #ifdef USE_LZ4
     991           0 :             len = LZ4_compress_default(source, dest, orig_len,
     992             :                                        COMPRESS_BUFSIZE);
     993           0 :             if (len <= 0)
     994           0 :                 len = -1;       /* failure */
     995             : #else
     996             :             elog(ERROR, "LZ4 is not supported by this build");
     997             : #endif
     998           0 :             break;
     999             : 
    1000           0 :         case WAL_COMPRESSION_ZSTD:
    1001             : #ifdef USE_ZSTD
    1002             :             len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
    1003             :                                 ZSTD_CLEVEL_DEFAULT);
    1004             :             if (ZSTD_isError(len))
    1005             :                 len = -1;       /* failure */
    1006             : #else
    1007           0 :             elog(ERROR, "zstd is not supported by this build");
    1008             : #endif
    1009             :             break;
    1010             : 
    1011           0 :         case WAL_COMPRESSION_NONE:
    1012             :             Assert(false);      /* cannot happen */
    1013           0 :             break;
    1014             :             /* no default case, so that compiler will warn */
    1015             :     }
    1016             : 
    1017             :     /*
    1018             :      * We recheck the actual size even if compression reports success and see
    1019             :      * if the number of bytes saved by compression is larger than the length
    1020             :      * of extra data needed for the compressed version of block image.
    1021             :      */
    1022           0 :     if (len >= 0 &&
    1023           0 :         len + extra_bytes < orig_len)
    1024             :     {
    1025           0 :         *dlen = (uint16) len;   /* successful compression */
    1026           0 :         return true;
    1027             :     }
    1028           0 :     return false;               /* caller falls back to the raw image */
    1029             : }
    1030             : 
    1031             : /*
    1032             :  * Determine whether the buffer referenced has to be backed up: true when
    1033             :  * page writes are being done and the page's LSN is <= the redo pointer.
    1034             :  * Since we don't yet have the insert lock, fullPageWrites and runningBackups
    1035             :  * (which forces full-page writes) could change later, so the result should
    1036             :  * be used for optimization purposes only.
    1037             :  */
    1038             : bool
    1039      289520 : XLogCheckBufferNeedsBackup(Buffer buffer)
    1040             : {
    1041             :     XLogRecPtr  RedoRecPtr;
    1042             :     bool        doPageWrites;
    1043             :     Page        page;
    1044             : 
    1045      289520 :     GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
    1046             : 
    1047      289520 :     page = BufferGetPage(buffer);
    1048             : 
    1049      289520 :     if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
    1050        2484 :         return true;            /* buffer requires backup */
    1051             : 
    1052      287036 :     return false;               /* buffer does not need to be backed up */
    1053             : }
    1054             : 
    1055             : /*
    1056             :  * Write a backup block if needed when we are setting a hint. Note that
    1057             :  * this may be called for a variety of page types, not just heaps.
    1058             :  *
    1059             :  * Callable while holding just share lock on the buffer content.
    1060             :  *
    1061             :  * We can't use the plain backup block mechanism since that relies on the
    1062             :  * Buffer being exclusively locked. Since some modifications (setting LSN, hint
    1063             :  * bits) are allowed in a sharelocked buffer that can lead to WAL checksum
    1064             :  * failures. So instead we copy the page and insert the copied data as normal
    1065             :  * record data.
    1066             :  *
    1067             :  * We only need to do something if page has not yet been full page written in
    1068             :  * this checkpoint round. The LSN of the inserted WAL record is returned if we
    1069             :  * had to write, InvalidXLogRecPtr otherwise.
    1070             :  *
    1071             :  * It is possible that multiple concurrent backends could attempt to write WAL
    1072             :  * records. In that case, multiple copies of the same block would be recorded
    1073             :  * in separate WAL records by different backends, though that is still OK from
    1074             :  * a correctness perspective.
    1075             :  */
    1076             : XLogRecPtr
    1077      119716 : XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
    1078             : {
    1079      119716 :     XLogRecPtr  recptr = InvalidXLogRecPtr;
    1080             :     XLogRecPtr  lsn;
    1081             :     XLogRecPtr  RedoRecPtr;
    1082             : 
    1083             :     /*
    1084             :      * Ensure no checkpoint can change our view of RedoRecPtr.
    1085             :      */
    1086             :     Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) != 0);
    1087             : 
    1088             :     /*
    1089             :      * Update RedoRecPtr so that we can make the right decision
    1090             :      */
    1091      119716 :     RedoRecPtr = GetRedoRecPtr();
    1092             : 
    1093             :     /*
    1094             :      * We assume page LSN is first data on *every* page that can be passed to
    1095             :      * XLogInsert, whether it has the standard page layout or not. Since we're
    1096             :      * only holding a share-lock on the page, we must take the buffer header
    1097             :      * lock when we look at the LSN.
    1098             :      */
    1099      119716 :     lsn = BufferGetLSNAtomic(buffer);
    1100             : 
    1101      119716 :     if (lsn <= RedoRecPtr)
    1102             :     {
    1103       64586 :         int         flags = 0;  /* REGBUF_* flags for the registered copy */
    1104             :         PGAlignedBlock copied_buffer;
    1105       64586 :         char       *origdata = (char *) BufferGetBlock(buffer);
    1106             :         RelFileLocator rlocator;
    1107             :         ForkNumber  forkno;
    1108             :         BlockNumber blkno;
    1109             : 
    1110             :         /*
    1111             :          * Copy buffer so we don't have to worry about concurrent hint bit or
    1112             :          * lsn updates. We assume pd_lower/upper cannot be changed without an
    1113             :          * exclusive lock, so the bounds of the copied contents are not racy.
    1114             :          */
    1115       64586 :         if (buffer_std)
    1116             :         {
    1117             :             /* Skip the pd_lower..pd_upper hole; those bytes stay uncopied */
    1118       40952 :             Page        page = BufferGetPage(buffer);
    1119       40952 :             uint16      lower = ((PageHeader) page)->pd_lower;
    1120       40952 :             uint16      upper = ((PageHeader) page)->pd_upper;
    1121             : 
    1122       40952 :             memcpy(copied_buffer.data, origdata, lower);
    1123       40952 :             memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
    1124             :         }
    1125             :         else
    1126       23634 :             memcpy(copied_buffer.data, origdata, BLCKSZ);
    1127             : 
    1128       64586 :         XLogBeginInsert();
    1129             : 
    1130       64586 :         if (buffer_std)
    1131       40952 :             flags |= REGBUF_STANDARD;
    1132             : 
    1133       64586 :         BufferGetTag(buffer, &rlocator, &forkno, &blkno);
    1134       64586 :         XLogRegisterBlock(0, &rlocator, forkno, blkno, copied_buffer.data, flags);
    1135             : 
    1136       64586 :         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
    1137             :     }
    1138             : 
    1139      119716 :     return recptr;
    1140             : }
    1141             : 
    1142             : /*
    1143             :  * Write a WAL record containing a full image of a page. Caller is responsible
    1144             :  * for writing the page to disk after calling this routine.
    1145             :  *
    1146             :  * Note: If you're using this function, you should be building pages in private
    1147             :  * memory and writing them directly to smgr.  If you're using buffers, call
    1148             :  * log_newpage_buffer instead.
    1149             :  *
    1150             :  * If the page follows the standard page layout, with a PageHeader and unused
    1151             :  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
    1152             :  * the unused space to be left out from the WAL record, making it smaller.
    1153             :  */
    1154             : XLogRecPtr
    1155      253850 : log_newpage(RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blkno,
    1156             :             Page page, bool page_std)
    1157             : {
    1158             :     int         flags;
    1159             :     XLogRecPtr  recptr;
    1160             : 
    1161      253850 :     flags = REGBUF_FORCE_IMAGE;
    1162      253850 :     if (page_std)
    1163      253492 :         flags |= REGBUF_STANDARD;
    1164             : 
    1165      253850 :     XLogBeginInsert();
    1166      253850 :     XLogRegisterBlock(0, rlocator, forknum, blkno, page, flags);
    1167      253850 :     recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
    1168             : 
    1169             :     /*
    1170             :      * The page may be uninitialized. If so, we can't set the LSN because that
    1171             :      * would corrupt the page.
    1172             :      */
    1173      253850 :     if (!PageIsNew(page))
    1174             :     {
    1175      253842 :         PageSetLSN(page, recptr);
    1176             :     }
    1177             : 
    1178      253850 :     return recptr;
    1179             : }
    1180             : 
    1181             : /*
    1182             :  * Like log_newpage(), but allows logging multiple pages in one operation.
    1183             :  * It is more efficient than calling log_newpage() for each page separately,
    1184             :  * because we can write multiple pages in a single WAL record.
    1185             :  */
    1186             : void
    1187       38544 : log_newpages(RelFileLocator *rlocator, ForkNumber forknum, int num_pages,
    1188             :              BlockNumber *blknos, Page *pages, bool page_std)
    1189             : {
    1190             :     int         flags;
    1191             :     XLogRecPtr  recptr;
    1192             :     int         i;
    1193             :     int         j;
    1194             : 
    1195       38544 :     flags = REGBUF_FORCE_IMAGE;
    1196       38544 :     if (page_std)
    1197       38456 :         flags |= REGBUF_STANDARD;
    1198             : 
    1199             :     /*
    1200             :      * Iterate over all the pages. They are collected into batches of
    1201             :      * XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each
    1202             :      * batch.
    1203             :      */
    1204       38544 :     XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
    1205             : 
    1206       38544 :     i = 0;
    1207       77088 :     while (i < num_pages)
    1208             :     {
    1209       38544 :         int         batch_start = i;
    1210             :         int         nbatch;
    1211             : 
    1212       38544 :         XLogBeginInsert();
    1213             : 
    1214       38544 :         nbatch = 0;
    1215      114848 :         while (nbatch < XLR_MAX_BLOCK_ID && i < num_pages)
    1216             :         {
    1217       76304 :             XLogRegisterBlock(nbatch, rlocator, forknum, blknos[i], pages[i], flags);
    1218       76304 :             i++;
    1219       76304 :             nbatch++;
    1220             :         }
    1221             : 
    1222       38544 :         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
    1223             : 
    1224      114848 :         for (j = batch_start; j < i; j++)
    1225             :         {
    1226             :             /*
    1227             :              * The page may be uninitialized. If so, we can't set the LSN
    1228             :              * because that would corrupt the page.
    1229             :              */
    1230       76304 :             if (!PageIsNew(pages[j]))
    1231             :             {
    1232       76296 :                 PageSetLSN(pages[j], recptr);
    1233             :             }
    1234             :         }
    1235             :     }
    1236       38544 : }
    1237             : 
    1238             : /*
    1239             :  * Write a WAL record containing a full image of a page.
    1240             :  *
    1241             :  * Caller should initialize the buffer and mark it dirty before calling this
    1242             :  * function.  This function will set the page LSN.
    1243             :  *
    1244             :  * If the page follows the standard page layout, with a PageHeader and unused
    1245             :  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
    1246             :  * the unused space to be left out from the WAL record, making it smaller.
    1247             :  */
    1248             : XLogRecPtr
    1249      248464 : log_newpage_buffer(Buffer buffer, bool page_std)
    1250             : {
    1251      248464 :     Page        page = BufferGetPage(buffer);
    1252             :     RelFileLocator rlocator;
    1253             :     ForkNumber  forknum;
    1254             :     BlockNumber blkno;
    1255             : 
    1256             :     /* Shared buffers should be modified in a critical section. */
    1257             :     Assert(CritSectionCount > 0);
    1258             : 
    1259      248464 :     BufferGetTag(buffer, &rlocator, &forknum, &blkno);
    1260             : 
    1261      248464 :     return log_newpage(&rlocator, forknum, blkno, page, page_std);
    1262             : }
    1263             : 
    1264             : /*
    1265             :  * WAL-log a range of blocks in a relation.
    1266             :  *
    1267             :  * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
    1268             :  * written to the WAL. If the range is large, this is done in multiple WAL
    1269             :  * records.
    1270             :  *
    1271             :  * If all page follows the standard page layout, with a PageHeader and unused
    1272             :  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
    1273             :  * the unused space to be left out from the WAL records, making them smaller.
    1274             :  *
    1275             :  * NOTE: This function acquires exclusive-locks on the pages. Typically, this
    1276             :  * is used on a newly-built relation, and the caller is holding a
    1277             :  * AccessExclusiveLock on it, so no other backend can be accessing it at the
    1278             :  * same time. If that's not the case, you must ensure that this does not
    1279             :  * cause a deadlock through some other means.
    1280             :  */
    1281             : void
    1282       97800 : log_newpage_range(Relation rel, ForkNumber forknum,
    1283             :                   BlockNumber startblk, BlockNumber endblk,
    1284             :                   bool page_std)
    1285             : {
    1286             :     int         flags;
    1287             :     BlockNumber blkno;
    1288             : 
    1289       97800 :     flags = REGBUF_FORCE_IMAGE;
    1290       97800 :     if (page_std)
    1291         726 :         flags |= REGBUF_STANDARD;
    1292             : 
    1293             :     /*
    1294             :      * Iterate over all the pages in the range. They are collected into
    1295             :      * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
    1296             :      * for each batch.
    1297             :      */
    1298       97800 :     XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
    1299             : 
    1300       97800 :     blkno = startblk;
    1301      170972 :     while (blkno < endblk)
    1302             :     {
    1303             :         Buffer      bufpack[XLR_MAX_BLOCK_ID];
    1304             :         XLogRecPtr  recptr;
    1305             :         int         nbufs;
    1306             :         int         i;
    1307             : 
    1308       73174 :         CHECK_FOR_INTERRUPTS();
    1309             : 
    1310             :         /* Collect a batch of blocks. */
    1311       73174 :         nbufs = 0;
    1312      347848 :         while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
    1313             :         {
    1314      274674 :             Buffer      buf = ReadBufferExtended(rel, forknum, blkno,
    1315             :                                                  RBM_NORMAL, NULL);
    1316             : 
    1317      274674 :             LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
    1318             : 
    1319             :             /*
    1320             :              * Completely empty pages are not WAL-logged. Writing a WAL record
    1321             :              * would change the LSN, and we don't want that. We want the page
    1322             :              * to stay empty.
    1323             :              */
    1324      274674 :             if (!PageIsNew(BufferGetPage(buf)))
    1325      271278 :                 bufpack[nbufs++] = buf;
    1326             :             else
    1327        3396 :                 UnlockReleaseBuffer(buf);
    1328      274674 :             blkno++;
    1329             :         }
    1330             : 
    1331             :         /* Nothing more to do if all remaining blocks were empty. */
    1332       73174 :         if (nbufs == 0)
    1333           2 :             break;
    1334             : 
    1335             :         /* Write WAL record for this batch. */
    1336       73172 :         XLogBeginInsert();
    1337             : 
    1338       73172 :         START_CRIT_SECTION();
    1339      344450 :         for (i = 0; i < nbufs; i++)
    1340             :         {
    1341      271278 :             MarkBufferDirty(bufpack[i]);
    1342      271278 :             XLogRegisterBuffer(i, bufpack[i], flags);
    1343             :         }
    1344             : 
    1345       73172 :         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
    1346             : 
    1347      344450 :         for (i = 0; i < nbufs; i++)
    1348             :         {
    1349      271278 :             PageSetLSN(BufferGetPage(bufpack[i]), recptr);
    1350      271278 :             UnlockReleaseBuffer(bufpack[i]);
    1351             :         }
    1352       73172 :         END_CRIT_SECTION();
    1353             :     }
    1354       97800 : }
    1355             : 
    1356             : /*
    1357             :  * Allocate working buffers needed for WAL record construction.
    1358             :  */
    1359             : void
    1360       42058 : InitXLogInsert(void)
    1361             : {
    1362             : #ifdef USE_ASSERT_CHECKING
    1363             : 
    1364             :     /*
    1365             :      * Check that any records assembled can be decoded.  This is capped based
    1366             :      * on what XLogReader would require at its maximum bound.  The XLOG_BLCKSZ
    1367             :      * addend covers the larger allocate_recordbuf() demand.  This code path
    1368             :      * is called once per backend, more than enough for this check.
    1369             :      */
    1370             :     size_t      max_required =
    1371             :         DecodeXLogRecordRequiredSpace(XLogRecordMaxSize + XLOG_BLCKSZ);
    1372             : 
    1373             :     Assert(AllocSizeIsValid(max_required));
    1374             : #endif
    1375             : 
    1376             :     /* Initialize the working areas */
    1377       42058 :     if (xloginsert_cxt == NULL)
    1378             :     {
    1379       42058 :         xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
    1380             :                                                "WAL record construction",
    1381             :                                                ALLOCSET_DEFAULT_SIZES);
    1382             :     }
    1383             : 
    1384       42058 :     if (registered_buffers == NULL)
    1385             :     {
    1386       42058 :         registered_buffers = (registered_buffer *)
    1387       42058 :             MemoryContextAllocZero(xloginsert_cxt,
    1388             :                                    sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
    1389       42058 :         max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
    1390             :     }
    1391       42058 :     if (rdatas == NULL)
    1392             :     {
    1393       42058 :         rdatas = MemoryContextAlloc(xloginsert_cxt,
    1394             :                                     sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
    1395       42058 :         max_rdatas = XLR_NORMAL_RDATAS;
    1396             :     }
    1397             : 
    1398             :     /*
    1399             :      * Allocate a buffer to hold the header information for a WAL record.
    1400             :      */
    1401       42058 :     if (hdr_scratch == NULL)
    1402       42058 :         hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
    1403             :                                              HEADER_SCRATCH_SIZE);
    1404       42058 : }

Generated by: LCOV version 1.16