LCOV - code coverage report
Current view: top level - src/backend/access/transam - xloginsert.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 333 396 84.1 %
Date: 2024-12-27 11:15:27 Functions: 17 18 94.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * xloginsert.c
       4             :  *      Functions for constructing WAL records
       5             :  *
       6             :  * Constructing a WAL record begins with a call to XLogBeginInsert,
       7             :  * followed by a number of XLogRegister* calls. The registered data is
       8             :  * collected in private working memory, and finally assembled into a chain
       9             :  * of XLogRecData structs by a call to XLogRecordAssemble(). See
      10             :  * access/transam/README for details.
      11             :  *
      12             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      13             :  * Portions Copyright (c) 1994, Regents of the University of California
      14             :  *
      15             :  * src/backend/access/transam/xloginsert.c
      16             :  *
      17             :  *-------------------------------------------------------------------------
      18             :  */
      19             : 
      20             : #include "postgres.h"
      21             : 
      22             : #ifdef USE_LZ4
      23             : #include <lz4.h>
      24             : #endif
      25             : 
      26             : #ifdef USE_ZSTD
      27             : #include <zstd.h>
      28             : #endif
      29             : 
      30             : #include "access/xact.h"
      31             : #include "access/xlog.h"
      32             : #include "access/xlog_internal.h"
      33             : #include "access/xloginsert.h"
      34             : #include "catalog/pg_control.h"
      35             : #include "common/pg_lzcompress.h"
      36             : #include "miscadmin.h"
      37             : #include "pg_trace.h"
      38             : #include "replication/origin.h"
      39             : #include "storage/bufmgr.h"
      40             : #include "storage/proc.h"
      41             : #include "utils/memutils.h"
      42             : 
      43             : /*
      44             :  * Worst-case size of a compressed backup block image of BLCKSZ bytes, for
      45             :  * each compression method.
                     :  *
                     :  * The LZ4 and ZSTD bounds are taken from the LZ4_COMPRESSBOUND and
                     :  * ZSTD_COMPRESSBOUND macros and are only defined that way when the method
                     :  * is compiled in (USE_LZ4 / USE_ZSTD).  Otherwise the bound is 0, so the
                     :  * method cannot raise COMPRESS_BUFSIZE.
      46             :  */
      47             : #ifdef USE_LZ4
      48             : #define LZ4_MAX_BLCKSZ      LZ4_COMPRESSBOUND(BLCKSZ)
      49             : #else
      50             : #define LZ4_MAX_BLCKSZ      0
      51             : #endif
      52             : 
      53             : #ifdef USE_ZSTD
      54             : #define ZSTD_MAX_BLCKSZ     ZSTD_COMPRESSBOUND(BLCKSZ)
      55             : #else
      56             : #define ZSTD_MAX_BLCKSZ     0
      57             : #endif
      58             : 
      59             : #define PGLZ_MAX_BLCKSZ     PGLZ_MAX_OUTPUT(BLCKSZ)
      60             : 
      61             : /* Compressed-image buffer size: the largest of the per-method bounds above */
      62             : #define COMPRESS_BUFSIZE    Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
      63             : 
      64             : /*
      65             :  * For each block reference registered with XLogRegisterBuffer or
      66             :  * XLogRegisterBlock, we fill in a registered_buffer struct.
                     :  *
                     :  * The structs live in the registered_buffers array and are indexed by
                     :  * block_id.  A slot is marked in_use when a block is registered and is
                     :  * released again by XLogResetInsertion().
      67             :  */
      68             : typedef struct
      69             : {
      70             :     bool        in_use;         /* is this slot in use? */
      71             :     uint8       flags;          /* REGBUF_* flags */
      72             :     RelFileLocator rlocator;    /* identifies the relation and block */
      73             :     ForkNumber  forkno;         /* fork the block belongs to */
      74             :     BlockNumber block;          /* block number within that fork */
      75             :     const char *page;           /* page content */
      76             :     uint32      rdata_len;      /* total length of data in rdata chain;
                                     * XLogRegisterBufData() keeps it <= UINT16_MAX */
      77             :     XLogRecData *rdata_head;    /* head of the chain of data registered with
      78             :                                  * this block */
      79             :     XLogRecData *rdata_tail;    /* last entry in the chain, or &rdata_head if
      80             :                                  * empty */
      81             : 
      82             :     XLogRecData bkp_rdatas[2];  /* temporary rdatas used to hold references to
      83             :                                  * backup block data in XLogRecordAssemble() */
      84             : 
      85             :     /* buffer to store a compressed version of backup block image */
      86             :     char        compressed_page[COMPRESS_BUFSIZE];
      87             : } registered_buffer;
      88             : 
      89             : static registered_buffer *registered_buffers;   /* indexed by block_id */
      90             : static int  max_registered_buffers; /* allocated size */
      91             : static int  max_registered_block_id = 0;    /* highest block_id + 1 currently
      92             :                                              * registered */
      93             : 
      94             : /*
      95             :  * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
      96             :  * with XLogRegisterData(...).
                     :  *
                     :  * mainrdata_last is the entry that new data gets linked after.  While the
                     :  * chain is empty it holds the address of mainrdata_head itself, cast to
                     :  * XLogRecData *, so XLogRegisterData() can always append by assigning
                     :  * mainrdata_last->next.  NOTE(review): that only works if 'next' is the
                     :  * first member of XLogRecData -- confirm against its declaration.
      97             :  */
      98             : static XLogRecData *mainrdata_head;
      99             : static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
     100             : static uint64 mainrdata_len;    /* total # of bytes in chain */
     101             : 
     102             : /* flags for the in-progress insertion, accumulated by XLogSetRecordFlags() */
     103             : static uint8 curinsert_flags = 0;
     104             : 
     105             : /*
     106             :  * These are used to hold the record header while constructing a record.
     107             :  * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
     108             :  * because we want it to be MAXALIGNed and padding bytes zeroed.
     109             :  *
     110             :  * For simplicity, it's allocated large enough to hold the headers for any
     111             :  * WAL record.
                     :  *
                     :  * HEADER_SCRATCH_SIZE below sums the fixed record header, the largest
                     :  * possible block header for each of the XLR_MAX_BLOCK_ID + 1 block ids, a
                     :  * long-form data header, and the origin and transaction-id fields.
                     :  * SizeOfXlogOrigin and SizeOfXLogTransactionId each count one extra char
                     :  * besides the value; presumably that is a one-byte marker preceding the
                     :  * field -- TODO confirm against XLogRecordAssemble().
     112             :  */
     113             : static XLogRecData hdr_rdt;
     114             : static char *hdr_scratch = NULL;
     115             : 
     116             : #define SizeOfXlogOrigin    (sizeof(RepOriginId) + sizeof(char))
     117             : #define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
     118             : 
     119             : #define HEADER_SCRATCH_SIZE \
     120             :     (SizeOfXLogRecord + \
     121             :      MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
     122             :      SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
     123             :      SizeOfXLogTransactionId)
     124             : 
     125             : /*
     126             :  * An array of XLogRecData structs, to hold registered data.
                     :  *
                     :  * Entries are handed out sequentially by XLogRegisterData() and
                     :  * XLogRegisterBufData(); XLogResetInsertion() sets num_rdatas back to 0.
     127             :  */
     128             : static XLogRecData *rdatas;
     129             : static int  num_rdatas;         /* entries currently used */
     130             : static int  max_rdatas;         /* allocated size */
     131             : 
     132             : static bool begininsert_called = false; /* set by XLogBeginInsert(), cleared
                                             * by XLogResetInsertion() */
     133             : 
     134             : /* Memory context to hold the registered buffer and data references. */
     135             : static MemoryContext xloginsert_cxt;
     136             : 
     137             : static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
     138             :                                        XLogRecPtr RedoRecPtr, bool doPageWrites,
     139             :                                        XLogRecPtr *fpw_lsn, int *num_fpi,
     140             :                                        bool *topxid_included);
     141             : static bool XLogCompressBackupBlock(const char *page, uint16 hole_offset,
     142             :                                     uint16 hole_length, char *dest, uint16 *dlen);
     143             : 
     144             : /*
     145             :  * Begin constructing a WAL record. This must be called before the
     146             :  * XLogRegister* functions and XLogInsert().
     147             :  */
     148             : void
     149    28817990 : XLogBeginInsert(void)
     150             : {
     151             :     Assert(max_registered_block_id == 0);
     152             :     Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
     153             :     Assert(mainrdata_len == 0);
     154             : 
     155             :     /* cross-check on whether we should be here or not */
     156    28817990 :     if (!XLogInsertAllowed())
     157           0 :         elog(ERROR, "cannot make new WAL entries during recovery");
     158             : 
     159    28817990 :     if (begininsert_called)
     160           0 :         elog(ERROR, "XLogBeginInsert was already called");
     161             : 
     162    28817990 :     begininsert_called = true;
     163    28817990 : }
     164             : 
     165             : /*
     166             :  * Ensure that there are enough buffer and data slots in the working area,
     167             :  * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
     168             :  * calls.
     169             :  *
     170             :  * There is always space for a small number of buffers and data chunks, enough
     171             :  * for most record types. This function is for the exceptional cases that need
     172             :  * more.
     173             :  */
     174             : void
     175      119468 : XLogEnsureRecordSpace(int max_block_id, int ndatas)
     176             : {
     177             :     int         nbuffers;
     178             : 
     179             :     /*
     180             :      * This must be called before entering a critical section, because
     181             :      * allocating memory inside a critical section can fail. repalloc() will
     182             :      * check the same, but better to check it here too so that we fail
     183             :      * consistently even if the arrays happen to be large enough already.
     184             :      */
     185             :     Assert(CritSectionCount == 0);
     186             : 
     187             :     /* the minimum values can't be decreased */
     188      119468 :     if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
     189        4198 :         max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
     190      119468 :     if (ndatas < XLR_NORMAL_RDATAS)
     191      119420 :         ndatas = XLR_NORMAL_RDATAS;
     192             : 
     193      119468 :     if (max_block_id > XLR_MAX_BLOCK_ID)
     194           0 :         elog(ERROR, "maximum number of WAL record block references exceeded");
     195      119468 :     nbuffers = max_block_id + 1;
     196             : 
     197      119468 :     if (nbuffers > max_registered_buffers)
     198             :     {
     199        3150 :         registered_buffers = (registered_buffer *)
     200        3150 :             repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
     201             : 
     202             :         /*
     203             :          * At least the padding bytes in the structs must be zeroed, because
     204             :          * they are included in WAL data, but initialize it all for tidiness.
     205             :          */
     206        3150 :         MemSet(&registered_buffers[max_registered_buffers], 0,
     207             :                (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
     208        3150 :         max_registered_buffers = nbuffers;
     209             :     }
     210             : 
     211      119468 :     if (ndatas > max_rdatas)
     212             :     {
     213          30 :         rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
     214          30 :         max_rdatas = ndatas;
     215             :     }
     216      119468 : }
     217             : 
     218             : /*
     219             :  * Reset WAL record construction buffers.
     220             :  */
     221             : void
     222    28874166 : XLogResetInsertion(void)
     223             : {
     224             :     int         i;
     225             : 
     226    57441838 :     for (i = 0; i < max_registered_block_id; i++)
     227    28567672 :         registered_buffers[i].in_use = false;
     228             : 
     229    28874166 :     num_rdatas = 0;
     230    28874166 :     max_registered_block_id = 0;
     231    28874166 :     mainrdata_len = 0;
     232    28874166 :     mainrdata_last = (XLogRecData *) &mainrdata_head;
     233    28874166 :     curinsert_flags = 0;
     234    28874166 :     begininsert_called = false;
     235    28874166 : }
     236             : 
     237             : /*
     238             :  * Register a reference to a buffer with the WAL record being constructed.
     239             :  * This must be called for every page that the WAL-logged operation modifies.
     240             :  */
     241             : void
     242    28030124 : XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
     243             : {
     244             :     registered_buffer *regbuf;
     245             : 
     246             :     /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
     247             :     Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
     248             :     Assert(begininsert_called);
     249             : 
     250             :     /*
     251             :      * Ordinarily, buffer should be exclusive-locked and marked dirty before
     252             :      * we get here, otherwise we could end up violating one of the rules in
     253             :      * access/transam/README.
     254             :      *
     255             :      * Some callers intentionally register a clean page and never update that
     256             :      * page's LSN; in that case they can pass the flag REGBUF_NO_CHANGE to
     257             :      * bypass these checks.
     258             :      */
     259             : #ifdef USE_ASSERT_CHECKING
     260             :     if (!(flags & REGBUF_NO_CHANGE))
     261             :         Assert(BufferIsExclusiveLocked(buffer) && BufferIsDirty(buffer));
     262             : #endif
     263             : 
     264    28030124 :     if (block_id >= max_registered_block_id)
     265             :     {
     266    27305998 :         if (block_id >= max_registered_buffers)
     267           0 :             elog(ERROR, "too many registered buffers");
     268    27305998 :         max_registered_block_id = block_id + 1;
     269             :     }
     270             : 
     271    28030124 :     regbuf = &registered_buffers[block_id];
     272             : 
     273    28030124 :     BufferGetTag(buffer, &regbuf->rlocator, &regbuf->forkno, &regbuf->block);
     274    28030124 :     regbuf->page = BufferGetPage(buffer);
     275    28030124 :     regbuf->flags = flags;
     276    28030124 :     regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
     277    28030124 :     regbuf->rdata_len = 0;
     278             : 
     279             :     /*
     280             :      * Check that this page hasn't already been registered with some other
     281             :      * block_id.
     282             :      */
     283             : #ifdef USE_ASSERT_CHECKING
     284             :     {
     285             :         int         i;
     286             : 
     287             :         for (i = 0; i < max_registered_block_id; i++)
     288             :         {
     289             :             registered_buffer *regbuf_old = &registered_buffers[i];
     290             : 
     291             :             if (i == block_id || !regbuf_old->in_use)
     292             :                 continue;
     293             : 
     294             :             Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
     295             :                    regbuf_old->forkno != regbuf->forkno ||
     296             :                    regbuf_old->block != regbuf->block);
     297             :         }
     298             :     }
     299             : #endif
     300             : 
     301    28030124 :     regbuf->in_use = true;
     302    28030124 : }
     303             : 
     304             : /*
     305             :  * Like XLogRegisterBuffer, but for registering a block that's not in the
     306             :  * shared buffer pool (i.e. when you don't have a Buffer for it).
     307             :  */
     308             : void
     309      516200 : XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
     310             :                   BlockNumber blknum, const char *page, uint8 flags)
     311             : {
     312             :     registered_buffer *regbuf;
     313             : 
     314             :     Assert(begininsert_called);
     315             : 
     316      516200 :     if (block_id >= max_registered_block_id)
     317      516200 :         max_registered_block_id = block_id + 1;
     318             : 
     319      516200 :     if (block_id >= max_registered_buffers)
     320           0 :         elog(ERROR, "too many registered buffers");
     321             : 
     322      516200 :     regbuf = &registered_buffers[block_id];
     323             : 
     324      516200 :     regbuf->rlocator = *rlocator;
     325      516200 :     regbuf->forkno = forknum;
     326      516200 :     regbuf->block = blknum;
     327      516200 :     regbuf->page = page;
     328      516200 :     regbuf->flags = flags;
     329      516200 :     regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
     330      516200 :     regbuf->rdata_len = 0;
     331             : 
     332             :     /*
     333             :      * Check that this page hasn't already been registered with some other
     334             :      * block_id.
     335             :      */
     336             : #ifdef USE_ASSERT_CHECKING
     337             :     {
     338             :         int         i;
     339             : 
     340             :         for (i = 0; i < max_registered_block_id; i++)
     341             :         {
     342             :             registered_buffer *regbuf_old = &registered_buffers[i];
     343             : 
     344             :             if (i == block_id || !regbuf_old->in_use)
     345             :                 continue;
     346             : 
     347             :             Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
     348             :                    regbuf_old->forkno != regbuf->forkno ||
     349             :                    regbuf_old->block != regbuf->block);
     350             :         }
     351             :     }
     352             : #endif
     353             : 
     354      516200 :     regbuf->in_use = true;
     355      516200 : }
     356             : 
     357             : /*
     358             :  * Add data to the WAL record that's being constructed.
     359             :  *
     360             :  * The data is appended to the "main chunk", available at replay with
     361             :  * XLogRecGetData().
     362             :  */
     363             : void
     364    29731414 : XLogRegisterData(const char *data, uint32 len)
     365             : {
     366             :     XLogRecData *rdata;
     367             : 
     368             :     Assert(begininsert_called);
     369             : 
     370    29731414 :     if (num_rdatas >= max_rdatas)
     371           0 :         ereport(ERROR,
     372             :                 (errmsg_internal("too much WAL data"),
     373             :                  errdetail_internal("%d out of %d data segments are already in use.",
     374             :                                     num_rdatas, max_rdatas)));
     375    29731414 :     rdata = &rdatas[num_rdatas++];
     376             : 
     377    29731414 :     rdata->data = data;
     378    29731414 :     rdata->len = len;
     379             : 
     380             :     /*
     381             :      * we use the mainrdata_last pointer to track the end of the chain, so no
     382             :      * need to clear 'next' here.
     383             :      */
     384             : 
     385    29731414 :     mainrdata_last->next = rdata;
     386    29731414 :     mainrdata_last = rdata;
     387             : 
     388    29731414 :     mainrdata_len += len;
     389    29731414 : }
     390             : 
     391             : /*
     392             :  * Add buffer-specific data to the WAL record that's being constructed.
     393             :  *
     394             :  * Block_id must reference a block previously registered with
     395             :  * XLogRegisterBuffer(). If this is called more than once for the same
     396             :  * block_id, the data is appended.
     397             :  *
     398             :  * The maximum amount of data that can be registered per block is 65535
     399             :  * bytes. That should be plenty; if you need more than BLCKSZ bytes to
     400             :  * reconstruct the changes to the page, you might as well just log a full
     401             :  * copy of it. (the "main data" that's not associated with a block is not
     402             :  * limited)
     403             :  */
     404             : void
     405    38916444 : XLogRegisterBufData(uint8 block_id, const char *data, uint32 len)
     406             : {
     407             :     registered_buffer *regbuf;
     408             :     XLogRecData *rdata;
     409             : 
     410             :     Assert(begininsert_called);
     411             : 
     412             :     /* find the registered buffer struct */
     413    38916444 :     regbuf = &registered_buffers[block_id];
     414    38916444 :     if (!regbuf->in_use)
     415           0 :         elog(ERROR, "no block with id %d registered with WAL insertion",
     416             :              block_id);
     417             : 
     418             :     /*
     419             :      * Check against max_rdatas and ensure we do not register more data per
     420             :      * buffer than can be handled by the physical data format; i.e. that
     421             :      * regbuf->rdata_len does not grow beyond what
     422             :      * XLogRecordBlockHeader->data_length can hold.
     423             :      */
     424    38916444 :     if (num_rdatas >= max_rdatas)
     425           0 :         ereport(ERROR,
     426             :                 (errmsg_internal("too much WAL data"),
     427             :                  errdetail_internal("%d out of %d data segments are already in use.",
     428             :                                     num_rdatas, max_rdatas)));
     429    38916444 :     if (regbuf->rdata_len + len > UINT16_MAX || len > UINT16_MAX)
     430           0 :         ereport(ERROR,
     431             :                 (errmsg_internal("too much WAL data"),
     432             :                  errdetail_internal("Registering more than maximum %u bytes allowed to block %u: current %u bytes, adding %u bytes.",
     433             :                                     UINT16_MAX, block_id, regbuf->rdata_len, len)));
     434             : 
     435    38916444 :     rdata = &rdatas[num_rdatas++];
     436             : 
     437    38916444 :     rdata->data = data;
     438    38916444 :     rdata->len = len;
     439             : 
     440    38916444 :     regbuf->rdata_tail->next = rdata;
     441    38916444 :     regbuf->rdata_tail = rdata;
     442    38916444 :     regbuf->rdata_len += len;
     443    38916444 : }
     444             : 
     445             : /*
     446             :  * Set insert status flags for the upcoming WAL record.
     447             :  *
     448             :  * The flags that can be used here are:
     449             :  * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
     450             :  *   included in the record.
     451             :  * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
     452             :  *   durability, which allows to avoid triggering WAL archiving and other
     453             :  *   background activity.
     454             :  */
     455             : void
     456    17956122 : XLogSetRecordFlags(uint8 flags)
     457             : {
     458             :     Assert(begininsert_called);
     459    17956122 :     curinsert_flags |= flags;
     460    17956122 : }
     461             : 
     462             : /*
     463             :  * Insert an XLOG record having the specified RMID and info bytes, with the
     464             :  * body of the record being the data and buffer references registered earlier
     465             :  * with XLogRegister* calls.
     466             :  *
     467             :  * Returns XLOG pointer to end of record (beginning of next record).
     468             :  * This can be used as LSN for data pages affected by the logged action.
     469             :  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
     470             :  * before the data page can be written out.  This implements the basic
     471             :  * WAL rule "write the log before the data".)
     472             :  */
     473             : XLogRecPtr
     474    28817990 : XLogInsert(RmgrId rmid, uint8 info)
     475             : {
     476             :     XLogRecPtr  EndPos;
     477             : 
     478             :     /* XLogBeginInsert() must have been called. */
     479    28817990 :     if (!begininsert_called)
     480           0 :         elog(ERROR, "XLogBeginInsert was not called");
     481             : 
     482             :     /*
     483             :      * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
     484             :      * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
     485             :      */
     486    28817990 :     if ((info & ~(XLR_RMGR_INFO_MASK |
     487             :                   XLR_SPECIAL_REL_UPDATE |
     488             :                   XLR_CHECK_CONSISTENCY)) != 0)
     489           0 :         elog(PANIC, "invalid xlog info mask %02X", info);
     490             : 
     491             :     TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
     492             : 
     493             :     /*
     494             :      * In bootstrap mode, we don't actually log anything but XLOG resources;
     495             :      * return a phony record pointer.
     496             :      */
     497    28817990 :     if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
     498             :     {
     499     1118250 :         XLogResetInsertion();
     500     1118250 :         EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
     501     1118250 :         return EndPos;
     502             :     }
     503             : 
     504             :     do
     505             :     {
     506             :         XLogRecPtr  RedoRecPtr;
     507             :         bool        doPageWrites;
     508    27713500 :         bool        topxid_included = false;
     509             :         XLogRecPtr  fpw_lsn;
     510             :         XLogRecData *rdt;
     511    27713500 :         int         num_fpi = 0;
     512             : 
     513             :         /*
     514             :          * Get values needed to decide whether to do full-page writes. Since
     515             :          * we don't yet have an insertion lock, these could change under us,
     516             :          * but XLogInsertRecord will recheck them once it has a lock.
     517             :          */
     518    27713500 :         GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
     519             : 
     520    27713500 :         rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
     521             :                                  &fpw_lsn, &num_fpi, &topxid_included);
     522             : 
     523    27713500 :         EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
     524             :                                   topxid_included);
     525    27713500 :     } while (EndPos == InvalidXLogRecPtr);
     526             : 
     527    27699740 :     XLogResetInsertion();
     528             : 
     529    27699740 :     return EndPos;
     530             : }
     531             : 
     532             : /*
     533             :  * Assemble a WAL record from the registered data and buffers into an
     534             :  * XLogRecData chain, ready for insertion with XLogInsertRecord().
     535             :  *
     536             :  * The record header fields are filled in, except for the xl_prev field. The
     537             :  * calculated CRC does not include the record header yet.
     538             :  *
     539             :  * If there are any registered buffers, and a full-page image was not taken
     540             :  * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
     541             :  * signals that the assembled record is only good for insertion on the
     542             :  * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
     543             :  *
     544             :  * *topxid_included is set if the topmost transaction ID is logged with the
     545             :  * current subtransaction.
     546             :  */
     547             : static XLogRecData *
     548    27713500 : XLogRecordAssemble(RmgrId rmid, uint8 info,
     549             :                    XLogRecPtr RedoRecPtr, bool doPageWrites,
     550             :                    XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included)
     551             : {
     552             :     XLogRecData *rdt;
     553    27713500 :     uint64      total_len = 0;
     554             :     int         block_id;
     555             :     pg_crc32c   rdata_crc;
     556    27713500 :     registered_buffer *prev_regbuf = NULL;
     557             :     XLogRecData *rdt_datas_last;
     558             :     XLogRecord *rechdr;
     559    27713500 :     char       *scratch = hdr_scratch;
     560             : 
     561             :     /*
     562             :      * Note: this function can be called multiple times for the same record.
     563             :      * All the modifications we do to the rdata chains below must handle that.
     564             :      */
     565             : 
     566             :     /* The record begins with the fixed-size header */
     567    27713500 :     rechdr = (XLogRecord *) scratch;
     568    27713500 :     scratch += SizeOfXLogRecord;
     569             : 
     570    27713500 :     hdr_rdt.next = NULL;
     571    27713500 :     rdt_datas_last = &hdr_rdt;
     572    27713500 :     hdr_rdt.data = hdr_scratch;
     573             : 
     574             :     /*
     575             :      * Enforce consistency checks for this record if user is looking for it.
     576             :      * Do this at the beginning of this routine to give the possibility
     577             :      * for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for
     578             :      * a record.
     579             :      */
     580    27713500 :     if (wal_consistency_checking[rmid])
     581     4179342 :         info |= XLR_CHECK_CONSISTENCY;
     582             : 
     583             :     /*
     584             :      * Make an rdata chain containing all the data portions of all block
     585             :      * references. This includes the data for full-page images. Also append
     586             :      * the headers for the block references in the scratch buffer.
     587             :      */
     588    27713500 :     *fpw_lsn = InvalidXLogRecPtr;
     589    55217484 :     for (block_id = 0; block_id < max_registered_block_id; block_id++)
     590             :     {
     591    27503984 :         registered_buffer *regbuf = &registered_buffers[block_id];
     592             :         bool        needs_backup;
     593             :         bool        needs_data;
     594             :         XLogRecordBlockHeader bkpb;
     595             :         XLogRecordBlockImageHeader bimg;
     596    27503984 :         XLogRecordBlockCompressHeader cbimg = {0};
     597             :         bool        samerel;
     598    27503984 :         bool        is_compressed = false;
     599             :         bool        include_image;
     600             : 
     601    27503984 :         if (!regbuf->in_use)
     602       21348 :             continue;
     603             : 
     604             :         /* Determine if this block needs to be backed up */
     605    27482636 :         if (regbuf->flags & REGBUF_FORCE_IMAGE)
     606      537300 :             needs_backup = true;
     607    26945336 :         else if (regbuf->flags & REGBUF_NO_IMAGE)
     608      406070 :             needs_backup = false;
     609    26539266 :         else if (!doPageWrites)
     610      455032 :             needs_backup = false;
     611             :         else
     612             :         {
     613             :             /*
     614             :              * We assume page LSN is first data on *every* page that can be
     615             :              * passed to XLogInsert, whether it has the standard page layout
     616             :              * or not.
     617             :              */
     618    26084234 :             XLogRecPtr  page_lsn = PageGetLSN(regbuf->page);
     619             : 
     620    26084234 :             needs_backup = (page_lsn <= RedoRecPtr);
     621    26084234 :             if (!needs_backup)
     622             :             {
     623    25921686 :                 if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
     624    25099634 :                     *fpw_lsn = page_lsn;
     625             :             }
     626             :         }
     627             : 
     628             :         /* Determine if the buffer data needs to be included */
     629    27482636 :         if (regbuf->rdata_len == 0)
     630     5284340 :             needs_data = false;
     631    22198296 :         else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
     632      584602 :             needs_data = true;
     633             :         else
     634    21613694 :             needs_data = !needs_backup;
     635             : 
     636    27482636 :         bkpb.id = block_id;
     637    27482636 :         bkpb.fork_flags = regbuf->forkno;
     638    27482636 :         bkpb.data_length = 0;
     639             : 
     640    27482636 :         if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
     641      399996 :             bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
     642             : 
     643             :         /*
     644             :          * If needs_backup is true or WAL checking is enabled for current
     645             :          * resource manager, log a full-page write for the current block.
     646             :          */
     647    27482636 :         include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
     648             : 
     649    27482636 :         if (include_image)
     650             :         {
     651     5167602 :             const char *page = regbuf->page;
     652     5167602 :             uint16      compressed_len = 0;
     653             : 
     654             :             /*
     655             :              * The page needs to be backed up, so calculate its hole length
     656             :              * and offset.
     657             :              */
     658     5167602 :             if (regbuf->flags & REGBUF_STANDARD)
     659             :             {
     660             :                 /* Assume we can omit data between pd_lower and pd_upper */
     661     4925390 :                 uint16      lower = ((PageHeader) page)->pd_lower;
     662     4925390 :                 uint16      upper = ((PageHeader) page)->pd_upper;
     663             : 
     664     4925390 :                 if (lower >= SizeOfPageHeaderData &&
     665     4920762 :                     upper > lower &&
     666             :                     upper <= BLCKSZ)
     667             :                 {
     668     4920762 :                     bimg.hole_offset = lower;
     669     4920762 :                     cbimg.hole_length = upper - lower;
     670             :                 }
     671             :                 else
     672             :                 {
     673             :                     /* No "hole" to remove */
     674        4628 :                     bimg.hole_offset = 0;
     675        4628 :                     cbimg.hole_length = 0;
     676             :                 }
     677             :             }
     678             :             else
     679             :             {
     680             :                 /* Not a standard page header, don't try to eliminate "hole" */
     681      242212 :                 bimg.hole_offset = 0;
     682      242212 :                 cbimg.hole_length = 0;
     683             :             }
     684             : 
     685             :             /*
     686             :              * Try to compress a block image if wal_compression is enabled
     687             :              */
     688     5167602 :             if (wal_compression != WAL_COMPRESSION_NONE)
     689             :             {
     690             :                 is_compressed =
     691           0 :                     XLogCompressBackupBlock(page, bimg.hole_offset,
     692           0 :                                             cbimg.hole_length,
     693           0 :                                             regbuf->compressed_page,
     694             :                                             &compressed_len);
     695             :             }
     696             : 
     697             :             /*
     698             :              * Fill in the remaining fields in the XLogRecordBlockHeader
     699             :              * struct
     700             :              */
     701     5167602 :             bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
     702             : 
     703             :             /* Report a full page image constructed for the WAL record */
     704     5167602 :             *num_fpi += 1;
     705             : 
     706             :             /*
     707             :              * Construct XLogRecData entries for the page content.
     708             :              */
     709     5167602 :             rdt_datas_last->next = &regbuf->bkp_rdatas[0];
     710     5167602 :             rdt_datas_last = rdt_datas_last->next;
     711             : 
     712     5167602 :             bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
     713             : 
     714             :             /*
     715             :              * If WAL consistency checking is enabled for the resource manager
     716             :              * of this WAL record, a full-page image is included in the record
     717             :              * for the block modified. During redo, the full-page image is replayed
     718             :              * only if BKPIMAGE_APPLY is set.
     719             :              */
     720     5167602 :             if (needs_backup)
     721      699848 :                 bimg.bimg_info |= BKPIMAGE_APPLY;
     722             : 
     723     5167602 :             if (is_compressed)
     724             :             {
     725             :                 /* The current compression is stored in the WAL record */
     726           0 :                 bimg.length = compressed_len;
     727             : 
     728             :                 /* Set the compression method used for this block */
     729           0 :                 switch ((WalCompression) wal_compression)
     730             :                 {
     731           0 :                     case WAL_COMPRESSION_PGLZ:
     732           0 :                         bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
     733           0 :                         break;
     734             : 
     735           0 :                     case WAL_COMPRESSION_LZ4:
     736             : #ifdef USE_LZ4
     737           0 :                         bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
     738             : #else
     739             :                         elog(ERROR, "LZ4 is not supported by this build");
     740             : #endif
     741           0 :                         break;
     742             : 
     743           0 :                     case WAL_COMPRESSION_ZSTD:
     744             : #ifdef USE_ZSTD
     745             :                         bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
     746             : #else
     747           0 :                         elog(ERROR, "zstd is not supported by this build");
     748             : #endif
     749             :                         break;
     750             : 
     751           0 :                     case WAL_COMPRESSION_NONE:
     752             :                         Assert(false);  /* cannot happen */
     753           0 :                         break;
     754             :                         /* no default case, so that compiler will warn */
     755             :                 }
     756             : 
     757           0 :                 rdt_datas_last->data = regbuf->compressed_page;
     758           0 :                 rdt_datas_last->len = compressed_len;
     759             :             }
     760             :             else
     761             :             {
     762     5167602 :                 bimg.length = BLCKSZ - cbimg.hole_length;
     763             : 
     764     5167602 :                 if (cbimg.hole_length == 0)
     765             :                 {
     766      246840 :                     rdt_datas_last->data = page;
     767      246840 :                     rdt_datas_last->len = BLCKSZ;
     768             :                 }
     769             :                 else
     770             :                 {
     771             :                     /* must skip the hole */
     772     4920762 :                     rdt_datas_last->data = page;
     773     4920762 :                     rdt_datas_last->len = bimg.hole_offset;
     774             : 
     775     4920762 :                     rdt_datas_last->next = &regbuf->bkp_rdatas[1];
     776     4920762 :                     rdt_datas_last = rdt_datas_last->next;
     777             : 
     778     4920762 :                     rdt_datas_last->data =
     779     4920762 :                         page + (bimg.hole_offset + cbimg.hole_length);
     780     4920762 :                     rdt_datas_last->len =
     781     4920762 :                         BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
     782             :                 }
     783             :             }
     784             : 
     785     5167602 :             total_len += bimg.length;
     786             :         }
     787             : 
     788    27482636 :         if (needs_data)
     789             :         {
     790             :             /*
     791             :              * When copying to XLogRecordBlockHeader, the length is narrowed
     792             :              * to a uint16.  Double-check that it is still correct.
     793             :              */
     794             :             Assert(regbuf->rdata_len <= UINT16_MAX);
     795             : 
     796             :             /*
     797             :              * Link the caller-supplied rdata chain for this buffer to the
     798             :              * overall list.
     799             :              */
     800    22130916 :             bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
     801    22130916 :             bkpb.data_length = (uint16) regbuf->rdata_len;
     802    22130916 :             total_len += regbuf->rdata_len;
     803             : 
     804    22130916 :             rdt_datas_last->next = regbuf->rdata_head;
     805    22130916 :             rdt_datas_last = regbuf->rdata_tail;
     806             :         }
     807             : 
     808    27482636 :         if (prev_regbuf && RelFileLocatorEquals(regbuf->rlocator, prev_regbuf->rlocator))
     809             :         {
     810     1346636 :             samerel = true;
     811     1346636 :             bkpb.fork_flags |= BKPBLOCK_SAME_REL;
     812             :         }
     813             :         else
     814    26136000 :             samerel = false;
     815    27482636 :         prev_regbuf = regbuf;
     816             : 
     817             :         /* Ok, copy the header to the scratch buffer */
     818    27482636 :         memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
     819    27482636 :         scratch += SizeOfXLogRecordBlockHeader;
     820    27482636 :         if (include_image)
     821             :         {
     822     5167602 :             memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
     823     5167602 :             scratch += SizeOfXLogRecordBlockImageHeader;
     824     5167602 :             if (cbimg.hole_length != 0 && is_compressed)
     825             :             {
     826           0 :                 memcpy(scratch, &cbimg,
     827             :                        SizeOfXLogRecordBlockCompressHeader);
     828           0 :                 scratch += SizeOfXLogRecordBlockCompressHeader;
     829             :             }
     830             :         }
     831    27482636 :         if (!samerel)
     832             :         {
     833    26136000 :             memcpy(scratch, &regbuf->rlocator, sizeof(RelFileLocator));
     834    26136000 :             scratch += sizeof(RelFileLocator);
     835             :         }
     836    27482636 :         memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
     837    27482636 :         scratch += sizeof(BlockNumber);
     838             :     }
     839             : 
     840             :     /* followed by the record's origin, if any */
     841    27713500 :     if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
     842    16725422 :         replorigin_session_origin != InvalidRepOriginId)
     843             :     {
     844      300920 :         *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
     845      300920 :         memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
     846      300920 :         scratch += sizeof(replorigin_session_origin);
     847             :     }
     848             : 
     849             :     /* followed by toplevel XID, if not already included in previous record */
     850    27713500 :     if (IsSubxactTopXidLogPending())
     851             :     {
     852         442 :         TransactionId xid = GetTopTransactionIdIfAny();
     853             : 
     854             :         /* Set the flag that the top xid is included in the WAL */
     855         442 :         *topxid_included = true;
     856             : 
     857         442 :         *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
     858         442 :         memcpy(scratch, &xid, sizeof(TransactionId));
     859         442 :         scratch += sizeof(TransactionId);
     860             :     }
     861             : 
     862             :     /* followed by main data, if any */
     863    27713500 :     if (mainrdata_len > 0)
     864             :     {
     865    27107900 :         if (mainrdata_len > 255)
     866             :         {
     867             :             uint32      mainrdata_len_4b;
     868             : 
     869       57656 :             if (mainrdata_len > PG_UINT32_MAX)
     870           0 :                 ereport(ERROR,
     871             :                         (errmsg_internal("too much WAL data"),
     872             :                          errdetail_internal("Main data length is %llu bytes for a maximum of %u bytes.",
     873             :                                             (unsigned long long) mainrdata_len,
     874             :                                             PG_UINT32_MAX)));
     875             : 
     876       57656 :             mainrdata_len_4b = (uint32) mainrdata_len;
     877       57656 :             *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
     878       57656 :             memcpy(scratch, &mainrdata_len_4b, sizeof(uint32));
     879       57656 :             scratch += sizeof(uint32);
     880             :         }
     881             :         else
     882             :         {
     883    27050244 :             *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
     884    27050244 :             *(scratch++) = (uint8) mainrdata_len;
     885             :         }
     886    27107900 :         rdt_datas_last->next = mainrdata_head;
     887    27107900 :         rdt_datas_last = mainrdata_last;
     888    27107900 :         total_len += mainrdata_len;
     889             :     }
     890    27713500 :     rdt_datas_last->next = NULL;
     891             : 
     892    27713500 :     hdr_rdt.len = (scratch - hdr_scratch);
     893    27713500 :     total_len += hdr_rdt.len;
     894             : 
     895             :     /*
     896             :      * Calculate CRC of the data
     897             :      *
     898             :      * Note that the record header isn't added into the CRC initially since we
     899             :      * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
     900             :      * the whole record in the order: rdata, then backup blocks, then record
     901             :      * header.
     902             :      */
     903    27713500 :     INIT_CRC32C(rdata_crc);
     904    27713500 :     COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
     905   103166234 :     for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
     906    75452734 :         COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
     907             : 
     908             :     /*
     909             :      * Ensure that the XLogRecord is not too large.
     910             :      *
     911             :      * XLogReader machinery is only able to handle records up to a certain
     912             :      * size (ignoring machine resource limitations), so make sure that we will
     913             :      * not emit records larger than the sizes advertised to be supported.
     914             :      */
     915    27713500 :     if (total_len > XLogRecordMaxSize)
     916           0 :         ereport(ERROR,
     917             :                 (errmsg_internal("oversized WAL record"),
     918             :                  errdetail_internal("WAL record would be %llu bytes (of maximum %u bytes); rmid %u flags %u.",
     919             :                                     (unsigned long long) total_len, XLogRecordMaxSize, rmid, info)));
     920             : 
     921             :     /*
     922             :      * Fill in the fields in the record header. Prev-link is filled in later,
     923             :      * once we know where in the WAL the record will be inserted. The CRC does
     924             :      * not include the record header yet.
     925             :      */
     926    27713500 :     rechdr->xl_xid = GetCurrentTransactionIdIfAny();
     927    27713500 :     rechdr->xl_tot_len = (uint32) total_len;
     928    27713500 :     rechdr->xl_info = info;
     929    27713500 :     rechdr->xl_rmid = rmid;
     930    27713500 :     rechdr->xl_prev = InvalidXLogRecPtr;
     931    27713500 :     rechdr->xl_crc = rdata_crc;
     932             : 
     933    27713500 :     return &hdr_rdt;
     934             : }
     935             : 
     936             : /*
     937             :  * Create a compressed version of a backup block image.
     938             :  *
     939             :  * Returns false if compression fails or saves nothing (compressed result plus
     940             :  * extra header bytes is not smaller than original). Otherwise, returns true
     941             :  * and sets 'dlen' to the length of the compressed block image.
     942             :  */
     943             : static bool
     944           0 : XLogCompressBackupBlock(const char *page, uint16 hole_offset, uint16 hole_length,
     945             :                         char *dest, uint16 *dlen)
     946             : {
     947           0 :     int32       orig_len = BLCKSZ - hole_length;
     948           0 :     int32       len = -1;
     949           0 :     int32       extra_bytes = 0;
     950             :     const char *source;
     951             :     PGAlignedBlock tmp;
     952             : 
     953           0 :     if (hole_length != 0)
     954             :     {
     955             :         /* must skip the hole */
     956           0 :         memcpy(tmp.data, page, hole_offset);
     957           0 :         memcpy(tmp.data + hole_offset,
     958           0 :                page + (hole_offset + hole_length),
     959           0 :                BLCKSZ - (hole_length + hole_offset));
     960           0 :         source = tmp.data;
     961             : 
     962             :         /*
     963             :          * Extra data needs to be stored in WAL record for the compressed
     964             :          * version of block image if the hole exists.
     965             :          */
     966           0 :         extra_bytes = SizeOfXLogRecordBlockCompressHeader;
     967             :     }
     968             :     else
     969           0 :         source = page;
     970             : 
     971           0 :     switch ((WalCompression) wal_compression)
     972             :     {
     973           0 :         case WAL_COMPRESSION_PGLZ:
     974           0 :             len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
     975           0 :             break;
     976             : 
     977           0 :         case WAL_COMPRESSION_LZ4:
     978             : #ifdef USE_LZ4
     979           0 :             len = LZ4_compress_default(source, dest, orig_len,
     980             :                                        COMPRESS_BUFSIZE);
     981           0 :             if (len <= 0)
     982           0 :                 len = -1;       /* failure */
     983             : #else
     984             :             elog(ERROR, "LZ4 is not supported by this build");
     985             : #endif
     986           0 :             break;
     987             : 
     988           0 :         case WAL_COMPRESSION_ZSTD:
     989             : #ifdef USE_ZSTD
     990             :             len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
     991             :                                 ZSTD_CLEVEL_DEFAULT);
     992             :             if (ZSTD_isError(len))
     993             :                 len = -1;       /* failure */
     994             : #else
     995           0 :             elog(ERROR, "zstd is not supported by this build");
     996             : #endif
     997             :             break;
     998             : 
     999           0 :         case WAL_COMPRESSION_NONE:
    1000             :             Assert(false);      /* cannot happen */
    1001           0 :             break;
    1002             :             /* no default case, so that compiler will warn */
    1003             :     }
    1004             : 
    1005             :     /*
    1006             :      * We recheck the actual size even if compression reports success and see
    1007             :      * if the number of bytes saved by compression is larger than the length
    1008             :      * of extra data needed for the compressed version of block image.
    1009             :      */
    1010           0 :     if (len >= 0 &&
    1011           0 :         len + extra_bytes < orig_len)
    1012             :     {
    1013           0 :         *dlen = (uint16) len;   /* successful compression */
    1014           0 :         return true;
    1015             :     }
    1016           0 :     return false;
    1017             : }
    1018             : 
    1019             : /*
    1020             :  * Determine whether the buffer referenced has to be backed up.
    1021             :  *
    1022             :  * Since we don't yet have the insert lock, fullPageWrites and runningBackups
    1023             :  * (which forces full-page writes) could change later, so the result should
    1024             :  * be used for optimization purposes only.
    1025             :  */
    1026             : bool
    1027      266972 : XLogCheckBufferNeedsBackup(Buffer buffer)
    1028             : {
    1029             :     XLogRecPtr  RedoRecPtr;
    1030             :     bool        doPageWrites;
    1031             :     Page        page;
    1032             : 
    1033      266972 :     GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
    1034             : 
    1035      266972 :     page = BufferGetPage(buffer);
    1036             : 
    1037      266972 :     if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
    1038        1556 :         return true;            /* buffer requires backup */
    1039             : 
    1040      265416 :     return false;               /* buffer does not need to be backed up */
    1041             : }
    1042             : 
    1043             : /*
    1044             :  * Write a backup block if needed when we are setting a hint. Note that
    1045             :  * this may be called for a variety of page types, not just heaps.
    1046             :  *
    1047             :  * Callable while holding just share lock on the buffer content.
    1048             :  *
    1049             :  * We can't use the plain backup block mechanism since that relies on the
    1050             :  * Buffer being exclusively locked. Since some modifications (setting LSN, hint
    1051             :  * bits) are allowed in a sharelocked buffer, that can lead to WAL checksum
    1052             :  * failures. So instead we copy the page and insert the copied data as normal
    1053             :  * record data.
    1054             :  *
    1055             :  * We only need to do something if page has not yet been full page written in
    1056             :  * this checkpoint round. The LSN of the inserted wal record is returned if we
    1057             :  * had to write, InvalidXLogRecPtr otherwise.
    1058             :  *
    1059             :  * It is possible that multiple concurrent backends could attempt to write WAL
    1060             :  * records. In that case, multiple copies of the same block would be recorded
    1061             :  * in separate WAL records by different backends, though that is still OK from
    1062             :  * a correctness perspective.
    1063             :  */
    1064             : XLogRecPtr
    1065      103434 : XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
    1066             : {
    1067      103434 :     XLogRecPtr  recptr = InvalidXLogRecPtr;
    1068             :     XLogRecPtr  lsn;
    1069             :     XLogRecPtr  RedoRecPtr;
    1070             : 
    1071             :     /*
    1072             :      * Ensure no checkpoint can change our view of RedoRecPtr.
    1073             :      */
    1074             :     Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) != 0);
    1075             : 
    1076             :     /*
    1077             :      * Update RedoRecPtr so that we can make the right decision
    1078             :      */
    1079      103434 :     RedoRecPtr = GetRedoRecPtr();
    1080             : 
    1081             :     /*
    1082             :      * We assume page LSN is first data on *every* page that can be passed to
    1083             :      * XLogInsert, whether it has the standard page layout or not. Since we're
    1084             :      * only holding a share-lock on the page, we must take the buffer header
    1085             :      * lock when we look at the LSN.
    1086             :      */
    1087      103434 :     lsn = BufferGetLSNAtomic(buffer);
    1088             : 
    1089      103434 :     if (lsn <= RedoRecPtr)
    1090             :     {
    1091       54616 :         int         flags = 0;
    1092             :         PGAlignedBlock copied_buffer;
    1093       54616 :         char       *origdata = (char *) BufferGetBlock(buffer);
    1094             :         RelFileLocator rlocator;
    1095             :         ForkNumber  forkno;
    1096             :         BlockNumber blkno;
    1097             : 
    1098             :         /*
    1099             :          * Copy buffer so we don't have to worry about concurrent hint bit or
    1100             :          * lsn updates. We assume pd_lower/upper cannot be changed without an
    1101             :          * exclusive lock, so the copied contents are not racy.
    1102             :          */
    1103       54616 :         if (buffer_std)
    1104             :         {
    1105             :             /* Assume we can omit data between pd_lower and pd_upper */
    1106       33548 :             Page        page = BufferGetPage(buffer);
    1107       33548 :             uint16      lower = ((PageHeader) page)->pd_lower;
    1108       33548 :             uint16      upper = ((PageHeader) page)->pd_upper;
    1109             : 
    1110       33548 :             memcpy(copied_buffer.data, origdata, lower);
    1111       33548 :             memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
    1112             :         }
    1113             :         else
    1114       21068 :             memcpy(copied_buffer.data, origdata, BLCKSZ);
    1115             : 
    1116       54616 :         XLogBeginInsert();
    1117             : 
    1118       54616 :         if (buffer_std)
    1119       33548 :             flags |= REGBUF_STANDARD;
    1120             : 
    1121       54616 :         BufferGetTag(buffer, &rlocator, &forkno, &blkno);
    1122       54616 :         XLogRegisterBlock(0, &rlocator, forkno, blkno, copied_buffer.data, flags);
    1123             : 
    1124       54616 :         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
    1125             :     }
    1126             : 
    1127      103434 :     return recptr;
    1128             : }
    1129             : 
    1130             : /*
    1131             :  * Write a WAL record containing a full image of a page. Caller is responsible
    1132             :  * for writing the page to disk after calling this routine.
    1133             :  *
    1134             :  * Note: If you're using this function, you should be building pages in private
    1135             :  * memory and writing them directly to smgr.  If you're using buffers, call
    1136             :  * log_newpage_buffer instead.
    1137             :  *
    1138             :  * If the page follows the standard page layout, with a PageHeader and unused
    1139             :  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
    1140             :  * the unused space to be left out of the WAL record, making it smaller.
    1141             :  */
    1142             : XLogRecPtr
    1143      242246 : log_newpage(RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blkno,
    1144             :             Page page, bool page_std)
    1145             : {
    1146             :     int         flags;      /* REGBUF_* flags passed to XLogRegisterBlock() */
    1147             :     XLogRecPtr  recptr;     /* value returned by XLogInsert() */
    1148             : 
    1149      242246 :     flags = REGBUF_FORCE_IMAGE; /* set unconditionally; page_std only adds to it */
    1150      242246 :     if (page_std)
    1151      241820 :         flags |= REGBUF_STANDARD;
    1152             : 
    1153      242246 :     XLogBeginInsert();
    1154      242246 :     XLogRegisterBlock(0, rlocator, forknum, blkno, page, flags);    /* the page is the record's only block, id 0 */
    1155      242246 :     recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
    1156             : 
    1157             :     /*
    1158             :      * The page may be uninitialized. If so, we can't set the LSN because that
    1159             :      * would corrupt the page.
    1160             :      */
    1161      242246 :     if (!PageIsNew(page))
    1162             :     {
    1163      242238 :         PageSetLSN(page, recptr);
    1164             :     }
    1165             : 
    1166      242246 :     return recptr;  /* returned even when the LSN was not stamped on the page */
    1167             : }
    1168             : 
    1169             : /*
    1170             :  * Like log_newpage(), but allows logging multiple pages in one operation.
    1171             :  * It is more efficient than calling log_newpage() for each page separately,
    1172             :  * because we can write multiple pages in a single WAL record.
    1173             :  */
    1174             : void
    1175       35684 : log_newpages(RelFileLocator *rlocator, ForkNumber forknum, int num_pages,
    1176             :              BlockNumber *blknos, Page *pages, bool page_std)
    1177             : {
    1178             :     int         flags;      /* REGBUF_* flags applied to every registered page */
    1179             :     XLogRecPtr  recptr;     /* XLogInsert() result for the current batch */
    1180             :     int         i;          /* index of the next entry of blknos[]/pages[] to register */
    1181             :     int         j;          /* walks the just-inserted batch to stamp LSNs */
    1182             : 
    1183       35684 :     flags = REGBUF_FORCE_IMAGE;
    1184       35684 :     if (page_std)
    1185       35596 :         flags |= REGBUF_STANDARD;
    1186             : 
    1187             :     /*
    1188             :      * Iterate over all the pages. They are collected into batches of
    1189             :      * XLR_MAX_BLOCK_ID pages, and a single WAL record is written for each
    1190             :      * batch.
    1191             :      */
    1192       35684 :     XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0); /* block ids used below run 0 .. XLR_MAX_BLOCK_ID - 1 */
    1193             : 
    1194       35684 :     i = 0;
    1195       71368 :     while (i < num_pages)
    1196             :     {
    1197       35684 :         int         batch_start = i;    /* first pages[] index belonging to this batch */
    1198             :         int         nbatch;             /* pages registered so far; doubles as the block id */
    1199             : 
    1200       35684 :         XLogBeginInsert();
    1201             : 
    1202       35684 :         nbatch = 0;
    1203      106634 :         while (nbatch < XLR_MAX_BLOCK_ID && i < num_pages)
    1204             :         {
    1205       70950 :             XLogRegisterBlock(nbatch, rlocator, forknum, blknos[i], pages[i], flags);
    1206       70950 :             i++;
    1207       70950 :             nbatch++;
    1208             :         }
    1209             : 
    1210       35684 :         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
    1211             : 
    1212      106634 :         for (j = batch_start; j < i; j++)   /* every page of the batch gets the same recptr */
    1213             :         {
    1214             :             /*
    1215             :              * The page may be uninitialized. If so, we can't set the LSN
    1216             :              * because that would corrupt the page.
    1217             :              */
    1218       70950 :             if (!PageIsNew(pages[j]))
    1219             :             {
    1220       70942 :                 PageSetLSN(pages[j], recptr);
    1221             :             }
    1222             :         }
    1223             :     }
    1224       35684 : }
    1225             : 
    1226             : /*
    1227             :  * Write a WAL record containing a full image of a page.
    1228             :  *
    1229             :  * Caller should initialize the buffer and mark it dirty before calling this
    1230             :  * function.  This function will set the page LSN (unless the page is new).
    1231             :  *
    1232             :  * If the page follows the standard page layout, with a PageHeader and unused
    1233             :  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
    1234             :  * the unused space to be left out of the WAL record, making it smaller.
    1235             :  */
    1236             : XLogRecPtr
    1237      236860 : log_newpage_buffer(Buffer buffer, bool page_std)
    1238             : {
    1239      236860 :     Page        page = BufferGetPage(buffer);
    1240             :     RelFileLocator rlocator;    /* filled in by BufferGetTag() below */
    1241             :     ForkNumber  forknum;        /* filled in by BufferGetTag() below */
    1242             :     BlockNumber blkno;          /* filled in by BufferGetTag() below */
    1243             : 
    1244             :     /* Shared buffers should be modified in a critical section. */
    1245             :     Assert(CritSectionCount > 0);
    1246             : 
    1247      236860 :     BufferGetTag(buffer, &rlocator, &forknum, &blkno);
    1248             : 
    1249      236860 :     return log_newpage(&rlocator, forknum, blkno, page, page_std);  /* log_newpage() does the logging and LSN update */
    1250             : }
    1251             : 
    1252             : /*
    1253             :  * WAL-log a range of blocks in a relation.
    1254             :  *
    1255             :  * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
    1256             :  * written to the WAL. If the range is large, this is done in multiple WAL
    1257             :  * records.
    1258             :  *
    1259             :  * If all pages follow the standard page layout, with a PageHeader and unused
    1260             :  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
    1261             :  * the unused space to be left out of the WAL records, making them smaller.
    1262             :  *
    1263             :  * NOTE: This function acquires exclusive locks on the pages. Typically, this
    1264             :  * is used on a newly-built relation, and the caller is holding an
    1265             :  * AccessExclusiveLock on it, so no other backend can be accessing it at the
    1266             :  * same time. If that's not the case, you must ensure that this does not
    1267             :  * cause a deadlock through some other means.
    1268             :  */
    1269             : void
    1270       79320 : log_newpage_range(Relation rel, ForkNumber forknum,
    1271             :                   BlockNumber startblk, BlockNumber endblk,
    1272             :                   bool page_std)
    1273             : {
    1274             :     int         flags;      /* REGBUF_* flags applied to every registered buffer */
    1275             :     BlockNumber blkno;      /* next block number to read; advances to endblk */
    1276             : 
    1277       79320 :     flags = REGBUF_FORCE_IMAGE;
    1278       79320 :     if (page_std)
    1279        1172 :         flags |= REGBUF_STANDARD;
    1280             : 
    1281             :     /*
    1282             :      * Iterate over all the pages in the range. They are collected into
    1283             :      * batches of XLR_MAX_BLOCK_ID pages, and a single WAL record is written
    1284             :      * for each batch.
    1285             :      */
    1286       79320 :     XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0); /* block ids used below run 0 .. XLR_MAX_BLOCK_ID - 1 */
    1287             : 
    1288       79320 :     blkno = startblk;
    1289      139150 :     while (blkno < endblk)
    1290             :     {
    1291             :         Buffer      bufpack[XLR_MAX_BLOCK_ID];  /* exclusive-locked, non-new buffers of this batch */
    1292             :         XLogRecPtr  recptr;
    1293             :         int         nbufs;                      /* number of valid entries in bufpack[] */
    1294             :         int         i;
    1295             : 
    1296       59830 :         CHECK_FOR_INTERRUPTS();     /* done once per batch, before any buffer of the batch is locked */
    1297             : 
    1298             :         /* Collect a batch of blocks. */
    1299       59830 :         nbufs = 0;
    1300      279210 :         while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
    1301             :         {
    1302      219380 :             Buffer      buf = ReadBufferExtended(rel, forknum, blkno,
    1303             :                                                  RBM_NORMAL, NULL);
    1304             : 
    1305      219380 :             LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
    1306             : 
    1307             :             /*
    1308             :              * Completely empty pages are not WAL-logged. Writing a WAL record
    1309             :              * would change the LSN, and we don't want that. We want the page
    1310             :              * to stay empty.
    1311             :              */
    1312      219380 :             if (!PageIsNew(BufferGetPage(buf)))
    1313      218444 :                 bufpack[nbufs++] = buf;     /* stays locked until its LSN is set below */
    1314             :             else
    1315         936 :                 UnlockReleaseBuffer(buf);   /* skipped page: unlock and release right away */
    1316      219380 :             blkno++;
    1317             :         }
    1318             : 
    1319             :         /* Nothing more to do if all remaining blocks were empty. */
    1320       59830 :         if (nbufs == 0)
    1321           0 :             break;  /* an empty batch means the inner loop stopped because blkno reached endblk */
    1322             : 
    1323             :         /* Write WAL record for this batch. */
    1324       59830 :         XLogBeginInsert();
    1325             : 
    1326       59830 :         START_CRIT_SECTION();
    1327      278274 :         for (i = 0; i < nbufs; i++)
    1328             :         {
    1329      218444 :             MarkBufferDirty(bufpack[i]);
    1330      218444 :             XLogRegisterBuffer(i, bufpack[i], flags);   /* position in bufpack[] is the block id */
    1331             :         }
    1332             : 
    1333       59830 :         recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
    1334             : 
    1335      278274 :         for (i = 0; i < nbufs; i++)
    1336             :         {
    1337      218444 :             PageSetLSN(BufferGetPage(bufpack[i]), recptr);  /* same recptr for every page of the batch */
    1338      218444 :             UnlockReleaseBuffer(bufpack[i]);
    1339             :         }
    1340       59830 :         END_CRIT_SECTION();
    1341             :     }
    1342       79320 : }
    1343             : 
    1344             : /*
    1345             :  * Allocate working buffers needed for WAL record construction.
    1346             :  */
    1347             : void
    1348       33234 : InitXLogInsert(void)
    1349             : {
    1350             : #ifdef USE_ASSERT_CHECKING
    1351             : 
    1352             :     /*
    1353             :      * Check that any records assembled can be decoded.  This is capped based
    1354             :      * on what XLogReader would require at its maximum bound.  The XLOG_BLCKSZ
    1355             :      * addend covers the larger allocate_recordbuf() demand.  This code path
    1356             :      * is called once per backend, more than enough for this check.
    1357             :      */
    1358             :     size_t      max_required =
    1359             :         DecodeXLogRecordRequiredSpace(XLogRecordMaxSize + XLOG_BLCKSZ);
    1360             : 
    1361             :     Assert(AllocSizeIsValid(max_required));
    1362             : #endif
    1363             : 
    1364             :     /* Initialize the working areas; each one is created only if still NULL */
    1365       33234 :     if (xloginsert_cxt == NULL)
    1366             :     {
    1367       33234 :         xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
    1368             :                                                "WAL record construction",
    1369             :                                                ALLOCSET_DEFAULT_SIZES);
    1370             :     }
    1371             : 
    1372       33234 :     if (registered_buffers == NULL)
    1373             :     {
    1374       33234 :         registered_buffers = (registered_buffer *)
    1375       33234 :             MemoryContextAllocZero(xloginsert_cxt,
    1376             :                                    sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
    1377       33234 :         max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;   /* keep in step with the array size above */
    1378             :     }
    1379       33234 :     if (rdatas == NULL)
    1380             :     {
    1381       33234 :         rdatas = MemoryContextAlloc(xloginsert_cxt, /* note: not the Zero variant used for the other areas */
    1382             :                                     sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
    1383       33234 :         max_rdatas = XLR_NORMAL_RDATAS;     /* keep in step with the array size above */
    1384             :     }
    1385             : 
    1386             :     /*
    1387             :      * Allocate a buffer to hold the header information for a WAL record.
    1388             :      */
    1389       33234 :     if (hdr_scratch == NULL)
    1390       33234 :         hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
    1391             :                                              HEADER_SCRATCH_SIZE);
    1392       33234 : }

Generated by: LCOV version 1.14